Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DataAccess.ADOHelper/3.2/VarBinaryStream.cs @ 3041

Last change on this file since 3041 was 2606, checked in by kgrading, 15 years ago

changed the isolation level of the binary stream and added output to PersistableObject (#828)

File size: 8.9 KB
Line 
1/* Based on:
2 *   http://www.eggheadcafe.com/conversation.aspx?messageid=29988910&threadid=29988841
3 * */
4
5using System;
6using System.IO;
7using System.Data.SqlClient;
8using System.Data;
9
10namespace HeuristicLab.DataAccess.ADOHelper {
11  /// <summary>
12  /// The VarBinaryStream class inherits from Stream. It uses a
13  /// VarBinarySource class to execute the actual TSQL.
14  /// </summary>
15  public class VarBinaryStream : Stream, IDisposable {
16    private long _position;
17    private readonly VarBinarySource _source;
18
19    public VarBinaryStream(VarBinarySource source) {
20      _position = 0;
21      _source = source;
22    }
23
24    public override bool CanRead {
25      get { return true; }
26    }
27
28    public override bool CanSeek {
29      get { return true; }
30    }
31
32    public override bool CanWrite {
33      get { return true; }
34    }
35
36    public override long Length {
37      get { return _source.Length; }
38    }
39
40    public override long Position {
41      get { return _position; }
42      set { this.Seek(value, SeekOrigin.Begin); }
43    }
44
45    public override void Flush() { }
46
47    public override long Seek(long offset, SeekOrigin origin) {
48      switch (origin) {
49        case SeekOrigin.Begin: {
50            if ((offset < 0) && (offset > this.Length))
51              throw new ArgumentException("Invalid seek origin.");
52            _position = offset;
53            break;
54          }
55        case SeekOrigin.End: {
56            if ((offset > 0) && (offset < -this.Length))
57              throw new ArgumentException("Invalid seek origin.");
58            _position = this.Length - offset;
59            break;
60          }
61        case SeekOrigin.Current: {
62            if ((_position + offset > this.Length))
63              throw new ArgumentException("Invalid seek origin.");
64            _position = _position + offset;
65            break;
66          }
67        default: {
68            throw new ArgumentOutOfRangeException("origin");
69          }
70      }
71      return _position;
72    }
73
74    public override void SetLength(long value) {
75      throw new NotSupportedException();
76    }
77
78    public override int Read(byte[] buffer, int offset, int count) {
79      if (buffer == null)
80        throw new ArgumentNullException("buffer");
81      if (offset < 0)
82        throw new ArgumentOutOfRangeException("offset");
83      if (count < 0)
84        throw new ArgumentOutOfRangeException("count");
85      if (buffer.Length - offset < count)
86        throw new ArgumentException("Offset and length were out of bounds for the array");
87
88      byte[] data = _source.Read(Position, count);
89      if (data == null)
90        return 0;
91
92      Buffer.BlockCopy(data, 0, buffer, offset, data.Length);
93      _position += data.Length;
94      return data.Length;
95    }
96
97    public override void Write(byte[] buffer, int offset, int count) {
98      if (buffer == null)
99        throw new ArgumentNullException("buffer");
100      if (offset < 0)
101        throw new ArgumentOutOfRangeException("offset");
102      if (count < 0)
103        throw new ArgumentOutOfRangeException("count");
104      if (buffer.Length - offset < count)
105        throw new ArgumentException("Offset and length were out of bounds for the array");
106
107      byte[] data = GetWriteBuffer(buffer, count, offset);
108      _source.Write(data, _position, count);
109      _position += count;
110    }
111
112    private static byte[] GetWriteBuffer(byte[] buffer, int count, int
113    offset) {
114      if (buffer.Length == count)
115        return buffer;
116      byte[] data = new byte[count];
117      Buffer.BlockCopy(buffer, offset, data, 0, count);
118      return data;
119    }
120
121    public override void Close() {
122      _source.Close();
123    }
124
125    protected override void Dispose(bool disposing) {
126      if (!disposing) {
127        if (_source != null) {
128          _source.Close();
129          _source.Dispose();
130        }
131      }
132      base.Dispose(disposing);
133    }
134  }
135
136
137  /// <summary>
138  /// The VarBinarySource class constructs the TSQL used
139  /// to read to / write from the VARBINARY(MAX) column.
140  /// IT is currently specialised for a table with a single
141  /// int column for a PK, but this can be easily generalised
142  /// for compound keys.
143  /// </summary>
144  public class VarBinarySource : IDisposable {
145    private SqlCommand _readCommand;
146    private SqlCommand _writeCommand;
147    private SqlConnection _connection;
148    private SqlTransaction _transaction;
149    private bool _ownedTransaction = false;
150    private bool _ownedConnection = false;
151    private readonly long _length;
152
153    public VarBinarySource(SqlConnection connection,
154      SqlTransaction transaction,
155      string table, string
156    dataColumn, string keyColumn, Guid key) {
157      _connection = connection;
158
159      if (_connection.State == ConnectionState.Closed) {
160        _connection.Open();
161        _ownedConnection = true;
162
163        _transaction =
164          _connection.BeginTransaction(
165          IsolationLevel.RepeatableRead)
166            as SqlTransaction;
167        _ownedTransaction = true;
168      } else {
169        _ownedConnection = false;
170
171        if (transaction != null) {
172          _transaction = transaction;
173
174          _ownedTransaction = false;
175        } else {
176          _transaction =
177            _connection.BeginTransaction(
178          IsolationLevel.ReadCommitted)
179            as SqlTransaction;
180
181          _ownedTransaction = true;
182        }
183      }
184
185      _length = GetLength(connection,  table, dataColumn, keyColumn, key);
186      _readCommand = CreateReadCommand(connection, table, dataColumn,
187      keyColumn, key);
188      _writeCommand = CreateWriteCommand(connection, table, dataColumn,
189      keyColumn, key);
190    }
191
192    public long Length {
193      get { return _length; }
194    }
195
196    private static SqlCommand CreateReadCommand(SqlConnection connection,
197    string table, string dataColumn, string keyColumn,
198    Guid key) {
199      SqlCommand readCommand = connection.CreateCommand();
200      readCommand.CommandText = string.Format(@"
201select substring({0}, @offset, @length)
202from {1}
203where {2} = @key", dataColumn, table, keyColumn);
204      readCommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
205      readCommand.Parameters.Add("@offset", SqlDbType.BigInt);
206      readCommand.Parameters.Add("@length", SqlDbType.BigInt);
207      return readCommand;
208    }
209
210    private static SqlCommand CreateWriteCommand(SqlConnection connection,
211    string table, string dataColumn, string keyColumn,
212    Guid key) {
213      SqlCommand writecommand = connection.CreateCommand();
214      writecommand.CommandText = string.Format(@"
215update {0}
216set {1}.write(@buffer, @offset, @length)
217where {2} = @key", table, dataColumn, keyColumn);
218      writecommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
219      writecommand.Parameters.Add("@offset", SqlDbType.BigInt);
220      writecommand.Parameters.Add("@length", SqlDbType.BigInt);
221      writecommand.Parameters.Add("@buffer", SqlDbType.VarBinary);
222      return writecommand;
223    }
224
225    private long GetLength(SqlConnection connection,
226      string table,
227    string dataColumn, string keyColumn,
228    Guid key) {
229      using (SqlCommand command = connection.CreateCommand()) {
230        command.Transaction = _transaction;
231
232        SqlParameter length = command.Parameters.Add("@length",
233        SqlDbType.BigInt);
234        length.Direction = ParameterDirection.Output;
235
236        command.CommandText = string.Format(@"
237select @length = cast(datalength({0}) as bigint)
238from {1}
239where {2} = @key", dataColumn, table, keyColumn);
240        command.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
241        command.ExecuteNonQuery();
242        return length.Value == DBNull.Value ? 0 : (long)length.Value;
243      }
244    }
245
246    public byte[] Read(long offset, long length) {
247      _readCommand.Transaction = _transaction;
248
249      // substring is 1-based.
250      _readCommand.Parameters["@offset"].Value = offset + 1;
251      _readCommand.Parameters["@length"].Value = length;
252      return (byte[])_readCommand.ExecuteScalar();
253    }
254
255    public void Write(byte[] buffer, long offset, long length) {
256      _writeCommand.Transaction = _transaction;
257     
258      _writeCommand.Parameters["@buffer"].Value = buffer;
259      _writeCommand.Parameters["@offset"].Value = offset;
260      _writeCommand.Parameters["@length"].Value = length;
261      _writeCommand.ExecuteNonQuery();
262    }
263
264    public void Close() {
265      if (_transaction != null) {
266        if(_ownedTransaction)
267          _transaction.Commit();
268        _transaction = null;
269      }
270
271      if (_connection != null) {
272        if (_ownedConnection)
273          _connection.Close();
274        _connection = null;
275      }
276    }
277
278    public void Dispose() {
279      if (_readCommand != null) {
280        _readCommand.Dispose();
281      }
282      if (_writeCommand != null) {
283        _writeCommand.Dispose();
284      }
285    }
286  }
287}
288
Note: See TracBrowser for help on using the repository browser.