Free cookie consent management tool by TermsFeed Policy Generator

Changeset 5228


Ignore:
Timestamp:
01/06/11 18:34:38 (14 years ago)
Author:
cneumuel
Message:

#1260

  • further tweaked HiveEngine to be faster and more robust
Location:
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HeuristicLab.HiveEngine-3.3.csproj

    r5179 r5228  
    8888  </ItemGroup>
    8989  <ItemGroup>
     90    <Compile Include="HiveEngineException.cs" />
    9091    <Compile Include="ScopeMergeException.cs" />
    9192    <Compile Include="Views\HiveEngineView.cs">
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs

    r5227 r5228  
    2323  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
    2424  public class HiveEngine : Engine {
     25    private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
     26    private Semaphore maxConcurrentSerializations = new Semaphore(1, 1); // allow ony ONE concurrent serialization, because the operations share the same ParentScopes and serializing the same objects concurrently causes problems
     27    private Semaphore maxSerializedJobsInMemory = new Semaphore(2, 2); // avoid memory problems
    2528    private CancellationToken cancellationToken;
    2629
     
    7679                ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation);
    7780              } else if (kvp.Key is OperationCollection) {
    78                 OperationCollection ocoll = (OperationCollection)kvp.Key;
    79                 for (int i = ocoll.Count - 1; i >= 0; i--)
    80                   if (ocoll[i] != null) executionStack.Push(ocoll[i]);
     81                // todo ??
    8182              }
    8283            }
     
    137138    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) {
    138139      LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
    139       Semaphore concurrentUploads = new Semaphore(6, 6);
     140
     141      object locker = new object();
    140142      try {
    141143        IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>();
     
    144146        JobResultList results;
    145147        var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
    146 
    147         List<JobDto> jobs = new List<JobDto>();
     148        int finishedCount = 0;
     149
     150        // create upload-tasks
     151        var uploadTasks = new List<Task<JobDto>>();
    148152        foreach (var kvp in jobDict) {
    149 
    150153          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
    151154          IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
     
    153156            random.Reset(random.Next());
    154157
    155           Task.Factory.StartNew((operationJob) => {
    156             var groups = ResourceIds.Split(';');
    157             SerializedJob serializedJob = new SerializedJob();
    158             serializedJob.SerializedJobData = SerializedJob.Serialize(operationJob);
    159             serializedJob.JobInfo = new JobDto();
    160             serializedJob.JobInfo.State = JobState.Offline;
    161             serializedJob.JobInfo.CoresNeeded = 1;
    162             serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
    163             ResponseObject<JobDto> response;
    164             concurrentUploads.WaitOne();
    165             using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    166               response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     158          uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
     159            return UploadJob(pluginsNeeded, keyValuePairObj);
     160          }, kvp));
     161        }
     162
     163        Task processUploadedJobsTask = Task.Factory.StartNew(() => {
     164          // process finished upload-tasks
     165          int uploadTasksCount = uploadTasks.Count;
     166          for (int i = 0; i < uploadTasksCount; i++) {
     167            var uploadTasksArray = uploadTasks.ToArray();
     168            var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
     169            if (task.Status == TaskStatus.Faulted) {
     170              LogException(task.Exception);
     171              throw task.Exception;
    167172            }
    168             concurrentUploads.Release();
    169             jobs.Add(response.Obj);
    170             jobIds.Add(response.Obj.Id, kvp.Key);
    171             lock (remainingJobIds) {
    172               remainingJobIds.Add(response.Obj.Id);
    173               jobNumbers.Add(response.Obj.Id, remainingJobIds.Count);
    174             }
    175             LogMessage(string.Format("Submitted job #{0} (id: {1})", jobNumbers[response.Obj.Id], response.Obj.Id));
    176           }, kvp.Value);
    177         }
    178 
    179         while (remainingJobIds.Count != jobDict.Count) {
    180           Thread.Sleep(1000);
    181         }
    182 
    183         LogMessage("Waiting for results...");
    184         int jobsFinishedCount = 0;
    185         Semaphore concurrentDownloads = new Semaphore(4, 4);
    186         while (remainingJobIds.Count > 0) {
    187           Thread.Sleep(5000);
     173
     174            IOperation key = ((KeyValuePair<IOperation, OperationJob>)task.AsyncState).Key;
     175            JobDto jobDto = task.Result;
     176
     177            jobIds.Add(jobDto.Id, key);
     178            remainingJobIds.Add(jobDto.Id);
     179            jobNumbers.Add(jobDto.Id, remainingJobIds.Count);
     180
     181            LogMessage(string.Format("Submitted job #{0}", jobNumbers[jobDto.Id], jobDto.Id));
     182            uploadTasks.Remove(task);
     183          }
     184        });
     185
     186        // poll job-statuses and create tasks for those which are finished
     187        var downloadTasks = new List<Task<OperationJob>>();
     188        var executionTimes = new List<TimeSpan>();
     189        while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
     190          Thread.Sleep(10000);
    188191          using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
    189192            results = service.Obj.GetJobResults(remainingJobIds).Obj;
    190193          }
    191           foreach (var result in results) {
     194          var jobsFinished = results.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted);
     195          finishedCount += jobsFinished.Count();
     196          var totalExecutionTime = TimeSpan.FromMilliseconds(results.Select(j => j.ExecutionTime).Union(executionTimes).Select(e => e.TotalMilliseconds).Sum());
     197          LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobDict.Count, totalExecutionTime));
     198          foreach (var result in jobsFinished) {
    192199            if (result.State == JobState.Finished) {
    193               Task.Factory.StartNew((jobIdObj) => {
    194                 Guid jobId = (Guid)jobIdObj;
    195                 SerializedJob serializedJob;
    196                 concurrentDownloads.WaitOne();
    197                 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
    198                   serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
    199                 }
    200                 concurrentDownloads.Release();
    201                 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
    202                 jobDict[jobIds[jobId]] = operationJob;
    203                 LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobNumbers[jobId], jobId));
    204                 jobsFinishedCount++;
    205               }, result.Id);
    206 
    207               remainingJobIds.Remove(result.Id);
     200              downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
     201                return DownloadJob(jobNumbers, jobIdObj);
     202              }, result.Id));
    208203            } else if (result.State == JobState.Aborted) {
    209204              LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id));
    210               remainingJobIds.Remove(result.Id);
    211205            } else if (result.State == JobState.Failed) {
    212206              LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobNumbers[result.Id], result.Id, result.Exception));
    213               remainingJobIds.Remove(result.Id);
    214207            }
    215           }
    216         }
    217 
    218         // wait for all tasks to finish downloading and deserializing
    219         while (jobsFinishedCount != jobDict.Count) {
    220           Thread.Sleep(1000);
    221         }
    222 
    223         LogMessage("All jobs finished. Deleting jobs on hive.");
     208            remainingJobIds.Remove(result.Id);
     209            executionTimes.Add(result.ExecutionTime);
     210          }
     211        }
     212
     213        // process finished download-tasks
     214        int downloadTasksCount = downloadTasks.Count;
     215        for (int i = 0; i < downloadTasksCount; i++) {
     216          var downloadTasksArray = downloadTasks.ToArray();
     217          var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
     218          var jobId = (Guid)task.AsyncState;
     219          if (task.Status == TaskStatus.Faulted) {
     220            LogException(task.Exception);
     221            throw task.Exception;
     222          }
     223          jobDict[jobIds[(Guid)task.AsyncState]] = task.Result;
     224          downloadTasks.Remove(task);
     225        }
     226
     227        LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}). Deleting jobs on hive.", TimeSpan.FromMilliseconds(executionTimes.Select(e => e.TotalMilliseconds).Sum())));
    224228        // delete jobs
    225229        using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     
    237241    }
    238242
    239     private void DownloadJob(Guid jobId, IDictionary<IOperation, OperationJob> jobDict, IDictionary<Guid, IOperation> jobIds) {
     243    private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj) {
     244      var keyValuePair = (KeyValuePair<IOperation, OperationJob>)keyValuePairObj;
     245      var groups = ResourceIds.Split(';');
     246      maxSerializedJobsInMemory.WaitOne();
     247      SerializedJob serializedJob = new SerializedJob();
     248      maxConcurrentSerializations.WaitOne();
     249      serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
     250      maxConcurrentSerializations.Release();
     251      serializedJob.JobInfo = new JobDto();
     252      serializedJob.JobInfo.State = JobState.Offline;
     253      serializedJob.JobInfo.CoresNeeded = 1;
     254      serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
     255      ResponseObject<JobDto> response;
     256      maxConcurrentConnections.WaitOne();
     257      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     258        response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     259        serializedJob = null;
     260        maxSerializedJobsInMemory.Release();
     261      }
     262      maxConcurrentConnections.Release();
     263      return response.Obj;
     264    }
     265
     266    private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj) {
     267      Guid jobId = (Guid)jobIdObj;
    240268      SerializedJob serializedJob;
    241       using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     269      maxConcurrentConnections.WaitOne();
     270      maxSerializedJobsInMemory.WaitOne();
     271      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    242272        serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
    243273      }
     274      maxConcurrentConnections.Release();
    244275      OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
    245       jobDict[jobIds[jobId]] = operationJob;
    246       LogMessage(string.Format("Downloaded job (id: {0})", jobId));
     276      serializedJob = null;
     277      maxSerializedJobsInMemory.Release();
     278      LogMessage(string.Format("Downloaded job #{0}", jobNumbers[jobId], jobId));
     279      return operationJob;
    247280    }
    248281
Note: See TracChangeset for help on using the changeset viewer.