Changeset 414 for trunk/sources/HeuristicLab.DistributedEngine
- Timestamp:
- 07/31/08 13:08:41 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r413 r414 36 36 namespace HeuristicLab.DistributedEngine { 37 37 public class DistributedEngine : EngineBase, IEditable { 38 private List<KeyValuePair<ProcessingEngine, AtomicOperation>> suspendedEngines = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(); 38 39 private JobManager jobManager; 39 40 private string serverAddress; … … 59 60 } 60 61 62 public override bool Terminated { 63 get { 64 return base.Terminated && suspendedEngines.Count == 0; 65 } 66 } 67 68 public override void Reset() { 69 suspendedEngines.Clear(); 70 base.Reset(); 71 } 72 61 73 public override void Execute() { 62 74 if(jobManager == null) this.jobManager = new JobManager(serverAddress); … … 70 82 71 83 protected override void ProcessNextOperation() { 72 IOperation operation = myExecutionStack.Pop(); 84 if(suspendedEngines.Count > 0) { 85 ProcessSuspendedEngines(); 86 } else { 87 IOperation operation = myExecutionStack.Pop(); 88 ProcessOperation(operation); 89 } 90 } 91 92 private void ProcessSuspendedEngines() { 93 WaitHandle[] waitHandles = new WaitHandle[suspendedEngines.Count]; 94 int i = 0; 95 foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 96 waitHandles[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key); 97 } 98 WaitForAll(waitHandles); 99 // collect results 100 List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(); 101 try { 102 foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 103 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 104 jobManager.EndExecuteOperation(suspendedPair.Value), 105 suspendedPair.Value); 106 results.Add(p); 107 } 108 } catch(Exception e) { 109 // this exception means there was a problem with the underlying communication infrastructure 110 // -> show message dialog and abort engine 111 Abort(); 112 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); }); 113 return; 114 } 115 // got all result engines without an exception -> merge results 116 ProcessResults(results); 117 } 118 119 private void ProcessOperation(IOperation operation) { 73 120 if(operation is AtomicOperation) { 74 121 AtomicOperation atomicOperation = (AtomicOperation)operation; … … 89 136 CompositeOperation compositeOperation = (CompositeOperation)operation; 90 137 if(compositeOperation.ExecuteInParallel) { 91 try { 92 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 93 int i = 0; 94 // HACK: assume that all atomicOperations have the same parent scope. 95 // 1) find that parent scope 96 // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation 97 // 3) keep the branches to 'repair' the scope-tree later 98 // 4) for each parallel job attach only the sub-scope that this operation uses 99 // 5) after starting all parallel jobs restore the whole scope-tree 100 IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope); 101 List<IList<IScope>> prunedScopes = new List<IList<IScope>>(); 102 PruneToParentScope(GlobalScope, parentScope, prunedScopes); 103 List<IScope> subScopes = new List<IScope>(parentScope.SubScopes); 104 foreach(IScope scope in subScopes) { 105 parentScope.RemoveSubScope(scope); 106 } 107 // start all parallel jobs 108 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 109 parentScope.AddSubScope(parOperation.Scope); 110 waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation); 111 parentScope.RemoveSubScope(parOperation.Scope); 112 } 113 foreach(IScope scope in subScopes) { 114 parentScope.AddSubScope(scope); 115 } 116 prunedScopes.Reverse(); 117 RestoreFullTree(GlobalScope, prunedScopes); 118 119 // wait until all jobs are finished 120 // WaitAll works only with maximally 64 waithandles 121 if(waithandles.Length <= 64) { 122 WaitHandle.WaitAll(waithandles); 123 } else { 124 for(i = 0; i < waithandles.Length; i++) { 125 waithandles[i].WaitOne(); 126 waithandles[i].Close(); 127 } 128 } 129 130 CompositeOperation canceledOperations = new CompositeOperation(); 131 canceledOperations.ExecuteInParallel = true; 132 // retrieve results and merge into scope-tree 133 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 134 ProcessingEngine resultEngine = jobManager.EndExecuteOperation(parOperation); 135 if(resultEngine.Canceled) { 136 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); }); 137 canceledOperations.AddOperation(parOperation); 138 } else { 139 // if everything went fine we can merge the results into our local scope-tree 140 MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope); 141 } 142 } 143 144 if(canceledOperations.Operations.Count > 0) { 145 myExecutionStack.Push(canceledOperations); 146 Abort(); 147 } 148 } catch(Exception e) { 149 myExecutionStack.Push(compositeOperation); 150 Abort(); 151 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); }); 152 } 138 ProcessParallelOperation(compositeOperation); 153 139 OnOperationExecuted(compositeOperation); 154 140 } else { … … 156 142 myExecutionStack.Push(compositeOperation.Operations[i]); 157 143 } 144 } 145 } 146 147 private void ProcessParallelOperation(CompositeOperation compositeOperation) { 148 // send operations to grid 149 WaitHandle[] waithandles = BeginExecuteOperations(compositeOperation); 150 WaitForAll(waithandles); 151 // collect results 152 List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(); 153 try { 154 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 155 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 156 jobManager.EndExecuteOperation(parOperation), parOperation); 157 results.Add(p); 158 } 159 } catch(Exception e) { 160 // this exception means there was a problem with the underlying communication infrastructure 161 // -> show message dialog, abort engine, requeue the whole composite operation again and return 162 myExecutionStack.Push(compositeOperation); 163 Abort(); 164 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); }); 165 return; 166 } 167 // got all result engines without an exception -> merge results 168 ProcessResults(results); 169 } 170 171 private WaitHandle[] BeginExecuteOperations(CompositeOperation compositeOperation) { 172 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 173 int i = 0; 174 // HACK: assume that all atomicOperations have the same parent scope. 175 // 1) find that parent scope 176 // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation 177 // 3) keep the branches to 'repair' the scope-tree later 178 // 4) for each parallel job attach only the sub-scope that this operation uses 179 // 5) after starting all parallel jobs restore the whole scope-tree 180 IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope); 181 List<IList<IScope>> prunedScopes = new List<IList<IScope>>(); 182 PruneToParentScope(GlobalScope, parentScope, prunedScopes); 183 List<IScope> subScopes = new List<IScope>(parentScope.SubScopes); 184 foreach(IScope scope in subScopes) { 185 parentScope.RemoveSubScope(scope); 186 } 187 // start all parallel jobs 188 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 189 parentScope.AddSubScope(parOperation.Scope); 190 waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation); 191 parentScope.RemoveSubScope(parOperation.Scope); 192 } 193 foreach(IScope scope in subScopes) { 194 parentScope.AddSubScope(scope); 195 } 196 prunedScopes.Reverse(); 197 RestoreFullTree(GlobalScope, prunedScopes); 198 199 return waithandles; 200 } 201 202 private void WaitForAll(WaitHandle[] waithandles) { 203 // wait until all jobs are finished 204 // WaitAll works only with maximally 64 waithandles 205 if(waithandles.Length <= 64) { 206 WaitHandle.WaitAll(waithandles); 207 } else { 208 int i; 209 for(i = 0; i < waithandles.Length; i++) { 210 waithandles[i].WaitOne(); 211 waithandles[i].Close(); 212 } 213 } 214 } 215 216 private void ProcessResults(List<KeyValuePair<ProcessingEngine, AtomicOperation>> results) { 217 // create a new compositeOperation to hold canceled operations that should be restarted 218 CompositeOperation canceledOperations = new CompositeOperation(); 219 canceledOperations.ExecuteInParallel = true; 220 221 suspendedEngines.Clear(); 222 // retrieve results and merge into scope-tree 223 foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in results) { 224 ProcessingEngine resultEngine = p.Key; 225 AtomicOperation parOperation = p.Value; 226 if(resultEngine.Canceled && !resultEngine.Suspended) { 227 // when an engine was canceled but not suspended this means there was a problem 228 // show error message and queue the operation for restart (again parallel) 229 // but don't merge the results of the aborted engine 230 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); }); 231 canceledOperations.AddOperation(parOperation); 232 } else if(resultEngine.Suspended) { 233 // when the engine was suspended it means it was stopped because of a breakpoint 234 // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel) 235 MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope); 236 resultEngine.InitialOperation = parOperation; 237 suspendedEngines.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(resultEngine, parOperation)); 238 } else { 239 // engine is finished -> 240 // simply merge the results into our local scope-tree 241 MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope); 242 } 243 } 244 // if there were exceptions -> abort 245 if(canceledOperations.Operations.Count > 0) { 246 // requeue the aborted operations 247 myExecutionStack.Push(canceledOperations); 248 Abort(); 249 } 250 // if there were breakpoints -> abort 251 if(suspendedEngines.Count > 0) { 252 Abort(); 158 253 } 159 254 } … … 223 318 XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress"); 224 319 addressAttribute.Value = ServerAddress; 225 node.Attributes.Append(addressAttribute); 320 if(suspendedEngines.Count > 0) { 321 node.Attributes.Append(addressAttribute); 322 XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines"); 323 foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) { 324 XmlNode n = document.CreateElement("Entry"); 325 n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects)); 326 n.AppendChild(PersistenceManager.Persist(p.Value, document, persistedObjects)); 327 suspendedEnginesNode.AppendChild(n); 328 } 329 node.AppendChild(suspendedEnginesNode); 330 } 226 331 return node; 227 332 } … … 229 334 base.Populate(node, restoredObjects); 230 335 ServerAddress = node.Attributes["ServerAddress"].Value; 336 XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines"); 337 if(suspendedEnginesNode != null) { 338 foreach(XmlNode n in suspendedEnginesNode.ChildNodes) { 339 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 340 (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects), 341 (AtomicOperation)PersistenceManager.Restore(n.ChildNodes[1], restoredObjects)); 342 suspendedEngines.Add(p); 343 } 344 } 231 345 } 232 346 #endregion
Note: See TracChangeset
for help on using the changeset viewer.