- Timestamp:
- 06/05/11 22:35:40 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs
r6219 r6357 37 37 [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")] 38 38 public class HiveEngine : Engine { 39 private static object locker = new object(); 39 40 private static object logLocker = new object(); 40 41 private CancellationToken cancellationToken; 41 42 private bool firstRun = true; 43 42 44 [Storable] 43 45 private IOperator currentOperator; … … 136 138 OperationCollection coll; 137 139 IAtomicOperation operation; 138 TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException); 139 140 this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList(); 141 this.AlreadyUploadedPlugins = new List<Plugin>(); 142 143 while (ExecutionStack.Count > 0) { 140 141 if (firstRun) { 142 TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException); 143 this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList(); 144 this.AlreadyUploadedPlugins = new List<Plugin>(); 145 firstRun = false; 146 } 147 148 while (executionStack.Count > 0) { 144 149 cancellationToken.ThrowIfCancellationRequested(); 145 150 146 next = ExecutionStack.Pop(); 151 next = executionStack.Pop(); 152 bool isOpCollection = next is OperationCollection; 153 int collCount = isOpCollection ? ((OperationCollection)next).Count : 0; 154 string opName = !isOpCollection ? ((IAtomicOperation)next).Operator.Name : "OpCollection"; 155 147 156 if (next is OperationCollection) { 148 157 coll = (OperationCollection)next; 149 if (coll.Parallel) { 158 159 bool isPMOEvaluator = coll.Count > 0 && coll.First() is HeuristicLab.Core.ExecutionContext && ((HeuristicLab.Core.ExecutionContext)coll.First()).Operator.GetType().Name == "PMOEvaluator"; 160 bool isAlgorithmEvaluator = coll.Count > 0 && coll.First() is HeuristicLab.Core.ExecutionContext && ((HeuristicLab.Core.ExecutionContext)coll.First()).Operator.GetType().Name == "AlgorithmEvaluator"; 161 162 if (coll.Parallel && isPMOEvaluator) { 163 Task[] tasks = new Task[coll.Count]; 164 Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count]; 165 for (int i = 0; i < coll.Count; i++) { 166 stacks[i] = new Stack<IOperation>(); 167 stacks[i].Push(coll[i]); 168 tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken); 169 } 170 try { 171 Task.WaitAll(tasks); 172 } 173 catch (AggregateException ex) { 174 OperationCollection remaining = new OperationCollection() { Parallel = true }; 175 for (int i = 0; i < stacks.Length; i++) { 176 if (stacks[i].Count == 1) 177 remaining.Add(stacks[i].Pop()); 178 if (stacks[i].Count > 1) { 179 OperationCollection ops = new OperationCollection(); 180 while (stacks[i].Count > 0) 181 ops.Add(stacks[i].Pop()); 182 remaining.Add(ops); 183 } 184 } 185 if (remaining.Count > 0) executionStack.Push(remaining); 186 throw ex; 187 } 188 } else if (coll.Parallel) { 150 189 // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector 151 190 IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone(); … … 158 197 } 159 198 160 IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken); 161 //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken); 199 var experiment = CreateHiveExperiment(); 200 IScope[] scopes = ExecuteOnHive(experiment, jobs, parentScopeClone, cancellationToken); 201 DisposeHiveExperiment(experiment); 162 202 163 203 for (int i = 0; i < coll.Count; i++) { … … 178 218 } 179 219 catch (Exception ex) { 180 ExecutionStack.Push(operation);220 executionStack.Push(operation); 181 221 if (ex is OperationCanceledException) throw ex; 182 222 else throw new OperatorExecutionException(operation.Operator, ex); 183 223 } 184 if (next != null) ExecutionStack.Push(next);224 if (next != null) executionStack.Push(next); 185 225 186 226 if (operation.Operator.Breakpoint) { … … 227 267 /// </summary> 228 268 /// <param name="jobs"></param> 229 private IScope[] ExecuteOnHive( EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {269 private IScope[] ExecuteOnHive(RefreshableHiveExperiment refreshableHiveExperiment, EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) { 230 270 LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length)); 231 271 IScope[] scopes = new Scope[jobs.Length]; 232 272 object locker = new object(); 233 273 IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>(); 234 var hiveExperiment = new HiveExperiment();274 var hiveExperiment = refreshableHiveExperiment.HiveExperiment; 235 275 236 276 try { 237 277 List<Guid> remainingJobIds = new List<Guid>(); 238 278 239 // create hive experiment 279 // create upload-tasks 280 var uploadTasks = new List<Task<Job>>(); 281 for (int i = 0; i < jobs.Length; i++) { 282 hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone)); 283 284 // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable) 285 IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext); 286 if (random != null) 287 random.Reset(random.Next()); 288 } 289 ExperimentManagerClient.StartExperiment((e) => { 290 LogException(e); 291 }, refreshableHiveExperiment); 292 293 // do polling until experiment is finished and all jobs are downloaded 294 while (!refreshableHiveExperiment.AllJobsFinished()) { 295 Thread.Sleep(500); 296 this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds)); 297 cancellationToken.ThrowIfCancellationRequested(); 298 } 299 LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime)); 300 301 // get scopes 302 int j = 0; 303 foreach (var hiveJob in hiveExperiment.HiveJobs) { 304 if (hiveJob.Job.State != JobState.Finished) 305 throw new HiveEngineException("Job failed: " + hiveJob.Job.StateLog.Last().Exception); 306 307 var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope; 308 scopes[j++] = scope; 309 } 310 return scopes; 311 } 312 catch (OperationCanceledException e) { 313 lock (locker) { 314 if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id); 315 } 316 throw e; 317 } 318 catch (Exception e) { 319 lock (locker) { 320 if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id); 321 } 322 LogException(e); 323 throw e; 324 } 325 } 326 327 private RefreshableHiveExperiment CreateHiveExperiment() { 328 lock (locker) { 329 var hiveExperiment = new HiveExperiment(); 240 330 hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count; 241 331 hiveExperiment.DateCreated = DateTime.Now; … … 245 335 refreshableHiveExperiment.IsControllable = false; 246 336 hiveExperiments.Add(refreshableHiveExperiment); 247 248 // create upload-tasks 249 var uploadTasks = new List<Task<Job>>(); 250 for (int i = 0; i < jobs.Length; i++) { 251 hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone)); 252 253 // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable) 254 IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext); 255 if (random != null) 256 random.Reset(random.Next()); 257 } 258 ExperimentManagerClient.StartExperiment((e) => { 259 LogException(e); 260 }, refreshableHiveExperiment); 261 262 // do polling until experiment is finished and all jobs are downloaded 263 while (!refreshableHiveExperiment.AllJobsFinished()) { 264 Thread.Sleep(500); 265 this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds)); 266 cancellationToken.ThrowIfCancellationRequested(); 267 } 268 LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime)); 269 270 // get scopes 271 int j = 0; 272 foreach (var hiveJob in hiveExperiment.HiveJobs) { 273 var scope = ((IAtomicOperation) ((EngineJob)hiveJob.ItemJob).InitialOperation).Scope; 274 scopes[j++] = scope; 275 } 276 refreshableHiveExperiment.RefreshAutomatically = false; 277 DeleteHiveExperiment(hiveExperiment.Id); 278 ClearData(refreshableHiveExperiment); 279 return scopes; 280 } 281 catch (OperationCanceledException e) { 282 lock (locker) { 283 if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id); 284 } 285 throw e; 286 } 287 catch (Exception e) { 288 lock (locker) { 289 if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id); 290 } 291 LogException(e); 292 throw e; 293 } 337 return refreshableHiveExperiment; 338 } 339 } 340 341 private void DisposeHiveExperiment(RefreshableHiveExperiment refreshableHiveExperiment) { 342 refreshableHiveExperiment.RefreshAutomatically = false; 343 DeleteHiveExperiment(refreshableHiveExperiment.HiveExperiment.Id); 344 ClearData(refreshableHiveExperiment); 294 345 } 295 346 … … 306 357 }, 5, string.Format("Could not delete jobs")); 307 358 } 308 359 309 360 private List<Guid> GetResourceIds() { 310 361 return ServiceLocator.Instance.CallHiveService(service => {
Note: See TracChangeset
for help on using the changeset viewer.