Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.DataAccess/3.2/VarBinaryStream.cs @ 3155

Last change on this file since 3155 was 3011, checked in by kgrading, 15 years ago

Changed the complete DAL to LINQ to SQL (with the exception of the job streaming), did a lot of refactoring, introduced DTOs (explicitly named "DTO" for better understanding), added the Spring.NET interceptor, reintroduced transactions, cleaned up the whole JobResult handling, and updated part of the config merger (#830)

File size: 8.5 KB
Line 
1/* Based on:
2 *   http://www.eggheadcafe.com/conversation.aspx?messageid=29988910&threadid=29988841
3 * */
4
5using System;
6using System.IO;
7using System.Data.SqlClient;
8using System.Data;
9
10namespace HeuristicLab.Hive.Server.DataAccess {
11  /// <summary>
12  /// The VarBinaryStream class inherits from Stream. It uses a
13  /// VarBinarySource class to execute the actual TSQL.
14  /// </summary>
15  public class VarBinaryStream : Stream, IDisposable {
16    private long _position;
17    private readonly VarBinarySource _source;
18
19    public VarBinaryStream(VarBinarySource source) {
20      _position = 0;
21      _source = source;
22    }
23
24    public override bool CanRead {
25      get { return true; }
26    }
27
28    public override bool CanSeek {
29      get { return true; }
30    }
31
32    public override bool CanWrite {
33      get { return true; }
34    }
35
36    public override long Length {
37      get { return _source.Length; }
38    }
39
40    public override long Position {
41      get { return _position; }
42      set { this.Seek(value, SeekOrigin.Begin); }
43    }
44
45    public override void Flush() { }
46
47    public override long Seek(long offset, SeekOrigin origin) {
48      switch (origin) {
49        case SeekOrigin.Begin: {
50            if ((offset < 0) && (offset > this.Length))
51              throw new ArgumentException("Invalid seek origin.");
52            _position = offset;
53            break;
54          }
55        case SeekOrigin.End: {
56            if ((offset > 0) && (offset < -this.Length))
57              throw new ArgumentException("Invalid seek origin.");
58            _position = this.Length - offset;
59            break;
60          }
61        case SeekOrigin.Current: {
62            if ((_position + offset > this.Length))
63              throw new ArgumentException("Invalid seek origin.");
64            _position = _position + offset;
65            break;
66          }
67        default: {
68            throw new ArgumentOutOfRangeException("origin");
69          }
70      }
71      return _position;
72    }
73
74    public override void SetLength(long value) {
75      throw new NotSupportedException();
76    }
77
78    public override int Read(byte[] buffer, int offset, int count) {
79      if (buffer == null)
80        throw new ArgumentNullException("buffer");
81      if (offset < 0)
82        throw new ArgumentOutOfRangeException("offset");
83      if (count < 0)
84        throw new ArgumentOutOfRangeException("count");
85      if (buffer.Length - offset < count)
86        throw new ArgumentException("Offset and length were out of bounds for the array");
87
88      byte[] data = _source.Read(Position, count);
89      if (data == null)
90        return 0;
91
92      Buffer.BlockCopy(data, 0, buffer, offset, data.Length);
93      _position += data.Length;
94      return data.Length;
95    }
96
97    public override void Write(byte[] buffer, int offset, int count) {
98      if (buffer == null)
99        throw new ArgumentNullException("buffer");
100      if (offset < 0)
101        throw new ArgumentOutOfRangeException("offset");
102      if (count < 0)
103        throw new ArgumentOutOfRangeException("count");
104      if (buffer.Length - offset < count)
105        throw new ArgumentException("Offset and length were out of bounds for the array");
106
107      byte[] data = GetWriteBuffer(buffer, count, offset);
108      _source.Write(data, _position, count);
109      _position += count;
110    }
111
112    private static byte[] GetWriteBuffer(byte[] buffer, int count, int
113    offset) {
114      if (buffer.Length == count)
115        return buffer;
116      byte[] data = new byte[count];
117      Buffer.BlockCopy(buffer, offset, data, 0, count);
118      return data;
119    }
120
121    public override void Close() {
122      _source.Close();
123    }
124
125    protected override void Dispose(bool disposing) {
126      if (!disposing) {
127        if (_source != null) {
128          _source.Close();
129          _source.Dispose();
130        }
131      }
132      base.Dispose(disposing);
133    }
134  }
135
136
137  public class VarBinarySource : IDisposable {
138    private SqlCommand _readCommand;
139    private SqlCommand _writeCommand;
140    private SqlConnection _connection;
141    private SqlTransaction _transaction;
142    private bool _ownedTransaction = false;
143    private bool _ownedConnection = false;
144    private readonly long _length;
145
146    public VarBinarySource(SqlConnection connection,
147      SqlTransaction transaction,
148      string table, string
149    dataColumn, string keyColumn, Guid key) {
150      _connection = connection;
151     
152      if (_connection.State == ConnectionState.Closed) {
153        _connection.Open();
154        _ownedConnection = true;
155
156        _transaction =
157          _connection.BeginTransaction(IsolationLevel.RepeatableRead) as SqlTransaction;
158        _ownedTransaction = true;
159      } else {
160        _ownedConnection = false;
161
162        if (transaction != null) {
163          _transaction = transaction;
164
165          _ownedTransaction = false;
166        } else {
167          _transaction =
168            _connection.BeginTransaction(IsolationLevel.ReadCommitted) as SqlTransaction;
169
170          _ownedTransaction = true;
171        }
172      }
173
174      _length = GetLength(connection, table, dataColumn, keyColumn, key);
175      _readCommand = CreateReadCommand(connection, table, dataColumn,
176      keyColumn, key);
177      _writeCommand = CreateWriteCommand(connection, table, dataColumn,
178      keyColumn, key);
179    }
180
181    public long Length {
182      get { return _length; }
183    }
184
185    private static SqlCommand CreateReadCommand(SqlConnection connection,
186    string table, string dataColumn, string keyColumn,
187    Guid key) {
188      SqlCommand readCommand = connection.CreateCommand();
189      readCommand.CommandText = string.Format(@"
190select substring({0}, @offset, @length)
191from {1}
192where {2} = @key", dataColumn, table, keyColumn);
193      readCommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
194      readCommand.Parameters.Add("@offset", SqlDbType.BigInt);
195      readCommand.Parameters.Add("@length", SqlDbType.BigInt);
196      return readCommand;
197    }
198
199    private static SqlCommand CreateWriteCommand(SqlConnection connection,
200    string table, string dataColumn, string keyColumn,
201    Guid key) {
202      SqlCommand writecommand = connection.CreateCommand();
203      writecommand.CommandText = string.Format(@"
204update {0}
205set {1}.write(@buffer, @offset, @length)
206where {2} = @key", table, dataColumn, keyColumn);
207      writecommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
208      writecommand.Parameters.Add("@offset", SqlDbType.BigInt);
209      writecommand.Parameters.Add("@length", SqlDbType.BigInt);
210      writecommand.Parameters.Add("@buffer", SqlDbType.VarBinary);
211      return writecommand;
212    }
213
214    private long GetLength(SqlConnection connection,
215      string table,
216    string dataColumn, string keyColumn,
217    Guid key) {
218      using (SqlCommand command = connection.CreateCommand()) {
219        command.Transaction = _transaction;
220
221        SqlParameter length = command.Parameters.Add("@length",
222        SqlDbType.BigInt);
223        length.Direction = ParameterDirection.Output;
224
225        command.CommandText = string.Format(@"
226select @length = cast(datalength({0}) as bigint)
227from {1}
228where {2} = @key", dataColumn, table, keyColumn);
229        command.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
230        command.ExecuteNonQuery();
231        return length.Value == DBNull.Value ? 0 : (long)length.Value;
232      }
233    }
234
235    public byte[] Read(long offset, long length) {
236      _readCommand.Transaction = _transaction;
237
238      // substring is 1-based.
239      _readCommand.Parameters["@offset"].Value = offset + 1;
240      _readCommand.Parameters["@length"].Value = length;
241      return (byte[])_readCommand.ExecuteScalar();
242    }
243
244    public void Write(byte[] buffer, long offset, long length) {
245      _writeCommand.Transaction = _transaction;
246
247      _writeCommand.Parameters["@buffer"].Value = buffer;
248      _writeCommand.Parameters["@offset"].Value = offset;
249      _writeCommand.Parameters["@length"].Value = length;
250      _writeCommand.ExecuteNonQuery();
251    }
252
253    public void Close() {
254      if (_transaction != null) {
255        if (_ownedTransaction)
256          _transaction.Commit();
257        _transaction = null;
258      }
259
260      if (_connection != null) {
261        if (_ownedConnection)
262          _connection.Close();
263        _connection = null;
264      }
265    }
266
267    public void Dispose() {
268      if (_readCommand != null) {
269        _readCommand.Dispose();
270      }
271      if (_writeCommand != null) {
272        _writeCommand.Dispose();
273      }
274    }
275  }
276}
277
Note: See TracBrowser for help on using the repository browser.