Changeset 2055 for trunk/sources/HeuristicLab.DistributedEngine
- Timestamp:
- 06/17/09 18:07:15 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified trunk/sources/HeuristicLab.DistributedEngine/3.2/DistributedEngine.cs ¶
r1529 r2055 42 42 get { return serverAddress; } 43 43 set { 44 if (value != serverAddress) {44 if (value != serverAddress) { 45 45 serverAddress = value; 46 46 } … … 72 72 73 73 public override void Execute() { 74 if (jobManager == null) this.jobManager = new JobManager(serverAddress);74 if (jobManager == null) this.jobManager = new JobManager(serverAddress); 75 75 jobManager.Reset(); 76 76 base.Execute(); … … 82 82 83 83 protected override void ProcessNextOperation() { 84 if (suspendedEngines.Count > 0) {84 if (suspendedEngines.Count > 0) { 85 85 ProcessSuspendedEngines(); 86 86 } else { … … 91 91 92 92 private void ProcessSuspendedEngines() { 93 WaitHandle[] waitHandles = new WaitHandle[suspendedEngines.Count];93 AsyncGridResult[] asyncResults = new AsyncGridResult[suspendedEngines.Count]; 94 94 int i = 0; 95 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {96 waitHandles[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);97 } 98 WaitForAll( waitHandles);95 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 96 asyncResults[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key); 97 } 98 WaitForAll(asyncResults); 99 99 // collect results 100 100 List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(); 101 101 try { 102 foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 102 int resultIndex = 0; 103 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 103 104 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 104 jobManager.EndExecuteOperation(suspendedPair.Value),105 (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[resultIndex++]), 105 106 suspendedPair.Value); 106 107 results.Add(p); 107 108 } 108 } catch(Exception e) { 109 } 110 catch (Exception e) { 109 111 // this exception means there was a problem with the underlying communication infrastructure 110 112 // -> show message dialog and abort engine … … 118 120 119 121 private void ProcessOperation(IOperation operation) { 120 if (operation is AtomicOperation) {122 if (operation is AtomicOperation) { 121 123 AtomicOperation atomicOperation = (AtomicOperation)operation; 122 124 IOperation next = null; 123 125 try { 124 126 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 125 } catch(Exception ex) { 127 } 128 catch (Exception ex) { 126 129 // push operation on stack again 127 130 myExecutionStack.Push(atomicOperation); … … 129 132 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 130 133 } 131 if (next != null)134 if (next != null) 132 135 myExecutionStack.Push(next); 133 136 OnOperationExecuted(atomicOperation); 134 if (atomicOperation.Operator.Breakpoint) Abort();135 } else if (operation is CompositeOperation) {137 if (atomicOperation.Operator.Breakpoint) Abort(); 138 } else if (operation is CompositeOperation) { 136 139 CompositeOperation compositeOperation = (CompositeOperation)operation; 137 if (compositeOperation.ExecuteInParallel) {140 if (compositeOperation.ExecuteInParallel) { 138 141 ProcessParallelOperation(compositeOperation); 139 142 OnOperationExecuted(compositeOperation); 140 143 } else { 141 for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)144 for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 142 145 myExecutionStack.Push(compositeOperation.Operations[i]); 143 146 } … … 147 150 private void ProcessParallelOperation(CompositeOperation compositeOperation) { 148 151 // send operations to grid 149 WaitHandle[] waithandles = BeginExecuteOperations(compositeOperation);150 WaitForAll( waithandles);152 AsyncGridResult[] asyncResults = BeginExecuteOperations(compositeOperation); 153 WaitForAll(asyncResults); 151 154 // collect results 152 155 List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(); 153 156 try { 154 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 155 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 156 jobManager.EndExecuteOperation(parOperation), parOperation); 157 results.Add(p); 158 } 159 } catch(Exception e) { 157 int i = 0; 158 foreach (AtomicOperation parOperation in compositeOperation.Operations) { 159 results.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>( 160 (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[i++]), parOperation)); 161 } 162 } 163 catch (Exception e) { 160 164 // this exception means there was a problem with the underlying communication infrastructure 161 165 // -> show message dialog, abort engine, requeue the whole composite operation again and return … … 169 173 } 170 174 171 private WaitHandle[] BeginExecuteOperations(CompositeOperation compositeOperation) {172 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];175 private AsyncGridResult[] BeginExecuteOperations(CompositeOperation compositeOperation) { 176 AsyncGridResult[] asyncResults = new AsyncGridResult[compositeOperation.Operations.Count]; 173 177 int i = 0; 174 178 // HACK: assume that all atomicOperations have the same parent scope. … … 182 186 PruneToParentScope(GlobalScope, parentScope, prunedScopes); 183 187 List<IScope> subScopes = new List<IScope>(parentScope.SubScopes); 184 foreach (IScope scope in subScopes) {188 foreach (IScope scope in subScopes) { 185 189 parentScope.RemoveSubScope(scope); 186 190 } 187 191 // start all parallel jobs 188 foreach (AtomicOperation parOperation in compositeOperation.Operations) {192 foreach (AtomicOperation parOperation in compositeOperation.Operations) { 189 193 parentScope.AddSubScope(parOperation.Scope); 190 waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);194 asyncResults[i++] = jobManager.BeginExecuteEngine(new ProcessingEngine(GlobalScope, parOperation)); 191 195 parentScope.RemoveSubScope(parOperation.Scope); 192 196 } 193 foreach (IScope scope in subScopes) {197 foreach (IScope scope in subScopes) { 194 198 parentScope.AddSubScope(scope); 195 199 } … … 197 201 RestoreFullTree(GlobalScope, prunedScopes); 198 202 199 return waithandles;200 } 201 202 private void WaitForAll( WaitHandle[] waithandles) {203 return asyncResults; 204 } 205 206 private void WaitForAll(AsyncGridResult[] asyncResults) { 203 207 // wait until all jobs are finished 204 208 // WaitAll works only with maximally 64 waithandles 205 if(waithandles.Length <= 64) { 206 WaitHandle.WaitAll(waithandles); 209 if (asyncResults.Length <= 64) { 210 WaitHandle[] waitHandles = new WaitHandle[asyncResults.Length]; 211 for (int i = 0; i < asyncResults.Length; i++) { 212 waitHandles[i] = asyncResults[i].WaitHandle; 213 } 214 WaitHandle.WaitAll(waitHandles); 207 215 } else { 208 216 int i; 209 for(i = 0; i < waithandles.Length; i++) { 210 waithandles[i].WaitOne(); 211 waithandles[i].Close(); 217 for (i = 0; i < asyncResults.Length; i++) { 218 asyncResults[i].WaitHandle.WaitOne(); 212 219 } 213 220 } … … 221 228 suspendedEngines.Clear(); 222 229 // retrieve results and merge into scope-tree 223 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {230 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) { 224 231 ProcessingEngine resultEngine = p.Key; 225 232 AtomicOperation parOperation = p.Value; 226 if (resultEngine.Canceled && !resultEngine.Suspended) {233 if (resultEngine.Canceled && !resultEngine.Suspended) { 227 234 // when an engine was canceled but not suspended this means there was a problem 228 235 // show error message and queue the operation for restart (again parallel) … … 230 237 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); }); 231 238 canceledOperations.AddOperation(parOperation); 232 } else if (resultEngine.Suspended) {239 } else if (resultEngine.Suspended) { 233 240 // when the engine was suspended it means it was stopped because of a breakpoint 234 241 // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel) … … 243 250 } 244 251 // if there were exceptions -> abort 245 if (canceledOperations.Operations.Count > 0) {252 if (canceledOperations.Operations.Count > 0) { 246 253 // requeue the aborted operations 247 254 myExecutionStack.Push(canceledOperations); … … 249 256 } 250 257 // if there were breakpoints -> abort 251 if (suspendedEngines.Count > 0) {258 if (suspendedEngines.Count > 0) { 252 259 Abort(); 253 260 } … … 255 262 256 263 private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) { 257 if (savedScopes.Count == 0) return;264 if (savedScopes.Count == 0) return; 258 265 IScope remainingBranch = currentScope.SubScopes[0]; 259 266 currentScope.RemoveSubScope(remainingBranch); 260 267 IList<IScope> savedScopesForCurrent = savedScopes[0]; 261 foreach (IScope savedScope in savedScopesForCurrent) {268 foreach (IScope savedScope in savedScopesForCurrent) { 262 269 currentScope.AddSubScope(savedScope); 263 270 } … … 267 274 268 275 private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) { 269 if (currentScope == scope) return currentScope;270 if (currentScope.SubScopes.Count == 0) return null;276 if (currentScope == scope) return currentScope; 277 if (currentScope.SubScopes.Count == 0) return null; 271 278 IScope foundScope = null; 272 279 // try to find the searched scope in all my sub-scopes 273 foreach (IScope subScope in currentScope.SubScopes) {280 foreach (IScope subScope in currentScope.SubScopes) { 274 281 foundScope = PruneToParentScope(subScope, scope, prunedScopes); 275 if (foundScope != null) break; // we can stop as soon as we find the scope in a branch276 } 277 if (foundScope != null) { // when we found the scopes in my sub-scopes282 if (foundScope != null) break; // we can stop as soon as we find the scope in a branch 283 } 284 if (foundScope != null) { // when we found the scopes in my sub-scopes 278 285 List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes 279 286 prunedScopes.Add(subScopes); 280 287 // remove all my sub-scopes 281 foreach (IScope subScope in subScopes) {288 foreach (IScope subScope in subScopes) { 282 289 currentScope.RemoveSubScope(subScope); 283 290 } … … 291 298 292 299 private IScope FindParentScope(IScope currentScope, IScope childScope) { 293 if (currentScope.SubScopes.Contains(childScope)) return currentScope;294 foreach (IScope subScope in currentScope.SubScopes) {300 if (currentScope.SubScopes.Contains(childScope)) return currentScope; 301 foreach (IScope subScope in currentScope.SubScopes) { 295 302 IScope result = FindParentScope(subScope, childScope); 296 if (result != null) return result;303 if (result != null) return result; 297 304 } 298 305 return null; … … 302 309 // merge the results 303 310 original.Clear(); 304 foreach (IVariable variable in result.Variables) {311 foreach (IVariable variable in result.Variables) { 305 312 original.AddVariable(variable); 306 313 } 307 foreach (IScope subScope in result.SubScopes) {314 foreach (IScope subScope in result.SubScopes) { 308 315 original.AddSubScope(subScope); 309 316 } 310 foreach (KeyValuePair<string, string> alias in result.Aliases) {317 foreach (KeyValuePair<string, string> alias in result.Aliases) { 311 318 original.AddAlias(alias.Key, alias.Value); 312 319 } … … 319 326 addressAttribute.Value = ServerAddress; 320 327 node.Attributes.Append(addressAttribute); 321 if (suspendedEngines.Count > 0) {328 if (suspendedEngines.Count > 0) { 322 329 XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines"); 323 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {330 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) { 324 331 XmlNode n = document.CreateElement("Entry"); 325 332 n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects)); … … 335 342 ServerAddress = node.Attributes["ServerAddress"].Value; 336 343 XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines"); 337 if (suspendedEnginesNode != null) {338 foreach (XmlNode n in suspendedEnginesNode.ChildNodes) {344 if (suspendedEnginesNode != null) { 345 foreach (XmlNode n in suspendedEnginesNode.ChildNodes) { 339 346 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 340 347 (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects),
Note: See TracChangeset
for help on using the changeset viewer.