Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.LINQDataAccess/3.3/VarBinaryStream.cs @ 5134

Last change on this file since 5134 was 4710, checked in by cneumuel, 14 years ago

#1254

  • enabled full IIS7 compatibility
  • removed dependency to HeuristicLab.Tracing
  • changed server-side logging to wcf-tracing (needs to be enhanced to trace job-lifecycle)
File size: 8.8 KB
Line 
1/* Based on:
2 *   http://www.eggheadcafe.com/conversation.aspx?messageid=29988910&threadid=29988841
3 * */
4
5using System;
6using System.Data;
7using System.Data.SqlClient;
8using System.IO;
9using HeuristicLab.Hive.Tracing;
10
11namespace HeuristicLab.Hive.Server.LINQDataAccess {
12  /// <summary>
13  /// The VarBinaryStream class inherits from Stream. It uses a
14  /// VarBinarySource class to execute the actual TSQL.
15  /// </summary>
16  public class VarBinaryStream : Stream, IDisposable {
17    private long _position;
18    private readonly VarBinarySource _source;
19
20    public VarBinaryStream(VarBinarySource source) {
21      _position = 0;
22      _source = source;
23    }
24
25    public override bool CanRead {
26      get { return true; }
27    }
28
29    public override bool CanSeek {
30      get { return true; }
31    }
32
33    public override bool CanWrite {
34      get { return true; }
35    }
36
37    public override long Length {
38      get { return _source.Length; }
39    }
40
41    public override long Position {
42      get { return _position; }
43      set { this.Seek(value, SeekOrigin.Begin); }
44    }
45
46    public override void Flush() { }
47
48    public override long Seek(long offset, SeekOrigin origin) {
49      switch (origin) {
50        case SeekOrigin.Begin: {
51            if ((offset < 0) && (offset > this.Length))
52              throw new ArgumentException("Invalid seek origin.");
53            _position = offset;
54            break;
55          }
56        case SeekOrigin.End: {
57            if ((offset > 0) && (offset < -this.Length))
58              throw new ArgumentException("Invalid seek origin.");
59            _position = this.Length - offset;
60            break;
61          }
62        case SeekOrigin.Current: {
63            if ((_position + offset > this.Length))
64              throw new ArgumentException("Invalid seek origin.");
65            _position = _position + offset;
66            break;
67          }
68        default: {
69            throw new ArgumentOutOfRangeException("origin");
70          }
71      }
72      return _position;
73    }
74
75    public override void SetLength(long value) {
76      throw new NotSupportedException();
77    }
78
79    public override int Read(byte[] buffer, int offset, int count) {
80      if (buffer == null)
81        throw new ArgumentNullException("buffer");
82      if (offset < 0)
83        throw new ArgumentOutOfRangeException("offset");
84      if (count < 0)
85        throw new ArgumentOutOfRangeException("count");
86      if (buffer.Length - offset < count)
87        throw new ArgumentException("Offset and length were out of bounds for the array");
88
89      byte[] data = _source.Read(Position, count);
90      if (data == null)
91        return 0;
92
93      Buffer.BlockCopy(data, 0, buffer, offset, data.Length);
94      _position += data.Length;
95      return data.Length;
96    }
97
98    public override void Write(byte[] buffer, int offset, int count) {
99      if (buffer == null)
100        throw new ArgumentNullException("buffer");
101      if (offset < 0)
102        throw new ArgumentOutOfRangeException("offset");
103      if (count < 0)
104        throw new ArgumentOutOfRangeException("count");
105      if (buffer.Length - offset < count)
106        throw new ArgumentException("Offset and length were out of bounds for the array");
107
108      byte[] data = GetWriteBuffer(buffer, count, offset);
109      _source.Write(data, _position, count);
110      _position += count;
111    }
112
113    private static byte[] GetWriteBuffer(byte[] buffer, int count, int
114    offset) {
115      if (buffer.Length == count)
116        return buffer;
117      byte[] data = new byte[count];
118      Buffer.BlockCopy(buffer, offset, data, 0, count);
119      return data;
120    }
121
122    public override void Close() {
123      _source.Close();
124    }
125
126    protected override void Dispose(bool disposing) {
127      if (!disposing) {
128        if (_source != null) {
129          _source.Close();
130          _source.Dispose();
131        }
132      }
133      base.Dispose(disposing);
134    }
135  }
136
137
138  public class VarBinarySource : IDisposable {
139    private SqlCommand _readCommand;
140    private SqlCommand _writeCommand;
141    private SqlConnection _connection;
142    private SqlTransaction _transaction;
143    private bool _ownedTransaction = false;
144    private bool _ownedConnection = false;
145    private readonly long _length;
146
147    public VarBinarySource(SqlConnection connection,
148      SqlTransaction transaction,
149      string table, string
150    dataColumn, string keyColumn, Guid key) {
151      _connection = connection;
152      _transaction = (SqlTransaction) ContextFactory.Instance.CurrentContext.Transaction;
153      /*
154      if (_connection.State == ConnectionState.Closed) {
155        _connection.Open();
156        _ownedConnection = true;
157
158        _transaction =
159          _connection.BeginTransaction(IsolationLevel.RepeatableRead) as SqlTransaction;
160        _ownedTransaction = true;
161      } else {
162        _ownedConnection = false;
163
164        if (transaction != null) {
165          _transaction = transaction;
166
167          _ownedTransaction = false;
168        } else {
169          _transaction =
170            _connection.BeginTransaction(IsolationLevel.ReadCommitted) as SqlTransaction;
171
172          _ownedTransaction = true;
173        }
174      }        */
175
176      _length = GetLength(connection, table, dataColumn, keyColumn, key);
177      _readCommand = CreateReadCommand(connection, table, dataColumn,
178      keyColumn, key);
179      _writeCommand = CreateWriteCommand(connection, table, dataColumn,
180      keyColumn, key);
181    }
182
183    public long Length {
184      get { return _length; }
185    }
186
187    private static SqlCommand CreateReadCommand(SqlConnection connection,
188    string table, string dataColumn, string keyColumn,
189    Guid key) {
190      SqlCommand readCommand = connection.CreateCommand();
191      readCommand.CommandText = string.Format(@"
192select substring({0}, @offset, @length)
193from {1}
194where {2} = @key", dataColumn, table, keyColumn);
195      readCommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
196      readCommand.Parameters.Add("@offset", SqlDbType.BigInt);
197      readCommand.Parameters.Add("@length", SqlDbType.BigInt);
198      return readCommand;
199    }
200
201    private static SqlCommand CreateWriteCommand(SqlConnection connection,
202    string table, string dataColumn, string keyColumn,
203    Guid key) {
204      SqlCommand writecommand = connection.CreateCommand();
205      writecommand.CommandText = string.Format(@"
206update {0}
207set {1}.write(@buffer, @offset, @length)
208where {2} = @key", table, dataColumn, keyColumn);
209      writecommand.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
210      writecommand.Parameters.Add("@offset", SqlDbType.BigInt);
211      writecommand.Parameters.Add("@length", SqlDbType.BigInt);
212      writecommand.Parameters.Add("@buffer", SqlDbType.VarBinary);
213      return writecommand;
214    }
215
216    private long GetLength(SqlConnection connection,
217      string table,
218    string dataColumn, string keyColumn,
219    Guid key) {
220      using (SqlCommand command = connection.CreateCommand()) {
221        command.Transaction = _transaction;
222
223        SqlParameter length = command.Parameters.Add("@length",
224        SqlDbType.BigInt);
225        length.Direction = ParameterDirection.Output;
226
227        command.CommandText = string.Format(@"
228select @length = cast(datalength({0}) as bigint)
229from {1}
230where {2} = @key", dataColumn, table, keyColumn);
231        command.Parameters.Add("@key", SqlDbType.UniqueIdentifier).Value = key;
232        command.ExecuteNonQuery();
233        return length.Value == DBNull.Value ? 0 : (long)length.Value;
234      }
235    }
236
237    public byte[] Read(long offset, long length) {
238      _readCommand.Transaction = _transaction;
239
240      // substring is 1-based.
241      _readCommand.Parameters["@offset"].Value = offset + 1;
242      _readCommand.Parameters["@length"].Value = length;
243      return (byte[])_readCommand.ExecuteScalar();
244    }
245
246    public void Write(byte[] buffer, long offset, long length) {
247      _writeCommand.Transaction = _transaction;
248
249      _writeCommand.Parameters["@buffer"].Value = buffer;
250      _writeCommand.Parameters["@offset"].Value = offset;
251      _writeCommand.Parameters["@length"].Value = length;
252      _writeCommand.ExecuteNonQuery();
253    }
254
255    public void Close() {
256      Logger.Info("Closing Stream");
257      if (_transaction != null) {
258        _transaction.Commit();
259        _transaction = null;
260        Logger.Info("Transaction commited and closed");
261      }
262
263      if (_connection != null) {
264        _connection.Close();
265        ContextFactory.Instance.CurrentContext.Dispose();
266        ContextFactory.Instance.RemoveContext();
267        Logger.Info("Connection and Context disposed and set null");
268      }
269    }
270
271    public void Dispose() {
272      if (_readCommand != null) {
273        _readCommand.Dispose();
274      }
275      if (_writeCommand != null) {
276        _writeCommand.Dispose();
277      }
278    }
279  }
280}
281
Note: See TracBrowser for help on using the repository browser.