Changeset 35 for trunk/sources/HeuristicLab.DistributedEngine
- Timestamp:
- 03/01/08 16:19:11 (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r34 r35 37 37 private List<Guid> runningEngines = new List<Guid>(); 38 38 private string serverAddress; 39 private bool cancelRequested; 39 40 public string ServerAddress { 40 41 get { return serverAddress; } … … 50 51 } 51 52 } 52 53 53 public override object Clone(IDictionary<Guid, object> clonedObjects) { 54 54 DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects); … … 81 81 82 82 public override void Abort() { 83 base.Abort(); 84 foreach(Guid engineGuid in runningEngines) { 85 server.AbortEngine(engineGuid); 83 lock(runningEngines) { 84 cancelRequested = true; 85 foreach(Guid engineGuid in runningEngines) { 86 server.AbortEngine(engineGuid); 87 } 86 88 } 89 } 90 public override void Reset() { 91 base.Reset(); 92 engineOperations.Clear(); 93 runningEngines.Clear(); 94 cancelRequested = false; 87 95 } 88 96 89 97 protected override void ProcessNextOperation() { 90 if(runningEngines.Count != 0) { 91 Guid engineGuid = runningEngines[0]; 92 byte[] resultXml = server.TryEndExecuteEngine(engineGuid,100); 93 if(resultXml != null) { 94 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 95 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 96 IScope oldScope = engineOperations[engineGuid].Scope; 97 oldScope.Clear(); 98 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 99 oldScope.AddVariable(variable); 98 lock(runningEngines) { 99 if(runningEngines.Count == 0 && cancelRequested) { 100 base.Abort(); 101 cancelRequested = false; 102 return; 103 } 104 if(runningEngines.Count != 0) { 105 Guid engineGuid = runningEngines[0]; 106 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 107 if(resultXml != null) { 108 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 109 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 110 IScope oldScope = engineOperations[engineGuid].Scope; 111 oldScope.Clear(); 112 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 113 oldScope.AddVariable(variable); 114 } 115 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 116 oldScope.AddSubScope(subScope); 117 } 118 OnOperationExecuted(engineOperations[engineGuid]); 119 if(resultEngine.ExecutionStack.Count != 0) { 120 foreach(IOperation op in resultEngine.ExecutionStack) { 121 myExecutionStack.Push(op); 122 } 123 } 124 runningEngines.Remove(engineGuid); 125 engineOperations.Remove(engineGuid); 100 126 } 101 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 102 oldScope.AddSubScope(subScope); 127 return; 128 } 129 IOperation operation = myExecutionStack.Pop(); 130 if(operation is AtomicOperation) { 131 AtomicOperation atomicOperation = (AtomicOperation)operation; 132 IOperation next = null; 133 try { 134 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 135 } catch(Exception ex) { 136 // push operation on stack again 137 myExecutionStack.Push(atomicOperation); 138 Abort(); 139 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 103 140 } 104 runningEngines.Remove(engineGuid); 105 engineOperations.Remove(engineGuid); 106 } 107 108 if(Canceled) { 109 // write back not finished tasks 110 //CompositeOperation remaining = new CompositeOperation(); 111 //remaining.ExecuteInParallel = true; 112 //for(int i = 0; i < list.tasks.Length; i++) { 113 // if(list.tasks[i].Count > 0) { 114 // CompositeOperation task = new CompositeOperation(); 115 // while(list.tasks[i].Count > 0) 116 // task.AddOperation(list.tasks[i].Pop()); 117 // remaining.AddOperation(task); 118 // } 119 //} 120 //if(remaining.Operations.Count > 0) 121 // stack.Push(remaining); 122 } 123 return; 124 } 125 IOperation operation = myExecutionStack.Pop(); 126 if(operation is AtomicOperation) { 127 AtomicOperation atomicOperation = (AtomicOperation)operation; 128 IOperation next = null; 129 try { 130 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 131 } catch(Exception ex) { 132 // push operation on stack again 133 myExecutionStack.Push(atomicOperation); 134 Abort(); 135 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 136 } 137 if(next != null) 138 myExecutionStack.Push(next); 139 OnOperationExecuted(atomicOperation); 140 if(atomicOperation.Operator.Breakpoint) Abort(); 141 } else if(operation is CompositeOperation) { 142 CompositeOperation compositeOperation = (CompositeOperation)operation; 143 if(compositeOperation.ExecuteInParallel) { 144 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 145 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? 146 MemoryStream memStream = new MemoryStream(); 147 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 148 PersistenceManager.Save(engine, stream); 149 stream.Close(); 150 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 151 runningEngines.Add(currentEngineGuid); 152 engineOperations[currentEngineGuid] = parOperation; 141 if(next != null) 142 myExecutionStack.Push(next); 143 OnOperationExecuted(atomicOperation); 144 if(atomicOperation.Operator.Breakpoint) Abort(); 145 } else if(operation is CompositeOperation) { 146 CompositeOperation compositeOperation = (CompositeOperation)operation; 147 if(compositeOperation.ExecuteInParallel) { 148 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 149 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? 150 MemoryStream memStream = new MemoryStream(); 151 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 152 PersistenceManager.Save(engine, stream); 153 stream.Close(); 154 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 155 runningEngines.Add(currentEngineGuid); 156 engineOperations[currentEngineGuid] = parOperation; 157 } 158 } else { 159 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 160 myExecutionStack.Push(compositeOperation.Operations[i]); 153 161 } 154 } else {155 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)156 myExecutionStack.Push(compositeOperation.Operations[i]);157 162 } 158 163 }
Note: See TracChangeset
for help on using the changeset viewer.