Free cookie consent management tool by TermsFeed Policy Generator

Changeset 2117 for trunk


Ignore:
Timestamp:
06/26/09 12:18:32 (15 years ago)
Author:
svonolfe
Message:

Streaming of Jobs and JobsResults directly from/to the DB (#680)

Location:
trunk/sources
Files:
4 added
24 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DataAccess.ADOHelper/3.2/HeuristicLab.DataAccess.ADOHelper-3.2.csproj

    r2061 r2117  
    8080  </ItemGroup>
    8181  <ItemGroup>
     82    <Compile Include="VarBinaryStream.cs" />
    8283    <Compile Include="Properties\AssemblyInfo.cs" />
    8384    <Compile Include="DataAccessADOHelperPlugin.cs" />
  • trunk/sources/HeuristicLab.DataAccess.ADOHelper/3.2/Session.cs

    r2096 r2117  
    4242    private Thread ownerThread;
    4343
    44     private int counter;
     44    private int usageCounter = 0;
    4545
    4646    private IDictionary<Guid, object> adapters =
     
    5050      this.factory = factory;
    5151      this.ownerThread = Thread.CurrentThread;
    52       this.counter = 0;
     52      this.usageCounter = 0;
    5353    }
    5454
     
    7474
    7575    public void IncrementUsageCounter() {
    76       this.counter++;
     76      this.usageCounter++;
    7777    }
    7878
    7979    #region ISession Members
     80    public ISessionFactory Factory {
     81      get {
     82        return this.factory;
     83      }
     84    }
     85
    8086    public ITransaction BeginTransaction() {
    8187      CheckThread();
     
    8591         transaction.Connection = Connection;
    8692      }
     93
     94      transaction.IncrementUsageCounter();
    8795
    8896      return transaction;
     
    134142
    135143    public void EndSession() {
    136       this.counter--;
     144      this.usageCounter--;
    137145
    138       if (counter <= 0) {
     146      if (usageCounter <= 0) {
    139147        CheckThread();
    140148
  • trunk/sources/HeuristicLab.DataAccess.ADOHelper/3.2/TableAdapterWrapperBase.cs

    r1720 r2117  
    3636      new AdapterT();
    3737
    38     private ISession session;
     38    private Session session;
    3939   
    4040    #region IDataAdapterWrapper<AdapterT,ObjT,RowT> Members
     
    5757
    5858    public Session Session {
     59      protected get {
     60        return this.session;
     61      }
     62
    5963      set {
    6064        this.session = value;
  • trunk/sources/HeuristicLab.DataAccess.ADOHelper/3.2/Transaction.cs

    r1529 r2117  
    3434    private Session session;
    3535
     36    private int usageCounter = 0;
     37
    3638    public Transaction(Session session) {
    3739      this.session = session;
     40    }
     41
     42    public void IncrementUsageCounter() {
     43      this.usageCounter++;
    3844    }
    3945
     
    5662      this.session.CheckThread();
    5763
    58       if (transaction != null) {
     64      usageCounter--;
     65
     66      if (transaction != null && usageCounter <= 0) {
    5967        DbConnection conn =
    6068          transaction.Connection;
     
    7381    public void Rollback() {
    7482      this.session.CheckThread();
     83
     84      usageCounter = 0;
    7585
    7686      if (transaction != null) {
  • trunk/sources/HeuristicLab.DataAccess/3.2/Interfaces/ISession.cs

    r1493 r2117  
    2626namespace HeuristicLab.DataAccess.Interfaces {
    2727  public interface ISession {
     28    ISessionFactory Factory { get; }
     29   
    2830    ITransaction BeginTransaction();
    2931
  • trunk/sources/HeuristicLab.Hive.Client.Communication/3.2/Service References/ServerService/Reference.cs

    r1939 r2117  
    184184            }
    185185        }
     186
     187        public byte[] Data {
     188          get {
     189            base.RaiseExceptionIfNecessary();
     190            return ((byte[])(this.results[1]));
     191          }
     192        }
    186193    }
    187194   
  • trunk/sources/HeuristicLab.Hive.Client.Communication/3.2/WcfService.cs

    r2107 r2117  
    194194    void proxy_SendStreamedJobCompleted(object sender, SendStreamedJobCompletedEventArgs e) {
    195195      if (e.Error == null) {
    196         Stream stream =
    197           (Stream)e.Result;
    198                
    199         BinaryFormatter formatter =
    200           new BinaryFormatter();
    201         ResponseJob response = (ResponseJob)formatter.Deserialize(stream);
    202 
    203         SendJobCompletedEventArgs completedEventArgs =
    204           new SendJobCompletedEventArgs(new object[] { response }, e.Error, e.Cancelled, e.UserState);
    205         SendJobCompleted(sender, completedEventArgs);
     196        Stream stream = null;
     197
     198        try {
     199          stream = (Stream)e.Result;
     200
     201          //first deserialize the response
     202          BinaryFormatter formatter =
     203            new BinaryFormatter();
     204          ResponseJob response =
     205            (ResponseJob)formatter.Deserialize(stream);
     206
     207          //second deserialize the BLOB
     208          MemoryStream memStream = new MemoryStream();
     209          byte[] buffer = new byte[3024];
     210          int read = 0;
     211          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
     212            memStream.Write(buffer, 0, read);
     213          }
     214
     215          SendJobCompletedEventArgs completedEventArgs =
     216            new SendJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, e.Error, e.Cancelled, e.UserState);
     217          SendJobCompleted(sender, completedEventArgs);
     218        }
     219        finally {
     220          if(stream != null)
     221            stream.Dispose();
     222        }
    206223      } else
    207224        HandleNetworkError(e.Error);
     
    216233    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> StoreFinishedJobResultCompleted;
    217234    public void StoreFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
    218       if (ConnState == NetworkEnum.WcfConnState.Loggedin)
    219         proxy.StoreFinishedJobResultStreamedAsync(
    220           GetStreamedJobResult(clientId, jobId, result, percentage, exception));
    221     }
     235      if (ConnState == NetworkEnum.WcfConnState.Loggedin) {
     236        Stream stream =
     237          GetStreamedJobResult(clientId, jobId, result, percentage, exception);
     238
     239        proxy.StoreFinishedJobResultStreamedAsync(stream, stream);
     240      }
     241     }
    222242    private void proxy_StoreFinishedJobResultStreamedCompleted(object sender, StoreFinishedJobResultStreamedCompletedEventArgs e) {
     243      Stream stream =
     244        (Stream)e.UserState;
     245      if (stream != null)
     246        stream.Dispose();
     247     
    223248      if (e.Error == null) {
    224249        StoreFinishedJobResultCompletedEventArgs args =
     
    235260    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
    236261    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
    237       if(ConnState == NetworkEnum.WcfConnState.Loggedin)
    238         proxy.ProcessSnapshotStreamedAsync(
    239           GetStreamedJobResult(
    240             clientId, jobId, result, percentage, exception));
     262      if (ConnState == NetworkEnum.WcfConnState.Loggedin) {
     263        Stream stream = GetStreamedJobResult(
     264            clientId, jobId, result, percentage, exception);
     265
     266        proxy.ProcessSnapshotStreamedAsync(stream, stream);
     267      }
    241268    }
    242269    void proxy_ProcessSnapshotStreamedCompleted(object sender, ProcessSnapshotStreamedCompletedEventArgs e) {
     270      Stream stream =
     271        (Stream)e.UserState;
     272      if (stream != null)
     273        stream.Dispose();
     274     
    243275      if (e.Error == null) {
    244276        ProcessSnapshotCompletedEventArgs args =
     
    283315    /// </summary>
    284316    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
    285       SerializedJobResult serializedJobResult =
    286           new SerializedJobResult();
    287       JobResult jobResult = new JobResult();
     317      JobResult jobResult =
     318          new JobResult();
    288319      jobResult.ClientId = clientId;
    289320      jobResult.JobId = jobId;
     
    291322      jobResult.Exception = exception;
    292323
    293       serializedJobResult.JobResult = jobResult;
    294       serializedJobResult.SerializedJobResultData = result;
    295 
    296       MemoryStream stream =
    297         new MemoryStream();
    298 
    299       BinaryFormatter formatter =
    300         new BinaryFormatter();
    301 
    302       formatter.Serialize(stream, serializedJobResult);
    303       stream.Seek(0, SeekOrigin.Begin);
     324      MultiStream stream =
     325              new MultiStream();
     326
     327      //first send result
     328      stream.AddStream(
     329        new StreamedObject<JobResult>(jobResult));
     330
     331      //second stream the job binary data
     332      MemoryStream memStream =
     333        new MemoryStream(result, false);
     334      stream.AddStream(memStream);
    304335
    305336      return stream;
  • trunk/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs

    r2092 r2117  
    297297        bool sandboxed = false;
    298298        List<byte[]> files = new List<byte[]>();
    299         foreach (CachedHivePluginInfo plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.JobInfo.PluginsNeeded))
     299        foreach (CachedHivePluginInfo plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
    300300          files.AddRange(plugininfo.PluginFiles);
    301301
    302         AppDomain appDomain = PluginManager.Manager.CreateAndInitAppDomainWithSandbox(e.Result.Job.JobInfo.Id.ToString(), sandboxed, null, files);
     302        AppDomain appDomain = PluginManager.Manager.CreateAndInitAppDomainWithSandbox(e.Result.Job.Id.ToString(), sandboxed, null, files);
    303303        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
    304304        lock (engines) {
    305           if (!jobs.ContainsKey(e.Result.Job.JobInfo.Id)) {
    306             jobs.Add(e.Result.Job.JobInfo.Id, e.Result.Job.JobInfo);
    307             appDomains.Add(e.Result.Job.JobInfo.Id, appDomain);
     305          if (!jobs.ContainsKey(e.Result.Job.Id)) {
     306            jobs.Add(e.Result.Job.Id, e.Result.Job);
     307            appDomains.Add(e.Result.Job.Id, appDomain);
    308308
    309309            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
    310             engine.JobId = e.Result.Job.JobInfo.Id;
     310            engine.JobId = e.Result.Job.Id;
    311311            engine.Queue = MessageQueue.GetInstance();           
    312             engine.Start(e.Result.Job.SerializedJobData);
    313             engines.Add(e.Result.Job.JobInfo.Id, engine);
     312            engine.Start(e.Data);
     313            engines.Add(e.Result.Job.Id, engine);
    314314
    315315            ClientStatusInfo.JobsFetched++;
  • trunk/sources/HeuristicLab.Hive.Contracts/3.2/HeuristicLab.Hive.Contracts-3.2.csproj

    r2099 r2117  
    100100    <Compile Include="BusinessObjects\Client.cs" />
    101101    <Compile Include="BusinessObjects\ClientConfig.cs" />
     102    <Compile Include="ResponseSerializedJob.cs" />
     103    <Compile Include="MultiStream.cs" />
    102104    <Compile Include="Interfaces\IClientFacade.cs" />
    103105    <Compile Include="Interfaces\IExecutionEngineFacade.cs" />
  • trunk/sources/HeuristicLab.Hive.Contracts/3.2/Interfaces/IClientCommunicator.cs

    r1593 r2117  
    4141    ResponseJob SendJob(Guid clientId);
    4242    [OperationContract]
     43    ResponseSerializedJob SendSerializedJob(Guid clientId);
     44    [OperationContract]
    4345    ResponseResultReceived StoreFinishedJobResult(Guid clientId,
    4446      Guid jobId,
  • trunk/sources/HeuristicLab.Hive.Contracts/3.2/ResponseJob.cs

    r2092 r2117  
    3737  public class ResponseJob : Response {
    3838    [DataMember]
    39     public SerializedJob Job { get; set; }
     39    public Job Job { get; set; }
    4040  }
    4141}
  • trunk/sources/HeuristicLab.Hive.Contracts/3.2/StreamedObject.cs

    r1939 r2117  
    5656      }
    5757      set {
    58         throw new NotImplementedException();
     58        if(this.stream.Position != value)
     59          throw new NotImplementedException();
    5960      }
    6061    }
  • trunk/sources/HeuristicLab.Hive.Server.ADODataAccess/3.2/JobAdapter.cs

    r2099 r2117  
    3232using System.Data.SqlClient;
    3333using HeuristicLab.Hive.Server.ADODataAccess.TableAdapterWrapper;
     34using System.IO;
    3435
    3536namespace HeuristicLab.Hive.Server.ADODataAccess {
     
    436437          }
    437438        });
     439    }
     440
     441    public Stream GetSerializedJobStream(Guid jobId, bool useExistingConnection) {
     442      return
     443        ((JobAdapterWrapper)base.DataAdapterWrapper).
     444          GetSerializedJobStream(jobId, useExistingConnection);
    438445    }
    439446
  • trunk/sources/HeuristicLab.Hive.Server.ADODataAccess/3.2/JobResultsAdapter.cs

    r2099 r2117  
    1010using System.Data.SqlClient;
    1111using HeuristicLab.Hive.Server.ADODataAccess.TableAdapterWrapper;
     12using System.IO;
    1213
    1314namespace HeuristicLab.Hive.Server.ADODataAccess {
     
    152153    }
    153154
     155    public Stream GetSerializedJobResultStream(Guid jobResultId, bool useExistingConnection) {
     156      return
     157        ((JobResultsAdapterWrapper)
     158        base.DataAdapterWrapper).
     159          GetSerializedJobResultStream(jobResultId, useExistingConnection);
     160    }
     161
    154162    public void UpdateSerializedJobResult(SerializedJobResult jobResult) {
    155163      if (jobResult != null &&
  • trunk/sources/HeuristicLab.Hive.Server.ADODataAccess/3.2/TableAdapterWrapper/JobAdapterWrapper.cs

    r2083 r2117  
    77using HeuristicLab.Hive.Contracts.BusinessObjects;
    88using System.Data.Common;
     9using System.IO;
     10using System.Data.SqlTypes;
     11using System.Data;
    912
    1013namespace HeuristicLab.Hive.Server.ADODataAccess.TableAdapterWrapper {
     
    3942    }
    4043
    41     public byte[] GetSerializedJob(Guid jobId) {
    42       return TransactionalAdapter.GetSerializedJobById(jobId);
    43     }
     44    public Stream GetSerializedJobStream(Guid jobId,
     45      bool useExistingConnection) {
     46      SqlConnection connection = null;
     47      SqlTransaction transaction = null;
    4448
    45     public bool UpdateSerialiedJob(byte[] serializedJob, Guid jobId) {
    46       return TransactionalAdapter.UpdateSerializedJob(serializedJob, jobId) > 0;
     49      if (useExistingConnection) {
     50        connection =
     51          base.Session.Connection as SqlConnection;
     52
     53        transaction =
     54          adapter.Transaction;
     55      } else {
     56        connection =
     57         ((SessionFactory)
     58           (base.Session.Factory)).CreateConnection()
     59           as SqlConnection;
     60      }
     61
     62      VarBinarySource source =
     63        new VarBinarySource(
     64          connection, transaction,
     65          "Job", "SerializedJob", "JobId", jobId);
     66
     67      return new VarBinaryStream(source);
    4768    }
    4869
  • trunk/sources/HeuristicLab.Hive.Server.ADODataAccess/3.2/TableAdapterWrapper/JobResultsAdapterWrapper.cs

    r2099 r2117  
    77using HeuristicLab.Hive.Contracts.BusinessObjects;
    88using System.Data.Common;
     9using System.IO;
    910
    1011namespace HeuristicLab.Hive.Server.ADODataAccess.TableAdapterWrapper {
     
    3637    }
    3738
    38     public byte[] GetSerializedJobResult(Guid jobResultId) {
    39       return TransactionalAdapter.GetSerializedJobResultById(jobResultId);
    40     }
     39    public Stream GetSerializedJobResultStream(Guid jobResultId,
     40      bool useExistingConnection) {
     41      SqlConnection connection = null;
     42      SqlTransaction transaction = null;
    4143
    42     public bool UpdateSerialiedJobResult(byte[] serializedJobResult, Guid jobResultId) {
    43       return TransactionalAdapter.UpdateSerializedJobResultById(serializedJobResult, jobResultId) > 0;
     44      if (useExistingConnection) {
     45        connection =
     46          base.Session.Connection as SqlConnection;
     47
     48        transaction =
     49          adapter.Transaction;
     50      } else {
     51        connection =
     52         ((SessionFactory)
     53           (base.Session.Factory)).CreateConnection()
     54           as SqlConnection;
     55      }
     56
     57      VarBinarySource source =
     58        new VarBinarySource(
     59          connection, transaction,
     60          "JobResult", "JobResult", "JobResultId", jobResultId);
     61
     62      return new VarBinaryStream(source);
    4463    }
    4564
  • trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs

    r2099 r2117  
    4242  /// The ClientCommunicator manages the whole communication with the client
    4343  /// </summary>
    44   public class ClientCommunicator: IClientCommunicator {
     44  public class ClientCommunicator: IClientCommunicator,
     45    IInternalClientCommunicator{
    4546    private static Dictionary<Guid, DateTime> lastHeartbeats =
    4647      new Dictionary<Guid,DateTime>();
     
    370371    /// <param name="clientId"></param>
    371372    /// <returns></returns>
    372     public ResponseJob SendJob(Guid clientId) {
     373    public ResponseSerializedJob SendSerializedJob(Guid clientId) {
    373374      ISession session = factory.GetSessionForCurrentThread();
    374375      ITransaction tx = null;
     
    380381        tx = session.BeginTransaction();
    381382
    382         ResponseJob response = new ResponseJob();
     383        ResponseSerializedJob response = new ResponseSerializedJob();
    383384
    384385        Job job2Calculate = scheduler.GetNextJobForClient(clientId);
     
    415416    }
    416417
     418    /// <summary>
     419    /// if the client was told to pull a job he calls this method
     420    /// the server selects a job and sends it to the client
     421    /// </summary>
     422    /// <param name="clientId"></param>
     423    /// <returns></returns>
     424    public ResponseJob SendJob(Guid clientId) {
     425      ISession session = factory.GetSessionForCurrentThread();
     426      ITransaction tx = null;
     427
     428      try {
     429        IJobAdapter jobAdapter =
     430          session.GetDataAdapter<Job, IJobAdapter>();
     431
     432        tx = session.BeginTransaction();
     433
     434        ResponseJob response = new ResponseJob();
     435
     436        Job job2Calculate = scheduler.GetNextJobForClient(clientId);
     437        if (job2Calculate != null) {
     438          response.Job = job2Calculate;
     439          response.Success = true;
     440          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
     441          lock (newAssignedJobs) {
     442            if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
     443              newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
     444          }
     445        } else {
     446          response.Success = false;
     447          response.Job = null;
     448          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
     449        }
     450
     451        tx.Commit();
     452
     453        return response;
     454      }
     455      catch (Exception ex) {
     456        if (tx != null)
     457          tx.Rollback();
     458        throw ex;
     459      }
     460      finally {
     461        if (session != null)
     462          session.EndSession();
     463      }
     464    }
     465
     466    public ResponseResultReceived ProcessJobResult(
     467      JobResult result,
     468      Stream stream,
     469      bool finished) {
     470      ISession session = factory.GetSessionForCurrentThread();
     471      ITransaction tx = null;
     472      Stream jobResultStream = null;
     473      Stream jobStream = null;
     474
     475      try {
     476        tx = session.BeginTransaction();
     477
     478        ResponseResultReceived response =
     479          ProcessJobResult(
     480          result.ClientId,
     481          result.JobId,
     482          new byte[] {},
     483          result.Percentage,
     484          result.Exception,
     485          finished);
     486
     487        //second deserialize the BLOB
     488        IJobResultsAdapter jobResultsAdapter =
     489          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
     490
     491        IJobAdapter jobAdapter =
     492          session.GetDataAdapter<Job, IJobAdapter>();
     493
     494        jobResultStream =
     495          jobResultsAdapter.GetSerializedJobResultStream(result.Id, true);
     496
     497        jobStream =
     498          jobAdapter.GetSerializedJobStream(result.JobId, true);
     499
     500        byte[] buffer = new byte[3024];
     501        int read = 0;
     502        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
     503          jobResultStream.Write(buffer, 0, read);
     504
     505          if (finished)
     506            jobStream.Write(buffer, 0, read);
     507        }
     508
     509        jobStream.Close();
     510
     511        tx.Commit();
     512
     513        return response;
     514      }
     515      catch (Exception ex) {
     516        if (tx != null)
     517          tx.Rollback();
     518        throw ex;
     519      }
     520      finally {
     521        if (jobStream != null)
     522          jobStream.Dispose();
     523
     524        if (jobResultStream != null)
     525          jobResultStream.Dispose();
     526
     527        if (session != null)
     528          session.EndSession();
     529      }
     530    }
     531
    417532    private ResponseResultReceived ProcessJobResult(Guid clientId,
    418533      Guid jobId,
     
    484599          return response;
    485600        }
    486         job.SerializedJobData = result;
    487601        job.JobInfo.Percentage = percentage;
    488602
     
    490604          job.JobInfo.State = State.finished;
    491605          job.SerializedJobData = result;
    492         }
    493 
    494         jobAdapter.UpdateSerializedJob(job);
     606          jobAdapter.UpdateSerializedJob(job);
     607        }
    495608
    496609        List<JobResult> jobResults = new List<JobResult>(
  • trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientFacade.cs

    r2099 r2117  
    3131using System.Runtime.Serialization.Formatters.Binary;
    3232using System.ServiceModel;
     33using HeuristicLab.Hive.Server.Core.InternalInterfaces;
    3334
    3435namespace HeuristicLab.Hive.Server.Core {
     
    5253    public ResponseJob SendJob(Guid clientId) {
    5354      return clientCommunicator.SendJob(clientId);
     55    }
     56
     57    public ResponseSerializedJob SendSerializedJob(Guid clientId) {
     58      return clientCommunicator.SendSerializedJob(clientId);
    5459    }
    5560
     
    8388
    8489    public Stream SendStreamedJob(Guid clientId) {
    85       return
    86         new StreamedObject<ResponseJob>(
    87           this.SendJob(clientId));
     90      MultiStream stream =
     91        new MultiStream();
     92
     93      ResponseJob job =
     94        this.SendJob(clientId);
     95
     96      //first send response
     97      stream.AddStream(
     98        new StreamedObject<ResponseJob>(job));
     99
     100      IJobManager jobManager =
     101        ServiceLocator.GetJobManager();
     102
     103      //second stream the job binary data
     104      stream.AddStream(
     105        ((IInternalJobManager)(jobManager)).
     106        GetJobStreamById(
     107          job.Job.Id));
     108
     109      OperationContext clientContext = OperationContext.Current;
     110        clientContext.OperationCompleted += new EventHandler(delegate(object sender, EventArgs args) {
     111          if (stream != null) {
     112            stream.Dispose();
     113          }
     114        });
     115
     116      return stream;
    88117    }
    89118
     
    97126      BinaryFormatter formatter =
    98127          new BinaryFormatter();
    99       SerializedJobResult result =
    100         (SerializedJobResult)formatter.Deserialize(stream);
    101128
    102       return this.StoreFinishedJobResult(
    103           result.JobResult.ClientId,
    104           result.JobResult.JobId,
    105           result.SerializedJobResultData,
    106           result.JobResult.Percentage,
    107           result.JobResult.Exception);
     129      JobResult result =
     130        (JobResult)formatter.Deserialize(stream);
     131
     132      return ((IInternalClientCommunicator)
     133        clientCommunicator).ProcessJobResult(
     134          result, stream, true);
    108135    }
    109136
     
    111138      BinaryFormatter formatter =
    112139          new BinaryFormatter();
    113       SerializedJobResult result = (SerializedJobResult)formatter.Deserialize(stream);
    114140
    115       return this.ProcessSnapshot(
    116           result.JobResult.ClientId,
    117           result.JobResult.JobId,
    118           result.SerializedJobResultData,
    119           result.JobResult.Percentage,
    120           result.JobResult.Exception);
     141      JobResult result =
     142        (JobResult)formatter.Deserialize(stream);
     143
     144      return ((IInternalClientCommunicator)
     145        clientCommunicator).ProcessJobResult(
     146          result, stream, false);
    121147    }
    122148
  • trunk/sources/HeuristicLab.Hive.Server.Core/3.2/DbTestApp.cs

    r1656 r2117  
    3131using System.Diagnostics;
    3232using HeuristicLab.DataAccess.Interfaces;
     33using System.IO;
    3334
    3435namespace HeuristicLab.Hive.Server {
     
    3738      AutoRestart = true)]
    3839  class HiveDbTestApplication : ApplicationBase {
    39   /*  private void TestClientAdapter() {
    40       IClientAdapter clientAdapter =
    41         ServiceLocator.GetClientAdapter();
    42 
    43       ClientInfo client = new ClientInfo();
    44       client.Login = DateTime.Now;
    45       clientAdapter.Update(client);
    46 
    47       ClientInfo clientRead =
    48         clientAdapter.GetById(client.Id);
    49       Debug.Assert(
    50         clientRead != null &&
    51         client.Id == clientRead.Id);
    52 
    53       client.CpuSpeedPerCore = 2000;
    54       clientAdapter.Update(client);
    55       clientRead =
    56         clientAdapter.GetById(client.Id);
    57       Debug.Assert(
    58        clientRead != null &&
    59        client.Id == clientRead.Id &&
    60        clientRead.CpuSpeedPerCore == 2000);
    61 
    62       ICollection<ClientInfo> clients =
    63         clientAdapter.GetAll();
    64       int count = clients.Count;
    65 
    66       clientAdapter.Delete(client);
    67 
    68       clients = clientAdapter.GetAll();
    69       Debug.Assert(count - 1 == clients.Count);
    70     } */
    71 
    72     private void TestClientGroupAdapter() {
    73       ISessionFactory factory =
    74         ServiceLocator.GetSessionFactory();
    75 
    76       ISession session =
    77         factory.GetSessionForCurrentThread();
    78 
    79       ITransaction trans = null;
    80 
    81       try {
    82         IClientGroupAdapter clientGroupAdapter =
    83         session.GetDataAdapter<ClientGroup, IClientGroupAdapter>();
    84 
    85         trans =
    86           session.BeginTransaction();
    87 
    88         ClientInfo client =
    89           new ClientInfo();
    90         client.Name = "Stefan";
     40    /*  private void TestClientAdapter() {
     41        IClientAdapter clientAdapter =
     42          ServiceLocator.GetClientAdapter();
     43
     44        ClientInfo client = new ClientInfo();
    9145        client.Login = DateTime.Now;
    92 
    93         ClientInfo client2 =
    94           new ClientInfo();
    95         client2.Name = "Martin";
    96         client2.Login = DateTime.Now;
    97 
    98         ClientInfo client3 =
    99           new ClientInfo();
    100         client3.Name = "Heinz";
    101         client3.Login = DateTime.Now;
    102 
    103         ClientGroup group =
    104           new ClientGroup();
    105 
    106         ClientGroup subGroup =
    107           new ClientGroup();
    108         subGroup.Resources.Add(client);
    109 
    110         group.Resources.Add(client3);
    111         group.Resources.Add(client2);
    112         group.Resources.Add(subGroup);
    113 
    114         clientGroupAdapter.Update(group);
    115 
    116         ClientGroup read =
    117           clientGroupAdapter.GetById(group.Id);
    118 
    119         ICollection<ClientGroup> clientGroups =
    120           clientGroupAdapter.GetAll();
     46        clientAdapter.Update(client);
     47
     48        ClientInfo clientRead =
     49          clientAdapter.GetById(client.Id);
     50        Debug.Assert(
     51          clientRead != null &&
     52          client.Id == clientRead.Id);
     53
     54        client.CpuSpeedPerCore = 2000;
     55        clientAdapter.Update(client);
     56        clientRead =
     57          clientAdapter.GetById(client.Id);
     58        Debug.Assert(
     59         clientRead != null &&
     60         client.Id == clientRead.Id &&
     61         clientRead.CpuSpeedPerCore == 2000);
     62
     63        ICollection<ClientInfo> clients =
     64          clientAdapter.GetAll();
     65        int count = clients.Count;
     66
     67        clientAdapter.Delete(client);
     68
     69        clients = clientAdapter.GetAll();
     70        Debug.Assert(count - 1 == clients.Count);
     71      }
     72
     73      private void TestClientGroupAdapter() {
     74        ISessionFactory factory =
     75          ServiceLocator.GetSessionFactory();
     76
     77        ISession session =
     78          factory.GetSessionForCurrentThread();
     79
     80        ITransaction trans = null;
     81
     82        try {
     83          IClientGroupAdapter clientGroupAdapter =
     84          session.GetDataAdapter<ClientGroup, IClientGroupAdapter>();
     85
     86          trans =
     87            session.BeginTransaction();
     88
     89          ClientInfo client =
     90            new ClientInfo();
     91          client.Name = "Stefan";
     92          client.Login = DateTime.Now;
     93
     94          ClientInfo client2 =
     95            new ClientInfo();
     96          client2.Name = "Martin";
     97          client2.Login = DateTime.Now;
     98
     99          ClientInfo client3 =
     100            new ClientInfo();
     101          client3.Name = "Heinz";
     102          client3.Login = DateTime.Now;
     103
     104          ClientGroup group =
     105            new ClientGroup();
     106
     107          ClientGroup subGroup =
     108            new ClientGroup();
     109          subGroup.Resources.Add(client);
     110
     111          group.Resources.Add(client3);
     112          group.Resources.Add(client2);
     113          group.Resources.Add(subGroup);
     114
     115          clientGroupAdapter.Update(group);
     116
     117          ClientGroup read =
     118            clientGroupAdapter.GetById(group.Id);
     119
     120          ICollection<ClientGroup> clientGroups =
     121            clientGroupAdapter.GetAll();
     122
     123          IClientAdapter clientAdapter =
     124            session.GetDataAdapter<ClientInfo, IClientAdapter>();
     125
     126          clientAdapter.Delete(client3);
     127
     128          read =
     129             clientGroupAdapter.GetById(group.Id);
     130
     131          clientGroupAdapter.Delete(subGroup);
     132
     133          read =
     134             clientGroupAdapter.GetById(group.Id);
     135
     136          clientGroups =
     137            clientGroupAdapter.GetAll();
     138
     139          clientGroupAdapter.Delete(group);
     140
     141          clientGroups =
     142            clientGroupAdapter.GetAll();
     143
     144          clientAdapter.Delete(client);
     145          clientAdapter.Delete(client2);
     146        }
     147        finally {
     148          if (trans != null)
     149            trans.Rollback();
     150
     151          session.EndSession();
     152        }
     153      }
     154
     155      private void InsertTestClientGroups() {
     156        ISessionFactory factory =
     157          ServiceLocator.GetSessionFactory();
     158
     159        ISession session =
     160          factory.GetSessionForCurrentThread();
     161
     162        ITransaction trans = null;
     163
     164        try {
     165          IClientGroupAdapter clientGroupAdapter =
     166          session.GetDataAdapter<ClientGroup, IClientGroupAdapter>();
     167
     168          trans =
     169            session.BeginTransaction();
     170
     171          ClientInfo client =
     172            new ClientInfo();
     173          client.Name = "Stefan";
     174          client.Login = DateTime.Now;
     175
     176          ClientInfo client2 =
     177            new ClientInfo();
     178          client2.Name = "Martin";
     179          client2.Login = DateTime.Now;
     180
     181          ClientGroup group =
     182            new ClientGroup();
     183          group.Name = "Gruppe1";
     184
     185          ClientGroup subGroup =
     186            new ClientGroup();
     187          subGroup.Name = "Untergruppe1";
     188          subGroup.Resources.Add(client);
     189
     190          group.Resources.Add(client2);
     191          group.Resources.Add(subGroup);
     192
     193          clientGroupAdapter.Update(group);
     194
     195          trans.Commit();
     196        }
     197        finally {
     198          session.EndSession();
     199        }
     200      }
     201
     202      private void TestJobAdapter() {
     203        IJobAdapter jobAdapter =
     204          ServiceLocator.GetJobAdapter();
     205        IClientAdapter clientAdapter =
     206          ServiceLocator.GetClientAdapter();
     207
     208        Job job = new Job();
     209
     210        ClientInfo client = new ClientInfo();
     211        client.Login = DateTime.Now;
     212
     213        job.Client = client;
     214        jobAdapter.Update(job);
     215
     216        ICollection<Job> jobs = jobAdapter.GetAll();
     217
     218        jobAdapter.Delete(job);
     219        clientAdapter.Delete(client);
     220
     221        jobs = jobAdapter.GetAll();
     222      }
     223
     224      private void TestJobResultsAdapter() {
     225        Job job = new Job();
     226
     227        ClientInfo client = new ClientInfo();
     228        client.Login = DateTime.Now;
     229
     230        job.Client = client;
     231
     232        IJobResultsAdapter resultsAdapter =
     233          ServiceLocator.GetJobResultsAdapter();
     234
     235        byte[] resultByte = {0x0f, 0x1f, 0x2f, 0x3f, 0x4f};
     236
     237        JobResult result = new JobResult();
     238        result.Client = client;
     239        result.Job = job;
     240        result.Result = resultByte;
     241
     242        resultsAdapter.Update(result);
     243
     244        JobResult read =
     245          resultsAdapter.GetById(result.Id);
     246        Debug.Assert(
     247          read.Id == result.Id &&
     248          result.Client.Id == read.Client.Id &&
     249          result.Job.Id == read.Job.Id &&
     250          result.Result == result.Result);
     251
     252        int count =
     253          resultsAdapter.GetAll().Count;
     254
     255        resultsAdapter.Delete(result);
     256
     257        ICollection<JobResult> allResults =
     258          resultsAdapter.GetAll();
     259
     260        Debug.Assert(allResults.Count == count - 1);
     261
     262        IJobAdapter jboAdapter =
     263          ServiceLocator.GetJobAdapter();
     264        jboAdapter.Delete(job);
     265        IClientAdapter clientAdapter =
     266          ServiceLocator.GetClientAdapter();
     267        clientAdapter.Delete(client);
     268      }     
     269
     270      private void TestTransaction() {
     271        ISessionFactory factory =
     272          ServiceLocator.GetSessionFactory();
     273
     274        ISession session =
     275          factory.GetSessionForCurrentThread();
    121276
    122277        IClientAdapter clientAdapter =
    123278          session.GetDataAdapter<ClientInfo, IClientAdapter>();
    124279
    125         clientAdapter.Delete(client3);
    126 
    127         read =
    128            clientGroupAdapter.GetById(group.Id);
    129 
    130         clientGroupAdapter.Delete(subGroup);
    131 
    132         read =
    133            clientGroupAdapter.GetById(group.Id);
    134 
    135         clientGroups =
    136           clientGroupAdapter.GetAll();
    137 
    138         clientGroupAdapter.Delete(group);
    139 
    140         clientGroups =
    141           clientGroupAdapter.GetAll();
    142 
    143         clientAdapter.Delete(client);
    144         clientAdapter.Delete(client2);
     280        ITransaction trans =
     281          session.BeginTransaction();
     282
     283        ClientInfo client = new ClientInfo();
     284        client.Login = DateTime.Now;
     285        clientAdapter.Update(client);
     286
     287        trans.Rollback();
     288
     289        session.EndSession();
     290      }  */
     291
     292    private void TestJobStreaming() {
     293      ISessionFactory factory =
     294         ServiceLocator.GetSessionFactory();
     295
     296      ISession session =
     297           factory.GetSessionForCurrentThread();
     298
     299      IJobAdapter jobAdapter =
     300        session.GetDataAdapter<Job, IJobAdapter>();
     301
     302      Stream s = jobAdapter.GetSerializedJobStream(
     303        new Guid("281602a2-1a47-4101-9a75-02974292d490"), true);
     304
     305      byte[] buffer = new byte[1024];
     306      while(s.Read(buffer, 0, buffer.Length)>0)
     307      {
     308         //do nothing
    145309      }
    146       finally {
    147         if (trans != null)
    148           trans.Rollback();
    149 
    150         session.EndSession();
    151       }
    152     }
    153 
    154     private void InsertTestClientGroups() {
    155       ISessionFactory factory =
    156         ServiceLocator.GetSessionFactory();
    157 
    158       ISession session =
    159         factory.GetSessionForCurrentThread();
    160 
    161       ITransaction trans = null;
    162 
    163       try {
    164         IClientGroupAdapter clientGroupAdapter =
    165         session.GetDataAdapter<ClientGroup, IClientGroupAdapter>();
    166 
    167         trans =
    168           session.BeginTransaction();
    169 
    170         ClientInfo client =
    171           new ClientInfo();
    172         client.Name = "Stefan";
    173         client.Login = DateTime.Now;
    174 
    175         ClientInfo client2 =
    176           new ClientInfo();
    177         client2.Name = "Martin";
    178         client2.Login = DateTime.Now;
    179 
    180         ClientGroup group =
    181           new ClientGroup();
    182         group.Name = "Gruppe1";
    183 
    184         ClientGroup subGroup =
    185           new ClientGroup();
    186         subGroup.Name = "Untergruppe1";
    187         subGroup.Resources.Add(client);
    188 
    189         group.Resources.Add(client2);
    190         group.Resources.Add(subGroup);
    191 
    192         clientGroupAdapter.Update(group);
    193 
    194         trans.Commit();
    195       }
    196       finally {
    197         session.EndSession();
    198       }
    199     }
    200 
    201    /* private void TestJobAdapter() {
    202       IJobAdapter jobAdapter =
    203         ServiceLocator.GetJobAdapter();
    204       IClientAdapter clientAdapter =
    205         ServiceLocator.GetClientAdapter();
    206 
    207       Job job = new Job();
    208 
    209       ClientInfo client = new ClientInfo();
    210       client.Login = DateTime.Now;
    211 
    212       job.Client = client;
    213       jobAdapter.Update(job);
    214 
    215       ICollection<Job> jobs = jobAdapter.GetAll();
    216 
    217       jobAdapter.Delete(job);
    218       clientAdapter.Delete(client);
    219 
    220       jobs = jobAdapter.GetAll();
    221     }
    222 
    223     private void TestJobResultsAdapter() {
    224       Job job = new Job();
    225 
    226       ClientInfo client = new ClientInfo();
    227       client.Login = DateTime.Now;
    228 
    229       job.Client = client;
    230 
    231       IJobResultsAdapter resultsAdapter =
    232         ServiceLocator.GetJobResultsAdapter();
    233 
    234       byte[] resultByte = {0x0f, 0x1f, 0x2f, 0x3f, 0x4f};
    235 
    236       JobResult result = new JobResult();
    237       result.Client = client;
    238       result.Job = job;
    239       result.Result = resultByte;
    240 
    241       resultsAdapter.Update(result);
    242 
    243       JobResult read =
    244         resultsAdapter.GetById(result.Id);
    245       Debug.Assert(
    246         read.Id == result.Id &&
    247         result.Client.Id == read.Client.Id &&
    248         result.Job.Id == read.Job.Id &&
    249         result.Result == result.Result);
    250 
    251       int count =
    252         resultsAdapter.GetAll().Count;
    253 
    254       resultsAdapter.Delete(result);
    255 
    256       ICollection<JobResult> allResults =
    257         resultsAdapter.GetAll();
    258 
    259       Debug.Assert(allResults.Count == count - 1);
    260 
    261       IJobAdapter jboAdapter =
    262         ServiceLocator.GetJobAdapter();
    263       jboAdapter.Delete(job);
    264       IClientAdapter clientAdapter =
    265         ServiceLocator.GetClientAdapter();
    266       clientAdapter.Delete(client);
    267     }      */
    268 
    269     private void TestTransaction() {
    270       ISessionFactory factory =
    271         ServiceLocator.GetSessionFactory();
    272 
    273       ISession session =
    274         factory.GetSessionForCurrentThread();
    275 
    276       IClientAdapter clientAdapter =
    277         session.GetDataAdapter<ClientInfo, IClientAdapter>();
    278 
    279       ITransaction trans =
    280         session.BeginTransaction();
    281 
    282       ClientInfo client = new ClientInfo();
    283       client.Login = DateTime.Now;
    284       clientAdapter.Update(client);
    285 
    286       trans.Rollback();
     310
     311      s.Close();
    287312
    288313      session.EndSession();
     
    290315
    291316    public override void Run() {
    292       TestClientGroupAdapter();
     317      //TestClientGroupAdapter();
    293318      //InsertTestClientGroups();
     319      TestJobStreaming();
    294320    }     
    295321  }
  • trunk/sources/HeuristicLab.Hive.Server.Core/3.2/HeuristicLab.Hive.Server.Core-3.2.csproj

    r2070 r2117  
    100100    <Compile Include="HiveServerMessages.Designer.cs" />
    101101    <Compile Include="InternalInterfaces\IHivePermissionManager.cs" />
     102    <Compile Include="InternalInterfaces\IInternalClientCommunicator.cs" />
    102103    <Compile Include="InternalInterfaces\IInternalJobManager.cs" />
    103104    <Compile Include="InternalInterfaces\IScheduler.cs" />
  • trunk/sources/HeuristicLab.Hive.Server.Core/3.2/InternalInterfaces/IInternalJobManager.cs

    r1141 r2117  
    2525using System.Text;
    2626using HeuristicLab.Hive.Contracts.BusinessObjects;
     27using System.IO;
    2728
    2829namespace HeuristicLab.Hive.Server.Core.InternalInterfaces {
    2930  interface IInternalJobManager {
    3031    void ResetJobsDependingOnResults(Job job);
     32
     33    Stream GetJobStreamById(Guid jobId);
    3134  }
    3235}
  • trunk/sources/HeuristicLab.Hive.Server.Core/3.2/JobManager.cs

    r2099 r2117  
    3131using HeuristicLab.DataAccess.Interfaces;
    3232using System.Data;
     33using System.IO;
    3334
    3435namespace HeuristicLab.Hive.Server.Core {
     
    171172           session.EndSession();
    172173       }
     174    }
     175
     176    /// <summary>
     177    /// Gets the streamed job
     178    /// </summary>
     179    /// <param name="jobId"></param>
     180    /// <returns></returns>
     181    public Stream GetJobStreamById(Guid jobId) {
     182      ISession session = factory.GetSessionForCurrentThread();
     183      try {
     184        IJobAdapter jobAdapter =
     185          session.GetDataAdapter<Job, IJobAdapter>();
     186
     187        return jobAdapter.GetSerializedJobStream(jobId, false);
     188      }
     189      finally {
     190        if (session != null)
     191          session.EndSession();
     192      }
    173193    }
    174194
  • trunk/sources/HeuristicLab.Hive.Server.DataAccess/3.2/IJobAdapter.cs

    r2099 r2117  
    2525using HeuristicLab.Hive.Contracts.BusinessObjects;
    2626using HeuristicLab.DataAccess.Interfaces;
     27using System.IO;
    2728
    2829namespace HeuristicLab.Hive.Server.DataAccess {
     
    8990
    9091    /// <summary>
     92    /// Gets a stream object for the large serialized job data
     93    /// </summary>
     94    /// <param name="jobId"></param>
     95    /// <returns></returns>
     96    Stream GetSerializedJobStream(Guid jobId, bool useExistingConnection);
     97
     98    /// <summary>
    9199    /// Saves or update the computable job
    92100    /// </summary>
  • trunk/sources/HeuristicLab.Hive.Server.DataAccess/3.2/IJobResultsAdapter.cs

    r2099 r2117  
    2525using HeuristicLab.Hive.Contracts.BusinessObjects;
    2626using HeuristicLab.DataAccess.Interfaces;
     27using System.IO;
    2728
    2829namespace HeuristicLab.Hive.Server.DataAccess {
     
    5051
    5152    /// <summary>
     53    /// Gets a stream object for the large serialized job result data
     54    /// </summary>
     55    /// <param name="jobId"></param>
     56    /// <returns></returns>
     57    Stream GetSerializedJobResultStream(Guid jobResultId, bool useExistingConnection);
     58
     59    /// <summary>
    5260    /// Saves or update the computable job result
    5361    /// </summary>
Note: See TracChangeset for help on using the changeset viewer.