Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
01/07/11 17:08:36 (14 years ago)
Author:
cneumuel
Message:

#1260

  • added cancellation to HiveEngine
  • made HiveEngine more failproof for flaky connection to hive server
  • increased WCF message quotas to 300MB
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs

    r5228 r5232  
    7373            }
    7474
    75             ExecuteOnHive(jobs);
     75            ExecuteOnHive(jobs, cancellationToken);
    7676
    7777            foreach (var kvp in jobs) {
     
    134134    /// <summary>
    135135    /// This method blocks until all jobs are finished
     136    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
    136137    /// </summary>
    137138    /// <param name="jobDict"></param>
    138     private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) {
     139    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict, CancellationToken cancellationToken) {
    139140      LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
    140141
     
    147148        var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
    148149        int finishedCount = 0;
     150        int uploadCount = 0;
    149151
    150152        // create upload-tasks
     
    157159
    158160          uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
    159             return UploadJob(pluginsNeeded, keyValuePairObj);
    160           }, kvp));
    161         }
    162 
    163         Task processUploadedJobsTask = Task.Factory.StartNew(() => {
     161            return UploadJob(pluginsNeeded, keyValuePairObj, cancellationToken);
     162          }, kvp, cancellationToken));
     163        }
     164
     165        Task processUploadedJobsTask = new Task(() => {
    164166          // process finished upload-tasks
    165167          int uploadTasksCount = uploadTasks.Count;
    166168          for (int i = 0; i < uploadTasksCount; i++) {
     169            cancellationToken.ThrowIfCancellationRequested();
     170
    167171            var uploadTasksArray = uploadTasks.ToArray();
    168172            var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
     
    174178            IOperation key = ((KeyValuePair<IOperation, OperationJob>)task.AsyncState).Key;
    175179            JobDto jobDto = task.Result;
    176 
    177             jobIds.Add(jobDto.Id, key);
    178             remainingJobIds.Add(jobDto.Id);
    179             jobNumbers.Add(jobDto.Id, remainingJobIds.Count);
    180 
     180            lock (locker) {
     181              uploadCount++;
     182              jobIds.Add(jobDto.Id, key);
     183              remainingJobIds.Add(jobDto.Id);
     184              jobNumbers.Add(jobDto.Id, uploadCount);
     185            }
    181186            LogMessage(string.Format("Submitted job #{0}", jobNumbers[jobDto.Id], jobDto.Id));
    182187            uploadTasks.Remove(task);
    183188          }
    184         });
     189        }, cancellationToken, TaskCreationOptions.PreferFairness);
     190        processUploadedJobsTask.Start();
    185191
    186192        // poll job-statuses and create tasks for those which are finished
     
    188194        var executionTimes = new List<TimeSpan>();
    189195        while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
     196          cancellationToken.ThrowIfCancellationRequested();
     197
    190198          Thread.Sleep(10000);
    191199          using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     
    199207            if (result.State == JobState.Finished) {
    200208              downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
    201                 return DownloadJob(jobNumbers, jobIdObj);
    202               }, result.Id));
     209                return DownloadJob(jobNumbers, jobIdObj, cancellationToken);
     210              }, result.Id, cancellationToken));
    203211            } else if (result.State == JobState.Aborted) {
    204212              LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id));
     
    214222        int downloadTasksCount = downloadTasks.Count;
    215223        for (int i = 0; i < downloadTasksCount; i++) {
     224          cancellationToken.ThrowIfCancellationRequested();
     225
    216226          var downloadTasksArray = downloadTasks.ToArray();
    217227          var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
     
    241251    }
    242252
    243     private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj) {
     253    private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, CancellationToken cancellationToken) {
    244254      var keyValuePair = (KeyValuePair<IOperation, OperationJob>)keyValuePairObj;
    245255      var groups = ResourceIds.Split(';');
    246256      maxSerializedJobsInMemory.WaitOne();
    247       SerializedJob serializedJob = new SerializedJob();
     257      SerializedJob serializedJob = null;
     258      while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here
     259        cancellationToken.ThrowIfCancellationRequested();
     260        try {
     261          lock (Log) {
     262            serializedJob = new SerializedJob();
     263          }
     264        }
     265        catch (Exception e) {
     266          LogException(e);
     267        }
     268      }
    248269      maxConcurrentSerializations.WaitOne();
    249270      serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
     
    253274      serializedJob.JobInfo.CoresNeeded = 1;
    254275      serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
    255       ResponseObject<JobDto> response;
     276      ResponseObject<JobDto> response = null;
    256277      maxConcurrentConnections.WaitOne();
    257       using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    258         response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
    259         serializedJob = null;
    260         maxSerializedJobsInMemory.Release();
     278      while (response == null) { // repeat until success
     279        cancellationToken.ThrowIfCancellationRequested();
     280        try {
     281          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     282            response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     283            serializedJob = null;
     284            maxSerializedJobsInMemory.Release();
     285          }
     286        }
     287        catch (Exception e) {
     288          LogException(e);
     289        }
    261290      }
    262291      maxConcurrentConnections.Release();
     
    264293    }
    265294
    266     private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj) {
     295    private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj, CancellationToken cancellationToken) {
    267296      Guid jobId = (Guid)jobIdObj;
    268       SerializedJob serializedJob;
     297      SerializedJob serializedJob = null;
     298      maxSerializedJobsInMemory.WaitOne();
    269299      maxConcurrentConnections.WaitOne();
    270       maxSerializedJobsInMemory.WaitOne();
    271       using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    272         serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
     300      while (serializedJob == null) { // repeat until success
     301        cancellationToken.ThrowIfCancellationRequested();
     302        try {
     303          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     304            serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
     305          }
     306        }
     307        catch (Exception e) {
     308          LogException(e);
     309        }
    273310      }
    274311      maxConcurrentConnections.Release();
Note: See TracChangeset for help on using the changeset viewer.