Changeset 5268 for branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3
- Timestamp:
- 01/10/11 15:48:54 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
r5263 r5268 67 67 coll = (OperationCollection)next; 68 68 if (coll.Parallel) { 69 // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector 70 IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone(); 71 parentScopeClone.SubScopes.Clear(); 72 parentScopeClone.ClearParentScopes(); 73 69 74 OperationJob[] jobs = new OperationJob[coll.Count]; 70 75 for (int i = 0; i < coll.Count; i++) { … … 72 77 } 73 78 74 IScope[] scopes = ExecuteOnHive(jobs, cancellationToken);79 IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken); 75 80 76 81 for (int i = 0; i < coll.Count; i++) { … … 136 141 /// </summary> 137 142 /// <param name="jobs"></param> 138 private IScope[] ExecuteOnHive(OperationJob[] jobs, CancellationToken cancellationToken) {143 private IScope[] ExecuteOnHive(OperationJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) { 139 144 LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length)); 140 145 IScope[] scopes = new Scope[jobs.Length]; … … 160 165 161 166 uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => { 162 return UploadJob(pluginsNeeded, keyValuePairObj, cancellationToken);167 return UploadJob(pluginsNeeded, keyValuePairObj, parentScopeClone, cancellationToken); 163 168 }, new KeyValuePair<int, OperationJob>(i, job), cancellationToken)); 164 169 } … … 198 203 199 204 Thread.Sleep(10000); 200 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 201 results = service.Obj.GetJobResults(remainingJobIds).Obj; 202 } 203 var jobsFinished = results.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted); 204 finishedCount += jobsFinished.Count(); 205 var totalExecutionTime = TimeSpan.FromMilliseconds(results.Select(j => j.ExecutionTime).Union(executionTimes).Select(e => e.TotalMilliseconds).Sum()); 206 LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobs.Length, totalExecutionTime)); 207 foreach (var result in jobsFinished) { 208 if (result.State == JobState.Finished) { 209 downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => { 210 return DownloadJob(jobIndices, jobIdObj, cancellationToken); 211 }, result.Id, cancellationToken)); 212 } else if (result.State == JobState.Aborted) { 213 LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id)); 214 } else if (result.State == JobState.Failed) { 215 LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.Exception)); 216 } 217 remainingJobIds.Remove(result.Id); 218 executionTimes.Add(result.ExecutionTime); 205 try { 206 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 207 results = service.Obj.GetJobResults(remainingJobIds).Obj; 208 } 209 var jobsFinished = results.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted); 210 finishedCount += jobsFinished.Count(); 211 var totalExecutionTime = TimeSpan.FromMilliseconds(results.Select(j => j.ExecutionTime).Union(executionTimes).Select(e => e.TotalMilliseconds).Sum()); 212 LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobs.Length, totalExecutionTime)); 213 foreach (var result in jobsFinished) { 214 if (result.State == JobState.Finished) { 215 downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => { 216 return DownloadJob(jobIndices, jobIdObj, cancellationToken); 217 }, result.Id, cancellationToken)); 218 } else if (result.State == JobState.Aborted) { 219 LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id)); 220 } else if (result.State == JobState.Failed) { 221 LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.Exception)); 222 } 223 remainingJobIds.Remove(result.Id); 224 executionTimes.Add(result.ExecutionTime); 225 } 226 } 227 catch (Exception e) { 228 LogException(e); 219 229 } 220 230 } … … 254 264 255 265 private static object locker = new object(); 256 private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, CancellationToken cancellationToken) {266 private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken) { 257 267 var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj; 258 268 var groups = ResourceIds.Split(';'); … … 272 282 // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems 273 283 lock (locker) { 284 ((IAtomicOperation)keyValuePair.Value.Operation).Scope.Parent = parentScopeClone; 274 285 keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone(); 275 286 if (keyValuePair.Value.Operation is IAtomicOperation)
Note: See TracChangeset
for help on using the changeset viewer.