Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
01/06/11 13:17:56 (14 years ago)
Author:
cneumuel
Message:

#1260

  • improved efficiency of HiveEngine
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs

    r5213 r5227  
    9898
    9999          if (operation.Operator.Breakpoint) {
    100             Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
     100            LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
    101101            Pause();
    102102          }
     
    136136    /// <param name="jobDict"></param>
    137137    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) {
    138       Log.LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
     138      LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
     139      Semaphore concurrentUploads = new Semaphore(6, 6);
    139140      try {
    140141        IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>();
     142        List<Guid> remainingJobIds = new List<Guid>();
     143        IDictionary<Guid, int> jobNumbers = new Dictionary<Guid, int>(); // for better readability of log
    141144        JobResultList results;
    142 
    143         using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    144           List<JobDto> jobs = new List<JobDto>();
    145           foreach (var kvp in jobDict) {
    146             // shuffle random variable to avoid the same random sequence in each operation
    147             IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
    148             if (random != null)
    149               random.Reset(random.Next());
    150 
     145        var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
     146
     147        List<JobDto> jobs = new List<JobDto>();
     148        foreach (var kvp in jobDict) {
     149
     150          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
     151          IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
     152          if (random != null)
     153            random.Reset(random.Next());
     154
     155          Task.Factory.StartNew((operationJob) => {
    151156            var groups = ResourceIds.Split(';');
    152157            SerializedJob serializedJob = new SerializedJob();
    153             serializedJob.SerializedJobData = SerializedJob.Serialize(kvp.Value);
     158            serializedJob.SerializedJobData = SerializedJob.Serialize(operationJob);
    154159            serializedJob.JobInfo = new JobDto();
    155160            serializedJob.JobInfo.State = JobState.Offline;
    156161            serializedJob.JobInfo.CoresNeeded = 1;
    157             serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
    158             ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     162            serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
     163            ResponseObject<JobDto> response;
     164            concurrentUploads.WaitOne();
     165            using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     166              response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     167            }
     168            concurrentUploads.Release();
    159169            jobs.Add(response.Obj);
    160170            jobIds.Add(response.Obj.Id, kvp.Key);
    161           }
    162           results = service.Obj.GetJobResults(jobIds.Keys).Obj;
    163         }
    164 
    165         while (!results.All(
    166             x => x.State == JobState.Finished ||
    167             x.State == JobState.Failed ||
    168             x.State == JobState.Aborted)) {
     171            lock (remainingJobIds) {
     172              remainingJobIds.Add(response.Obj.Id);
     173              jobNumbers.Add(response.Obj.Id, remainingJobIds.Count);
     174            }
     175            LogMessage(string.Format("Submitted job #{0} (id: {1})", jobNumbers[response.Obj.Id], response.Obj.Id));
     176          }, kvp.Value);
     177        }
     178
     179        while (remainingJobIds.Count != jobDict.Count) {
     180          Thread.Sleep(1000);
     181        }
     182
     183        LogMessage("Waiting for results...");
     184        int jobsFinishedCount = 0;
     185        Semaphore concurrentDownloads = new Semaphore(4, 4);
     186        while (remainingJobIds.Count > 0) {
    169187          Thread.Sleep(5000);
    170188          using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
    171             results = service.Obj.GetJobResults(jobIds.Keys).Obj;
    172           }
    173         }
    174 
    175         // all finished
    176         using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
    177           foreach (Guid jobId in jobIds.Keys) {
    178             SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
    179             OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
    180             jobDict[jobIds[jobId]] = operationJob;
    181           }
    182         }
    183 
     189            results = service.Obj.GetJobResults(remainingJobIds).Obj;
     190          }
     191          foreach (var result in results) {
     192            if (result.State == JobState.Finished) {
     193              Task.Factory.StartNew((jobIdObj) => {
     194                Guid jobId = (Guid)jobIdObj;
     195                SerializedJob serializedJob;
     196                concurrentDownloads.WaitOne();
     197                using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     198                  serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
     199                }
     200                concurrentDownloads.Release();
     201                OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
     202                jobDict[jobIds[jobId]] = operationJob;
     203                LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobNumbers[jobId], jobId));
     204                jobsFinishedCount++;
     205              }, result.Id);
     206
     207              remainingJobIds.Remove(result.Id);
     208            } else if (result.State == JobState.Aborted) {
     209              LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id));
     210              remainingJobIds.Remove(result.Id);
     211            } else if (result.State == JobState.Failed) {
     212              LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobNumbers[result.Id], result.Id, result.Exception));
     213              remainingJobIds.Remove(result.Id);
     214            }
     215          }
     216        }
     217
     218        // wait for all tasks to finish downloading and deserializing
     219        while (jobsFinishedCount != jobDict.Count) {
     220          Thread.Sleep(1000);
     221        }
     222
     223        LogMessage("All jobs finished. Deleting jobs on hive.");
    184224        // delete jobs
    185225        using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     
    189229        }
    190230
    191         Log.LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));
     231        LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));
    192232      }
    193233      catch (Exception e) {
    194         Log.LogException(e);
     234        LogException(e);
    195235        throw e;
     236      }
     237    }
     238
     239    private void DownloadJob(Guid jobId, IDictionary<IOperation, OperationJob> jobDict, IDictionary<Guid, IOperation> jobIds) {
     240      SerializedJob serializedJob;
     241      using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     242        serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
     243      }
     244      OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
     245      jobDict[jobIds[jobId]] = operationJob;
     246      LogMessage(string.Format("Downloaded job (id: {0})", jobId));
     247    }
     248
     249    /// <summary>
     250    /// Threadsafe message logging
     251    /// </summary>
     252    private void LogMessage(string message) {
     253      lock (Log) {
     254        Log.LogMessage(message);
     255      }
     256    }
     257
     258    /// <summary>
     259    /// Threadsafe exception logging
     260    /// </summary>
     261    private void LogException(Exception exception) {
     262      lock (Log) {
     263        Log.LogException(exception);
    196264      }
    197265    }
Note: See TracChangeset for help on using the changeset viewer.