Changeset 5329
- Timestamp:
- 01/18/11 17:57:14 (14 years ago)
- Location:
- branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.ExperimentManager/3.3/HiveJobDownloader.cs
r5181 r5329 58 58 abort = false; 59 59 tasks = new List<Task<HiveJob>>(); 60 TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException); 60 61 foreach (Guid jobId in jobIds) { 61 62 tasks.Add(Task<SerializedJob>.Factory.StartNew( … … 64 65 } 65 66 } 67 68 private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) { 69 e.SetObserved(); // evoid crash of process because task crashes. first exception found is handled in Results property 70 } 71 66 72 // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory 67 73 private Semaphore downloadSemaphore = new Semaphore(2, 2); -
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
r5282 r5329 67 67 OperationCollection coll; 68 68 IAtomicOperation operation; 69 TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException); 69 70 70 71 while (ExecutionStack.Count > 0) { … … 116 117 } 117 118 } 119 } 120 121 private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) { 122 e.SetObserved(); // avoid crash of process 118 123 } 119 124 … … 275 280 var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj; 276 281 var groups = ResourceIds.Split(';'); 277 maxSerializedJobsInMemory.WaitOne(); 278 SerializedJob serializedJob = null; 279 while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here 280 cancellationToken.ThrowIfCancellationRequested(); 282 ResponseObject<JobDto> response = null; 283 try { 284 maxSerializedJobsInMemory.WaitOne(); 285 SerializedJob serializedJob = null; 286 while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here 287 cancellationToken.ThrowIfCancellationRequested(); 288 try { 289 lock (Log) { 290 serializedJob = new SerializedJob(); 291 } 292 } 293 catch (Exception e) { 294 LogException(e); 295 } 296 } 297 // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems 298 lock (locker) { 299 ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone; 300 keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone(); 301 if (keyValuePair.Value.Operation is IAtomicOperation) 302 ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes(); 303 serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value); 304 } 305 serializedJob.JobInfo = new JobDto(); 306 serializedJob.JobInfo.State = JobState.Offline; 307 serializedJob.JobInfo.CoresNeeded = 1; 308 serializedJob.JobInfo.PluginsNeeded = pluginsNeeded; 309 serializedJob.JobInfo.Priority = priority; 281 310 try { 282 lock (Log) { 283 serializedJob = new SerializedJob(); 284 } 285 } 286 catch (Exception e) { 287 LogException(e); 288 } 289 } 290 // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems 291 lock (locker) { 292 ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone; 293 keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone(); 294 if (keyValuePair.Value.Operation is IAtomicOperation) 295 ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes(); 296 serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value); 297 } 298 serializedJob.JobInfo = new JobDto(); 299 serializedJob.JobInfo.State = JobState.Offline; 300 serializedJob.JobInfo.CoresNeeded = 1; 301 serializedJob.JobInfo.PluginsNeeded = pluginsNeeded; 302 serializedJob.JobInfo.Priority = priority; 303 ResponseObject<JobDto> response = null; 304 maxConcurrentConnections.WaitOne(); 305 while (response == null) { // repeat until success 306 cancellationToken.ThrowIfCancellationRequested(); 307 try { 308 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 309 response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 310 serializedJob = null; 311 maxSerializedJobsInMemory.Release(); 312 } 313 } 314 catch (Exception e) { 315 LogException(e); 316 } 317 } 318 maxConcurrentConnections.Release(); 311 maxConcurrentConnections.WaitOne(); 312 while (response == null) { // repeat until success 313 cancellationToken.ThrowIfCancellationRequested(); 314 try { 315 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 316 response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 317 serializedJob = null; 318 } 319 } 320 catch (Exception e) { 321 LogException(e); 322 } 323 } 324 } 325 finally { 326 maxSerializedJobsInMemory.Release(); 327 } 328 } 329 finally { 330 maxConcurrentConnections.Release(); 331 } 319 332 return response.Obj; 320 333 } … … 323 336 Guid jobId = (Guid)jobIdObj; 324 337 SerializedJob serializedJob = null; 325 maxSerializedJobsInMemory.WaitOne(); 326 maxConcurrentConnections.WaitOne(); 327 while (serializedJob == null) { // repeat until success 328 cancellationToken.ThrowIfCancellationRequested(); 329 try { 330 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 331 serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 332 } 333 } 334 catch (Exception e) { 335 LogException(e); 336 } 337 } 338 maxConcurrentConnections.Release(); 339 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 340 serializedJob = null; 341 maxSerializedJobsInMemory.Release(); 342 LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId)); 338 OperationJob operationJob = null; 339 try { 340 maxSerializedJobsInMemory.WaitOne(); 341 maxConcurrentConnections.WaitOne(); 342 while (serializedJob == null) { // repeat until success 343 cancellationToken.ThrowIfCancellationRequested(); 344 try { 345 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 346 serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 347 } 348 } 349 catch (Exception e) { 350 LogException(e); 351 } 352 } 353 operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 354 serializedJob = null; 355 LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId)); 356 } 357 finally { 358 maxConcurrentConnections.Release(); 359 maxSerializedJobsInMemory.Release(); 360 } 343 361 return operationJob; 344 362 }
Note: See TracChangeset
for help on using the changeset viewer.