Changeset 33 for trunk/sources/HeuristicLab.DistributedEngine
- Timestamp:
- 03/01/08 15:05:37 (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r32 r33 34 34 public class DistributedEngine : EngineBase, IEditable { 35 35 private IGridServer server; 36 private Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>();37 36 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 37 private List<Guid> runningEngines = new List<Guid>(); 38 38 private string serverAddress; 39 39 public string ServerAddress { … … 43 43 serverAddress = value; 44 44 } 45 } 46 } 47 public override bool Terminated { 48 get { 49 return myExecutionStack.Count == 0 && runningEngines.Count == 0; 45 50 } 46 51 } … … 77 82 public override void Abort() { 78 83 base.Abort(); 79 foreach(Guid engineGuid in runningEngines .Keys) {84 foreach(Guid engineGuid in runningEngines) { 80 85 server.AbortEngine(engineGuid); 81 86 } … … 83 88 84 89 protected override void ProcessNextOperation() { 85 ProcessNextOperation(myExecutionStack, 0); 86 } 87 private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) { 88 IOperation operation = stack.Pop(); 90 if(runningEngines.Count != 0) { 91 Guid engineGuid = runningEngines[0]; 92 byte[] scopeXml = server.TryEndExecuteEngine(engineGuid,100); 93 if(scopeXml != null) { 94 GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress); 95 IScope newScope = (IScope)PersistenceManager.Load(stream); 96 IScope oldScope = engineOperations[engineGuid].Scope; 97 oldScope.Clear(); 98 foreach(IVariable variable in newScope.Variables) { 99 oldScope.AddVariable(variable); 100 } 101 foreach(IScope subScope in newScope.SubScopes) { 102 oldScope.AddSubScope(subScope); 103 } 104 runningEngines.Remove(engineGuid); 105 engineOperations.Remove(engineGuid); 106 } 107 108 if(Canceled) { 109 // write back not finished tasks 110 //CompositeOperation remaining = new CompositeOperation(); 111 //remaining.ExecuteInParallel = true; 112 //for(int i = 0; i < list.tasks.Length; i++) { 113 // if(list.tasks[i].Count > 0) { 114 // CompositeOperation task = new CompositeOperation(); 115 // while(list.tasks[i].Count > 0) 116 // task.AddOperation(list.tasks[i].Pop()); 117 // remaining.AddOperation(task); 118 // } 119 //} 120 //if(remaining.Operations.Count > 0) 121 // stack.Push(remaining); 122 } 123 return; 124 } 125 IOperation operation = myExecutionStack.Pop(); 89 126 if(operation is AtomicOperation) { 90 127 AtomicOperation atomicOperation = (AtomicOperation)operation; … … 94 131 } catch(Exception ex) { 95 132 // push operation on stack again 96 stack.Push(atomicOperation);133 myExecutionStack.Push(atomicOperation); 97 134 Abort(); 98 135 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 99 136 } 100 137 if(next != null) 101 stack.Push(next);138 myExecutionStack.Push(next); 102 139 OnOperationExecuted(atomicOperation); 103 140 if(atomicOperation.Operator.Breakpoint) Abort(); … … 112 149 stream.Close(); 113 150 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 114 runningEngines[currentEngineGuid] = parOperation; 151 runningEngines.Add(currentEngineGuid); 152 engineOperations[currentEngineGuid] = parOperation; 115 153 } 116 foreach(Guid engineGuid in runningEngines.Keys) {117 byte[] scopeXml = server.EndExecuteEngine(engineGuid);118 GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress);119 IScope newScope = (IScope)PersistenceManager.Load(stream);120 IScope oldScope = runningEngines[engineGuid].Scope;121 oldScope.Clear();122 foreach(IVariable variable in newScope.Variables) {123 oldScope.AddVariable(variable);124 }125 foreach(IScope subScope in newScope.SubScopes) {126 oldScope.AddSubScope(subScope);127 }128 }129 130 // TASK (gkronber 12.2.08)131 //if (Canceled) {132 // // write back not finished tasks133 // CompositeOperation remaining = new CompositeOperation();134 // remaining.ExecuteInParallel = true;135 // for (int i = 0; i < list.tasks.Length; i++) {136 // if (list.tasks[i].Count > 0) {137 // CompositeOperation task = new CompositeOperation();138 // while (list.tasks[i].Count > 0)139 // task.AddOperation(list.tasks[i].Pop());140 // remaining.AddOperation(task);141 // }142 // }143 // if (remaining.Operations.Count > 0)144 // stack.Push(remaining);145 //}146 154 } else { 147 155 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 148 stack.Push(compositeOperation.Operations[i]);156 myExecutionStack.Push(compositeOperation.Operations[i]); 149 157 } 150 158 }
Note: See TracChangeset
for help on using the changeset viewer.