Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Persistence Test/HeuristicLab.DataAccess.ADOHelper/3.2/VarBinaryStream.cs @ 3043

Last change on this file since 3043 was 2123, checked in by svonolfe, 16 years ago

Avoided possible race conditions when streaming data from/into the DB (#680)

File size: 9.1 KB
Line 
1/* Based on:
2 *   http://www.eggheadcafe.com/conversation.aspx?messageid=29988910&threadid=29988841
3 * */
4
5using System;
6using System.IO;
7using System.Data.SqlClient;
8using System.Data;
9
10namespace HeuristicLab.DataAccess.ADOHelper {
11  /// <summary>
12  /// The VarBinaryStream class inherits from Stream. It uses a
13  /// VarBinarySource class to execute the actual TSQL.
14  /// </summary>
15  public class VarBinaryStream : Stream, IDisposable {
16    private long _position;
17    private readonly VarBinarySource _source;
18
19    public VarBinaryStream(VarBinarySource source) {
20      _position = 0;
21      _source = source;
22    }
23
24    public override bool CanRead {
25      get { return true; }
26    }
27
28    public override bool CanSeek {
29      get { return true; }
30    }
31
32    public override bool CanWrite {
33      get { return true; }
34    }
35
36    public override long Length {
37      get { return _source.Length; }
38    }
39
40    public override long Position {
41      get { return _position; }
42      set { this.Seek(value, SeekOrigin.Begin); }
43    }
44
45    public override void Flush() { }
46
47    public override long Seek(long offset, SeekOrigin origin) {
48      switch (origin) {
49        case SeekOrigin.Begin: {
50            if ((offset < 0) && (offset > this.Length))
51              throw new ArgumentException("Invalid seek origin.");
52            _position = offset;
53            break;
54          }
55        case SeekOrigin.End: {
56            if ((offset > 0) && (offset < -this.Length))
57              throw new ArgumentException("Invalid seek origin.");
58            _position = this.Length - offset;
59            break;
60          }
61        case SeekOrigin.Current: {
62            if ((_position + offset > this.Length))
63              throw new ArgumentException("Invalid seek origin.");
64            _position = _position + offset;
65            break;
66          }
67        default: {
68            throw new ArgumentOutOfRangeException("origin");
69          }
70      }
71      return _position;
72    }
73
74    public override void SetLength(long value) {
75      throw new NotSupportedException();
76    }
77
78    public override int Read(byte[] buffer, int offset, int count) {
79      if (buffer == null)
80        throw new ArgumentNullException("buffer");
81      if (offset < 0)
82        throw new ArgumentOutOfRangeException("offset");
83      if (count < 0)
84        throw new ArgumentOutOfRangeException("count");
85      if (buffer.Length - offset < count)
86        throw new ArgumentException("Offset and length were out of bounds for the array");
87
88      byte[] data = _source.Read(Position, count);
89      if (data == null)
90        return 0;
91
92      Buffer.BlockCopy(data, 0, buffer, offset, data.Length);
93      _position += data.Length;
94      return data.Length;
95    }
96
97    public override void Write(byte[] buffer, int offset, int count) {
98      if (buffer == null)
99        throw new ArgumentNullException("buffer");
100      if (offset < 0)
101        throw new ArgumentOutOfRangeException("offset");
102      if (count < 0)
103        throw new ArgumentOutOfRangeException("count");
104      if (buffer.Length - offset < count)
105        throw new ArgumentException("Offset and length were out of bounds for the array");
106
107      byte[] data = GetWriteBuffer(buffer, count, offset);
108      _source.Write(data, _position, count);
109      _position += count;
110    }
111
112    private static byte[] GetWriteBuffer(byte[] buffer, int count, int
113    offset) {
114      if (buffer.Length == count)
115        return buffer;
116      byte[] data = new byte[count];
117      Buffer.BlockCopy(buffer, offset, data, 0, count);
118      return data;
119    }
120
121    public override void Close() {
122      _source.Close();
123    }
124
125    protected override void Dispose(bool disposing) {
126      if (!disposing) {
127        if (_source != null) {
128          _source.Close();
129          _source.Dispose();
130        }
131      }
132      base.Dispose(disposing);
133    }
134  }
135
136
137  /// <summary>
138  /// The VarBinarySource class constructs the TSQL used
139  /// to read to / write from the VARBINARY(MAX) column.
140  /// IT is currently specialised for a table with a single
141  /// int column for a PK, but this can be easily generalised
142  /// for compound keys.
143  /// </summary>
144  public class VarBinarySource : IDisposable {
145    private SqlCommand _readCommand;
146    private SqlCommand _writeCommand;
147    private SqlConnection _connection;
148    private SqlTransaction _transaction;
149    private bool _ownedTransaction = false;
150    private bool _ownedConnection = false;
151    private readonly long _length;
152
153    public VarBinarySource(SqlConnection connection,
154      SqlTransaction transaction,
155      string table, string
156    dataColumn, string keyColumn, Guid key) {
157      _connection = connection;
158
159      if (_connection.State == ConnectionState.Closed) {
160        _connection.Open();
161        _ownedConnection = true;
162
163        _transaction =
164          _connection.BeginTransaction(
165          IsolationLevel.RepeatableRead)
166            as SqlTransaction;
167        _ownedTransaction = true;
168      } else {
169        _ownedConnection = false;
170
171        if (transaction != null) {
172          _transaction = transaction;
173
174          _ownedTransaction = false;
175
176          if (_transaction.IsolationLevel != IsolationLevel.RepeatableRead &&
177              _transaction.IsolationLevel != IsolationLevel.Serializable) {
178            throw new ArgumentException("Transaction level must be at least repeatable read");
179          }
180        } else {
181          _transaction =
182            _connection.BeginTransaction(
183          IsolationLevel.RepeatableRead)
184            as SqlTransaction;
185
186          _ownedTransaction = true;
187        }
188      }
189
190      _length = GetLength(connection,  table, dataColumn, keyColumn, key);
191      _readCommand = CreateReadCommand(connection, table, dataColumn,
192      keyColumn, key);
193      _writeCommand = CreateWriteCommand(connection, table, dataColumn,
194      keyColumn, key);
195    }
196
197    public long Length {
198      get { return _length; }
199    }
200
201    private static SqlCommand CreateReadCommand(SqlConnection connection,
202    string table, string dataColumn, string keyColumn,
203    Guid key) {
204      SqlCommand readCommand = connection.CreateCommand();
205      readCommand.CommandText = string.Format(@"
206select substring({0}, @offset, @length)
207from {1}
208where {2} = @key", dataColumn, table, keyColumn);
209      readCommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
210      readCommand.Parameters.Add("@offset", SqlDbType.BigInt);
211      readCommand.Parameters.Add("@length", SqlDbType.BigInt);
212      return readCommand;
213    }
214
215    private static SqlCommand CreateWriteCommand(SqlConnection connection,
216    string table, string dataColumn, string keyColumn,
217    Guid key) {
218      SqlCommand writecommand = connection.CreateCommand();
219      writecommand.CommandText = string.Format(@"
220update {0}
221set {1}.write(@buffer, @offset, @length)
222where {2} = @key", table, dataColumn, keyColumn);
223      writecommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
224      writecommand.Parameters.Add("@offset", SqlDbType.BigInt);
225      writecommand.Parameters.Add("@length", SqlDbType.BigInt);
226      writecommand.Parameters.Add("@buffer", SqlDbType.VarBinary);
227      return writecommand;
228    }
229
230    private long GetLength(SqlConnection connection,
231      string table,
232    string dataColumn, string keyColumn,
233    Guid key) {
234      using (SqlCommand command = connection.CreateCommand()) {
235        command.Transaction = _transaction;
236
237        SqlParameter length = command.Parameters.Add("@length",
238        SqlDbType.BigInt);
239        length.Direction = ParameterDirection.Output;
240
241        command.CommandText = string.Format(@"
242select @length = cast(datalength({0}) as bigint)
243from {1}
244where {2} = @key", dataColumn, table, keyColumn);
245        command.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
246        command.ExecuteNonQuery();
247        return length.Value == DBNull.Value ? 0 : (long)length.Value;
248      }
249    }
250
251    public byte[] Read(long offset, long length) {
252      _readCommand.Transaction = _transaction;
253
254      // substring is 1-based.
255      _readCommand.Parameters["@offset"].Value = offset + 1;
256      _readCommand.Parameters["@length"].Value = length;
257      return (byte[])_readCommand.ExecuteScalar();
258    }
259
260    public void Write(byte[] buffer, long offset, long length) {
261      _writeCommand.Transaction = _transaction;
262     
263      _writeCommand.Parameters["@buffer"].Value = buffer;
264      _writeCommand.Parameters["@offset"].Value = offset;
265      _writeCommand.Parameters["@length"].Value = length;
266      _writeCommand.ExecuteNonQuery();
267    }
268
269    public void Close() {
270      if (_transaction != null) {
271        if(_ownedTransaction)
272          _transaction.Commit();
273        _transaction = null;
274      }
275
276      if (_connection != null) {
277        if (_ownedConnection)
278          _connection.Close();
279        _connection = null;
280      }
281    }
282
283    public void Dispose() {
284      if (_readCommand != null) {
285        _readCommand.Dispose();
286      }
287      if (_writeCommand != null) {
288        _writeCommand.Dispose();
289      }
290    }
291  }
292}
293
Note: See TracBrowser for help on using the repository browser.