Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.LINQDataAccess/3.2/VarBinaryStream.cs @ 3220

Last change on this file since 3220 was 3220, checked in by kgrading, 14 years ago

improved the DAL further, changed minor details for the presentation (#830)

File size: 8.6 KB
Line 
1/* Based on:
2 *   http://www.eggheadcafe.com/conversation.aspx?messageid=29988910&threadid=29988841
3 * */
4
5using System;
6using System.IO;
7using System.Data.SqlClient;
8using System.Data;
9
10namespace HeuristicLab.Hive.Server.LINQDataAccess {
11  /// <summary>
12  /// The VarBinaryStream class inherits from Stream. It uses a
13  /// VarBinarySource class to execute the actual TSQL.
14  /// </summary>
15  public class VarBinaryStream : Stream, IDisposable {
16    private long _position;
17    private readonly VarBinarySource _source;
18
19    public VarBinaryStream(VarBinarySource source) {
20      _position = 0;
21      _source = source;
22    }
23
24    public override bool CanRead {
25      get { return true; }
26    }
27
28    public override bool CanSeek {
29      get { return true; }
30    }
31
32    public override bool CanWrite {
33      get { return true; }
34    }
35
36    public override long Length {
37      get { return _source.Length; }
38    }
39
40    public override long Position {
41      get { return _position; }
42      set { this.Seek(value, SeekOrigin.Begin); }
43    }
44
45    public override void Flush() { }
46
47    public override long Seek(long offset, SeekOrigin origin) {
48      switch (origin) {
49        case SeekOrigin.Begin: {
50            if ((offset < 0) && (offset > this.Length))
51              throw new ArgumentException("Invalid seek origin.");
52            _position = offset;
53            break;
54          }
55        case SeekOrigin.End: {
56            if ((offset > 0) && (offset < -this.Length))
57              throw new ArgumentException("Invalid seek origin.");
58            _position = this.Length - offset;
59            break;
60          }
61        case SeekOrigin.Current: {
62            if ((_position + offset > this.Length))
63              throw new ArgumentException("Invalid seek origin.");
64            _position = _position + offset;
65            break;
66          }
67        default: {
68            throw new ArgumentOutOfRangeException("origin");
69          }
70      }
71      return _position;
72    }
73
74    public override void SetLength(long value) {
75      throw new NotSupportedException();
76    }
77
78    public override int Read(byte[] buffer, int offset, int count) {
79      if (buffer == null)
80        throw new ArgumentNullException("buffer");
81      if (offset < 0)
82        throw new ArgumentOutOfRangeException("offset");
83      if (count < 0)
84        throw new ArgumentOutOfRangeException("count");
85      if (buffer.Length - offset < count)
86        throw new ArgumentException("Offset and length were out of bounds for the array");
87
88      byte[] data = _source.Read(Position, count);
89      if (data == null)
90        return 0;
91
92      Buffer.BlockCopy(data, 0, buffer, offset, data.Length);
93      _position += data.Length;
94      return data.Length;
95    }
96
97    public override void Write(byte[] buffer, int offset, int count) {
98      if (buffer == null)
99        throw new ArgumentNullException("buffer");
100      if (offset < 0)
101        throw new ArgumentOutOfRangeException("offset");
102      if (count < 0)
103        throw new ArgumentOutOfRangeException("count");
104      if (buffer.Length - offset < count)
105        throw new ArgumentException("Offset and length were out of bounds for the array");
106
107      byte[] data = GetWriteBuffer(buffer, count, offset);
108      _source.Write(data, _position, count);
109      _position += count;
110    }
111
112    private static byte[] GetWriteBuffer(byte[] buffer, int count, int
113    offset) {
114      if (buffer.Length == count)
115        return buffer;
116      byte[] data = new byte[count];
117      Buffer.BlockCopy(buffer, offset, data, 0, count);
118      return data;
119    }
120
121    public override void Close() {
122      _source.Close();
123    }
124
125    protected override void Dispose(bool disposing) {
126      if (!disposing) {
127        if (_source != null) {
128          _source.Close();
129          _source.Dispose();
130        }
131      }
132      base.Dispose(disposing);
133    }
134  }
135
136
137  public class VarBinarySource : IDisposable {
138    private SqlCommand _readCommand;
139    private SqlCommand _writeCommand;
140    private SqlConnection _connection;
141    private SqlTransaction _transaction;
142    private bool _ownedTransaction = false;
143    private bool _ownedConnection = false;
144    private readonly long _length;
145
146    public VarBinarySource(SqlConnection connection,
147      SqlTransaction transaction,
148      string table, string
149    dataColumn, string keyColumn, Guid key) {
150      _connection = connection;
151      _transaction = (SqlTransaction) ContextFactory.Context.Transaction;
152      /*
153      if (_connection.State == ConnectionState.Closed) {
154        _connection.Open();
155        _ownedConnection = true;
156
157        _transaction =
158          _connection.BeginTransaction(IsolationLevel.RepeatableRead) as SqlTransaction;
159        _ownedTransaction = true;
160      } else {
161        _ownedConnection = false;
162
163        if (transaction != null) {
164          _transaction = transaction;
165
166          _ownedTransaction = false;
167        } else {
168          _transaction =
169            _connection.BeginTransaction(IsolationLevel.ReadCommitted) as SqlTransaction;
170
171          _ownedTransaction = true;
172        }
173      }        */
174
175      _length = GetLength(connection, table, dataColumn, keyColumn, key);
176      _readCommand = CreateReadCommand(connection, table, dataColumn,
177      keyColumn, key);
178      _writeCommand = CreateWriteCommand(connection, table, dataColumn,
179      keyColumn, key);
180    }
181
182    public long Length {
183      get { return _length; }
184    }
185
186    private static SqlCommand CreateReadCommand(SqlConnection connection,
187    string table, string dataColumn, string keyColumn,
188    Guid key) {
189      SqlCommand readCommand = connection.CreateCommand();
190      readCommand.CommandText = string.Format(@"
191select substring({0}, @offset, @length)
192from {1}
193where {2} = @key", dataColumn, table, keyColumn);
194      readCommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
195      readCommand.Parameters.Add("@offset", SqlDbType.BigInt);
196      readCommand.Parameters.Add("@length", SqlDbType.BigInt);
197      return readCommand;
198    }
199
200    private static SqlCommand CreateWriteCommand(SqlConnection connection,
201    string table, string dataColumn, string keyColumn,
202    Guid key) {
203      SqlCommand writecommand = connection.CreateCommand();
204      writecommand.CommandText = string.Format(@"
205update {0}
206set {1}.write(@buffer, @offset, @length)
207where {2} = @key", table, dataColumn, keyColumn);
208      writecommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
209      writecommand.Parameters.Add("@offset", SqlDbType.BigInt);
210      writecommand.Parameters.Add("@length", SqlDbType.BigInt);
211      writecommand.Parameters.Add("@buffer", SqlDbType.VarBinary);
212      return writecommand;
213    }
214
215    private long GetLength(SqlConnection connection,
216      string table,
217    string dataColumn, string keyColumn,
218    Guid key) {
219      using (SqlCommand command = connection.CreateCommand()) {
220        command.Transaction = _transaction;
221
222        SqlParameter length = command.Parameters.Add("@length",
223        SqlDbType.BigInt);
224        length.Direction = ParameterDirection.Output;
225
226        command.CommandText = string.Format(@"
227select @length = cast(datalength({0}) as bigint)
228from {1}
229where {2} = @key", dataColumn, table, keyColumn);
230        command.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
231        command.ExecuteNonQuery();
232        return length.Value == DBNull.Value ? 0 : (long)length.Value;
233      }
234    }
235
236    public byte[] Read(long offset, long length) {
237      _readCommand.Transaction = _transaction;
238
239      // substring is 1-based.
240      _readCommand.Parameters["@offset"].Value = offset + 1;
241      _readCommand.Parameters["@length"].Value = length;
242      return (byte[])_readCommand.ExecuteScalar();
243    }
244
245    public void Write(byte[] buffer, long offset, long length) {
246      _writeCommand.Transaction = _transaction;
247
248      _writeCommand.Parameters["@buffer"].Value = buffer;
249      _writeCommand.Parameters["@offset"].Value = offset;
250      _writeCommand.Parameters["@length"].Value = length;
251      _writeCommand.ExecuteNonQuery();
252    }
253
254    public void Close() {
255      if (_transaction != null) {
256        _transaction.Commit();
257        _transaction = null;
258      }
259
260      if (_connection != null) {
261        _connection.Close();
262        ContextFactory.Context.Dispose();
263        ContextFactory.Context = null;
264      }
265    }
266
267    public void Dispose() {
268      if (_readCommand != null) {
269        _readCommand.Dispose();
270      }
271      if (_writeCommand != null) {
272        _writeCommand.Dispose();
273      }
274    }
275  }
276}
277
Note: See TracBrowser for help on using the repository browser.