Free cookie consent management tool by TermsFeed Policy Generator

Changeset 5329


Ignore:
Timestamp:
01/18/11 17:57:14 (13 years ago)
Author:
cneumuel
Message:

#1260

  • made HiveEngine and HiveJobDownloader more robust (unobserved exceptions from tasks are now handled)
Location:
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.ExperimentManager/3.3/HiveJobDownloader.cs

    r5181 r5329  
    5858      abort = false;
    5959      tasks = new List<Task<HiveJob>>();
     60      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
    6061      foreach (Guid jobId in jobIds) {
    6162        tasks.Add(Task<SerializedJob>.Factory.StartNew(
     
    6465      }
    6566    }
     67
     68    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
     69      e.SetObserved(); // avoid crash of process when a task crashes. the first exception found is handled in the Results property
     70    }
     71
    6672    // use semaphore to ensure only few concurrent connections and few SerializedJob objects in memory
    6773    private Semaphore downloadSemaphore = new Semaphore(2, 2);
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs

    r5282 r5329  
    6767      OperationCollection coll;
    6868      IAtomicOperation operation;
     69      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
    6970
    7071      while (ExecutionStack.Count > 0) {
     
    116117        }
    117118      }
     119    }
     120
     121    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
     122      e.SetObserved(); // avoid crash of process
    118123    }
    119124
     
    275280      var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj;
    276281      var groups = ResourceIds.Split(';');
    277       maxSerializedJobsInMemory.WaitOne();
    278       SerializedJob serializedJob = null;
    279       while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here
    280         cancellationToken.ThrowIfCancellationRequested();
     282      ResponseObject<JobDto> response = null;
     283      try {
     284        maxSerializedJobsInMemory.WaitOne();
     285        SerializedJob serializedJob = null;
     286        while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here
     287          cancellationToken.ThrowIfCancellationRequested();
     288          try {
     289            lock (Log) {
     290              serializedJob = new SerializedJob();
     291            }
     292          }
     293          catch (Exception e) {
     294            LogException(e);
     295          }
     296        }
     297        // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
     298        lock (locker) {
     299          ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone;
     300          keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone();
     301          if (keyValuePair.Value.Operation is IAtomicOperation)
     302            ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes();
     303          serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
     304        }
     305        serializedJob.JobInfo = new JobDto();
     306        serializedJob.JobInfo.State = JobState.Offline;
     307        serializedJob.JobInfo.CoresNeeded = 1;
     308        serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
     309        serializedJob.JobInfo.Priority = priority;
    281310        try {
    282           lock (Log) {
    283             serializedJob = new SerializedJob();
    284           }
    285         }
    286         catch (Exception e) {
    287           LogException(e);
    288         }
    289       }
    290       // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
    291       lock (locker) {
    292         ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone;
    293         keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone();
    294         if (keyValuePair.Value.Operation is IAtomicOperation)
    295           ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes();
    296         serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
    297       }
    298       serializedJob.JobInfo = new JobDto();
    299       serializedJob.JobInfo.State = JobState.Offline;
    300       serializedJob.JobInfo.CoresNeeded = 1;
    301       serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
    302       serializedJob.JobInfo.Priority = priority;
    303       ResponseObject<JobDto> response = null;
    304       maxConcurrentConnections.WaitOne();
    305       while (response == null) { // repeat until success
    306         cancellationToken.ThrowIfCancellationRequested();
    307         try {
    308           using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    309             response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
    310             serializedJob = null;
    311             maxSerializedJobsInMemory.Release();
    312           }
    313         }
    314         catch (Exception e) {
    315           LogException(e);
    316         }
    317       }
    318       maxConcurrentConnections.Release();
     311          maxConcurrentConnections.WaitOne();
     312          while (response == null) { // repeat until success
     313            cancellationToken.ThrowIfCancellationRequested();
     314            try {
     315              using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     316                response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     317                serializedJob = null;
     318              }
     319            }
     320            catch (Exception e) {
     321              LogException(e);
     322            }
     323          }
     324        }
     325        finally {
     326          maxSerializedJobsInMemory.Release();
     327        }
     328      }
     329      finally {
     330        maxConcurrentConnections.Release();
     331      }
    319332      return response.Obj;
    320333    }
     
    323336      Guid jobId = (Guid)jobIdObj;
    324337      SerializedJob serializedJob = null;
    325       maxSerializedJobsInMemory.WaitOne();
    326       maxConcurrentConnections.WaitOne();
    327       while (serializedJob == null) { // repeat until success
    328         cancellationToken.ThrowIfCancellationRequested();
    329         try {
    330           using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    331             serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
    332           }
    333         }
    334         catch (Exception e) {
    335           LogException(e);
    336         }
    337       }
    338       maxConcurrentConnections.Release();
    339       OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
    340       serializedJob = null;
    341       maxSerializedJobsInMemory.Release();
    342       LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
     338      OperationJob operationJob = null;
     339      try {
     340        maxSerializedJobsInMemory.WaitOne();
     341        maxConcurrentConnections.WaitOne();
     342        while (serializedJob == null) { // repeat until success
     343          cancellationToken.ThrowIfCancellationRequested();
     344          try {
     345            using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     346              serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
     347            }
     348          }
     349          catch (Exception e) {
     350            LogException(e);
     351          }
     352        }
     353        operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
     354        serializedJob = null;
     355        LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
     356      }
     357      finally {
     358        maxConcurrentConnections.Release();
     359        maxSerializedJobsInMemory.Release();
     360      }
    343361      return operationJob;
    344362    }
Note: See TracChangeset for help on using the changeset viewer.