Changeset 5213 for branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
- Timestamp:
- 01/05/11 02:05:05 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
r5153 r5213 14 14 using System.Threading; 15 15 using HeuristicLab.Random; 16 using System.Threading.Tasks; 16 17 17 18 namespace HeuristicLab.HiveEngine { … … 22 23 [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")] 23 24 public class HiveEngine : Engine { 25 private CancellationToken cancellationToken; 26 24 27 [Storable] 25 28 private IOperator currentOperator; … … 44 47 #endregion 45 48 46 protected override void ProcessNextOperation() {47 currentOperator = null;48 IOperation next = ExecutionStack.Pop();49 OperationCollection coll = next as OperationCollection;49 protected override void Run(CancellationToken cancellationToken) { 50 this.cancellationToken = cancellationToken; 51 Run(ExecutionStack); 52 } 50 53 51 while (coll != null) { 52 if (coll.Parallel) { 53 IDictionary<IOperation, OperationJob> jobs = new Dictionary<IOperation, OperationJob>(); 54 foreach (IOperation op in coll) { 55 jobs.Add(op, new OperationJob(op)); 54 private void Run(object state) { 55 Stack<IOperation> executionStack = (Stack<IOperation>)state; 56 IOperation next; 57 OperationCollection coll; 58 IAtomicOperation operation; 59 60 while (ExecutionStack.Count > 0) { 61 cancellationToken.ThrowIfCancellationRequested(); 62 63 next = ExecutionStack.Pop(); 64 if (next is OperationCollection) { 65 coll = (OperationCollection)next; 66 if (coll.Parallel) { 67 IDictionary<IOperation, OperationJob> jobs = new Dictionary<IOperation, OperationJob>(); 68 foreach (IOperation op in coll) { 69 jobs.Add(op, new OperationJob(op)); 70 } 71 72 ExecuteOnHive(jobs); 73 74 foreach (var kvp in jobs) { 75 if (kvp.Key is IAtomicOperation) { 76 ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation); 77 } else if (kvp.Key is OperationCollection) { 78 OperationCollection ocoll = (OperationCollection)kvp.Key; 79 for (int i = ocoll.Count - 1; i >= 0; i--) 80 if (ocoll[i] != null) executionStack.Push(ocoll[i]); 81 } 82 } 83 } else { 84 for (int i = coll.Count - 1; i >= 0; i--) 85 if (coll[i] != null) executionStack.Push(coll[i]); 56 86 } 87 } else if (next is IAtomicOperation) { 88 operation = (IAtomicOperation)next; 89 try { 90 next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken); 91 } 92 catch (Exception ex) { 93 ExecutionStack.Push(operation); 94 if (ex is OperationCanceledException) throw ex; 95 else throw new OperatorExecutionException(operation.Operator, ex); 96 } 97 if (next != null) ExecutionStack.Push(next); 57 98 58 ExecuteOnHive(jobs); 59 60 foreach (var kvp in jobs) { 61 if (kvp.Key is IAtomicOperation) { 62 ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation); 63 } else if (kvp.Key is OperationCollection) { 64 // todo 65 } 99 if (operation.Operator.Breakpoint) { 100 Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName)); 101 Pause(); 66 102 } 67 } else {68 for (int i = coll.Count - 1; i >= 0; i--)69 ExecutionStack.Push(coll[i]);70 }71 next = ExecutionStack.Count > 0 ? ExecutionStack.Pop() : null;72 coll = next as OperationCollection;73 }74 75 IAtomicOperation operation = next as IAtomicOperation;76 if (operation != null) {77 try {78 currentOperator = operation.Operator;79 ExecutionStack.Push(operation.Operator.Execute((IExecutionContext)operation));80 }81 catch (Exception ex) {82 ExecutionStack.Push(operation);83 OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex));84 Pause();85 }86 if (operation.Operator.Breakpoint) {87 Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));88 Pause();89 103 } 90 104 } … … 123 137 private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) { 124 138 Log.LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count)); 125 IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>(); 126 JobResultList results; 139 try { 140 IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>(); 141 JobResultList results; 127 142 128 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {129 List<JobDto> jobs = new List<JobDto>();130 foreach (var kvp in jobDict) {131 // shuffle random variable to avoid the same random sequence in each operation132 IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);133 if (random != null)134 random.Reset(random.Next());143 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 144 List<JobDto> jobs = new List<JobDto>(); 145 foreach (var kvp in jobDict) { 146 // shuffle random variable to avoid the same random sequence in each operation 147 IRandom random = FindRandomParameter(kvp.Key as IExecutionContext); 148 if (random != null) 149 random.Reset(random.Next()); 135 150 136 var groups = ResourceIds.Split(';'); 137 SerializedJob serializedJob = new SerializedJob(); 138 serializedJob.SerializedJobData = SerializedJob.Serialize(kvp.Value); 139 serializedJob.JobInfo = new JobDto(); 140 serializedJob.JobInfo.State = JobState.Offline; 141 serializedJob.JobInfo.CoresNeeded = 1; 142 serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList(); 143 ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 144 jobs.Add(response.Obj); 145 jobIds.Add(response.Obj.Id, kvp.Key); 146 } 147 results = service.Obj.GetJobResults(jobIds.Keys).Obj; 148 } 149 150 while (!results.All( 151 x => x.State == JobState.Finished || 152 x.State == JobState.Failed || 153 x.State == JobState.Aborted)) { 154 Thread.Sleep(5000); 155 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 151 var groups = ResourceIds.Split(';'); 152 SerializedJob serializedJob = new SerializedJob(); 153 serializedJob.SerializedJobData = SerializedJob.Serialize(kvp.Value); 154 serializedJob.JobInfo = new JobDto(); 155 serializedJob.JobInfo.State = JobState.Offline; 156 serializedJob.JobInfo.CoresNeeded = 1; 157 serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList(); 158 ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 159 jobs.Add(response.Obj); 160 jobIds.Add(response.Obj.Id, kvp.Key); 161 } 156 162 results = service.Obj.GetJobResults(jobIds.Keys).Obj; 157 163 } 164 165 while (!results.All( 166 x => x.State == JobState.Finished || 167 x.State == JobState.Failed || 168 x.State == JobState.Aborted)) { 169 Thread.Sleep(5000); 170 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 171 results = service.Obj.GetJobResults(jobIds.Keys).Obj; 172 } 173 } 174 175 // all finished 176 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 177 foreach (Guid jobId in jobIds.Keys) { 178 SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 179 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 180 jobDict[jobIds[jobId]] = operationJob; 181 } 182 } 183 184 // delete jobs 185 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 186 foreach (Guid jobId in jobIds.Keys) { 187 service.Obj.DeleteJob(jobId); 188 } 189 } 190 191 Log.LogMessage(string.Format("Operations on the hive finished.", jobDict.Count)); 158 192 } 159 160 // all finished 161 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 162 foreach (Guid jobId in jobIds.Keys) { 163 SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 164 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 165 jobDict[jobIds[jobId]] = operationJob; 166 } 193 catch (Exception e) { 194 Log.LogException(e); 195 throw e; 167 196 } 168 169 // delete jobs170 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {171 foreach (Guid jobId in jobIds.Keys) {172 service.Obj.DeleteJob(jobId);173 }174 }175 176 Log.LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));177 197 } 178 179 public override void Pause() {180 base.Pause();181 if (currentOperator != null) currentOperator.Abort();182 }183 public override void Stop() {184 base.Stop();185 if (currentOperator != null) currentOperator.Abort();186 }187 188 198 } 189 199 }
Note: See TracChangeset
for help on using the changeset viewer.