Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/26/09 12:18:32 (15 years ago)
Author:
svonolfe
Message:

Streaming of Jobs and JobsResults directly from/to the DB (#680)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Hive.Client.Communication/3.2/WcfService.cs

    r2107 r2117  
    194194    void proxy_SendStreamedJobCompleted(object sender, SendStreamedJobCompletedEventArgs e) {
    195195      if (e.Error == null) {
    196         Stream stream =
    197           (Stream)e.Result;
    198                
    199         BinaryFormatter formatter =
    200           new BinaryFormatter();
    201         ResponseJob response = (ResponseJob)formatter.Deserialize(stream);
    202 
    203         SendJobCompletedEventArgs completedEventArgs =
    204           new SendJobCompletedEventArgs(new object[] { response }, e.Error, e.Cancelled, e.UserState);
    205         SendJobCompleted(sender, completedEventArgs);
     196        Stream stream = null;
     197
     198        try {
     199          stream = (Stream)e.Result;
     200
     201          //first deserialize the response
     202          BinaryFormatter formatter =
     203            new BinaryFormatter();
     204          ResponseJob response =
     205            (ResponseJob)formatter.Deserialize(stream);
     206
     207          //second deserialize the BLOB
     208          MemoryStream memStream = new MemoryStream();
     209          byte[] buffer = new byte[3024];
     210          int read = 0;
     211          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
     212            memStream.Write(buffer, 0, read);
     213          }
     214
     215          SendJobCompletedEventArgs completedEventArgs =
     216            new SendJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, e.Error, e.Cancelled, e.UserState);
     217          SendJobCompleted(sender, completedEventArgs);
     218        }
     219        finally {
     220          if(stream != null)
     221            stream.Dispose();
     222        }
    206223      } else
    207224        HandleNetworkError(e.Error);
     
    216233    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> StoreFinishedJobResultCompleted;
    217234    public void StoreFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
    218       if (ConnState == NetworkEnum.WcfConnState.Loggedin)
    219         proxy.StoreFinishedJobResultStreamedAsync(
    220           GetStreamedJobResult(clientId, jobId, result, percentage, exception));
    221     }
     235      if (ConnState == NetworkEnum.WcfConnState.Loggedin) {
     236        Stream stream =
     237          GetStreamedJobResult(clientId, jobId, result, percentage, exception);
     238
     239        proxy.StoreFinishedJobResultStreamedAsync(stream, stream);
     240      }
     241     }
    222242    private void proxy_StoreFinishedJobResultStreamedCompleted(object sender, StoreFinishedJobResultStreamedCompletedEventArgs e) {
     243      Stream stream =
     244        (Stream)e.UserState;
     245      if (stream != null)
     246        stream.Dispose();
     247     
    223248      if (e.Error == null) {
    224249        StoreFinishedJobResultCompletedEventArgs args =
     
    235260    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
    236261    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
    237       if(ConnState == NetworkEnum.WcfConnState.Loggedin)
    238         proxy.ProcessSnapshotStreamedAsync(
    239           GetStreamedJobResult(
    240             clientId, jobId, result, percentage, exception));
     262      if (ConnState == NetworkEnum.WcfConnState.Loggedin) {
     263        Stream stream = GetStreamedJobResult(
     264            clientId, jobId, result, percentage, exception);
     265
     266        proxy.ProcessSnapshotStreamedAsync(stream, stream);
     267      }
    241268    }
    242269    void proxy_ProcessSnapshotStreamedCompleted(object sender, ProcessSnapshotStreamedCompletedEventArgs e) {
     270      Stream stream =
     271        (Stream)e.UserState;
     272      if (stream != null)
     273        stream.Dispose();
     274     
    243275      if (e.Error == null) {
    244276        ProcessSnapshotCompletedEventArgs args =
     
    283315    /// </summary>
    284316    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
    285       SerializedJobResult serializedJobResult =
    286           new SerializedJobResult();
    287       JobResult jobResult = new JobResult();
     317      JobResult jobResult =
     318          new JobResult();
    288319      jobResult.ClientId = clientId;
    289320      jobResult.JobId = jobId;
     
    291322      jobResult.Exception = exception;
    292323
    293       serializedJobResult.JobResult = jobResult;
    294       serializedJobResult.SerializedJobResultData = result;
    295 
    296       MemoryStream stream =
    297         new MemoryStream();
    298 
    299       BinaryFormatter formatter =
    300         new BinaryFormatter();
    301 
    302       formatter.Serialize(stream, serializedJobResult);
    303       stream.Seek(0, SeekOrigin.Begin);
     324      MultiStream stream =
     325              new MultiStream();
     326
     327      //first send result
     328      stream.AddStream(
     329        new StreamedObject<JobResult>(jobResult));
     330
     331      //second stream the job binary data
     332      MemoryStream memStream =
     333        new MemoryStream(result, false);
     334      stream.AddStream(memStream);
    304335
    305336      return stream;
Note: See TracChangeset for help on using the changeset viewer.