Changeset 33
- Timestamp:
- 03/01/08 15:05:37 (17 years ago)
- Location:
- trunk/sources
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r32 r33 34 34 public class DistributedEngine : EngineBase, IEditable { 35 35 private IGridServer server; 36 private Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>();37 36 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 37 private List<Guid> runningEngines = new List<Guid>(); 38 38 private string serverAddress; 39 39 public string ServerAddress { … … 43 43 serverAddress = value; 44 44 } 45 } 46 } 47 public override bool Terminated { 48 get { 49 return myExecutionStack.Count == 0 && runningEngines.Count == 0; 45 50 } 46 51 } … … 77 82 public override void Abort() { 78 83 base.Abort(); 79 foreach(Guid engineGuid in runningEngines .Keys) {84 foreach(Guid engineGuid in runningEngines) { 80 85 server.AbortEngine(engineGuid); 81 86 } … … 83 88 84 89 protected override void ProcessNextOperation() { 85 ProcessNextOperation(myExecutionStack, 0); 86 } 87 private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) { 88 IOperation operation = stack.Pop(); 90 if(runningEngines.Count != 0) { 91 Guid engineGuid = runningEngines[0]; 92 byte[] scopeXml = server.TryEndExecuteEngine(engineGuid,100); 93 if(scopeXml != null) { 94 GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress); 95 IScope newScope = (IScope)PersistenceManager.Load(stream); 96 IScope oldScope = engineOperations[engineGuid].Scope; 97 oldScope.Clear(); 98 foreach(IVariable variable in newScope.Variables) { 99 oldScope.AddVariable(variable); 100 } 101 foreach(IScope subScope in newScope.SubScopes) { 102 oldScope.AddSubScope(subScope); 103 } 104 runningEngines.Remove(engineGuid); 105 engineOperations.Remove(engineGuid); 106 } 107 108 if(Canceled) { 109 // write back not finished tasks 110 //CompositeOperation remaining = new CompositeOperation(); 111 //remaining.ExecuteInParallel = true; 112 //for(int i = 0; i < list.tasks.Length; i++) { 113 // if(list.tasks[i].Count > 0) { 114 // CompositeOperation task = new CompositeOperation(); 115 // while(list.tasks[i].Count > 0) 116 // task.AddOperation(list.tasks[i].Pop()); 117 // remaining.AddOperation(task); 118 // } 119 //} 120 //if(remaining.Operations.Count > 0) 121 // stack.Push(remaining); 122 } 123 return; 124 } 125 IOperation operation = myExecutionStack.Pop(); 89 126 if(operation is AtomicOperation) { 90 127 AtomicOperation atomicOperation = (AtomicOperation)operation; … … 94 131 } catch(Exception ex) { 95 132 // push operation on stack again 96 stack.Push(atomicOperation);133 myExecutionStack.Push(atomicOperation); 97 134 Abort(); 98 135 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 99 136 } 100 137 if(next != null) 101 stack.Push(next);138 myExecutionStack.Push(next); 102 139 OnOperationExecuted(atomicOperation); 103 140 if(atomicOperation.Operator.Breakpoint) Abort(); … … 112 149 stream.Close(); 113 150 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 114 runningEngines[currentEngineGuid] = parOperation; 151 runningEngines.Add(currentEngineGuid); 152 engineOperations[currentEngineGuid] = parOperation; 115 153 } 116 foreach(Guid engineGuid in runningEngines.Keys) {117 byte[] scopeXml = server.EndExecuteEngine(engineGuid);118 GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress);119 IScope newScope = (IScope)PersistenceManager.Load(stream);120 IScope oldScope = runningEngines[engineGuid].Scope;121 oldScope.Clear();122 foreach(IVariable variable in newScope.Variables) {123 oldScope.AddVariable(variable);124 }125 foreach(IScope subScope in newScope.SubScopes) {126 oldScope.AddSubScope(subScope);127 }128 }129 130 // TASK (gkronber 12.2.08)131 //if (Canceled) {132 // // write back not finished tasks133 // CompositeOperation remaining = new CompositeOperation();134 // remaining.ExecuteInParallel = true;135 // for (int i = 0; i < list.tasks.Length; i++) {136 // if (list.tasks[i].Count > 0) {137 // CompositeOperation task = new CompositeOperation();138 // while (list.tasks[i].Count > 0)139 // task.AddOperation(list.tasks[i].Pop());140 // remaining.AddOperation(task);141 // }142 // }143 // if (remaining.Operations.Count > 0)144 // stack.Push(remaining);145 //}146 154 } else { 147 155 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 148 stack.Push(compositeOperation.Operations[i]);156 myExecutionStack.Push(compositeOperation.Operations[i]); 149 157 } 150 158 } -
trunk/sources/HeuristicLab.Grid/EngineStore.cs
r32 r33 32 32 private Dictionary<Guid, byte[]> waitingEngines; 33 33 private Dictionary<Guid, byte[]> runningEngines; 34 private Dictionary<Guid, ManualResetEvent> waitHandles; 34 35 private Dictionary<Guid, byte[]> results; 35 36 private Dictionary<Guid, string> runningClients; 36 37 private object bigLock; 37 38 private ChannelFactory<IClient> clientChannelFactory; 38 39 private event EventHandler ResultRecieved;40 41 39 public int WaitingJobs { 42 40 get { … … 62 60 runningEngines = new Dictionary<Guid, byte[]>(); 63 61 runningClients = new Dictionary<Guid, string>(); 62 waitHandles = new Dictionary<Guid, ManualResetEvent>(); 64 63 results = new Dictionary<Guid, byte[]>(); 65 64 bigLock = new object(); … … 98 97 runningClients.Remove(guid); 99 98 results[guid] = result; 100 OnResultRecieved(guid);99 waitHandles[guid].Set(); 101 100 } 102 101 } … … 106 105 engineQueue.Enqueue(guid); 107 106 waitingEngines.Add(guid, engine); 107 waitHandles.Add(guid, new ManualResetEvent(false)); 108 108 } 109 109 } … … 118 118 119 119 internal byte[] GetResult(Guid guid) { 120 ManualResetEvent waitHandle = new ManualResetEvent(false); 120 return GetResult(guid, System.Threading.Timeout.Infinite); 121 } 122 internal byte[] GetResult(Guid guid, int timeout) { 121 123 lock(bigLock) { 122 if(results.ContainsKey(guid)) { 123 byte[] result = results[guid]; 124 results.Remove(guid); 125 return result; 124 if(waitHandles.ContainsKey(guid)) { 125 ManualResetEvent waitHandle = waitHandles[guid]; 126 if(waitHandle.WaitOne(timeout, false)) { 127 waitHandle.Close(); 128 waitHandles.Remove(guid); 129 byte[] result = results[guid]; 130 results.Remove(guid); 131 return result; 132 } else { 133 return null; 134 } 126 135 } else { 127 ResultRecieved += delegate(object source, EventArgs args) { 128 ResultRecievedEventArgs resultArgs = (ResultRecievedEventArgs)args; 129 if(resultArgs.resultGuid == guid) { 130 waitHandle.Set(); 131 } 132 }; 136 return null; 133 137 } 134 }135 136 waitHandle.WaitOne();137 waitHandle.Close();138 139 lock(bigLock) {140 byte[] result = results[guid];141 results.Remove(guid);142 return result;143 138 } 144 139 } … … 157 152 } 158 153 } 159 160 private void OnResultRecieved(Guid guid) {161 ResultRecievedEventArgs args = new ResultRecievedEventArgs();162 args.resultGuid = guid;163 if(ResultRecieved != null) {164 ResultRecieved(this, args);165 }166 }167 168 private class ResultRecievedEventArgs : EventArgs {169 public Guid resultGuid;170 }171 154 } 172 155 } -
trunk/sources/HeuristicLab.Grid/GridServer.cs
r32 r33 44 44 return engineStore.GetResult(guid); 45 45 } 46 public byte[] TryEndExecuteEngine(Guid guid, int timeout) { 47 return engineStore.GetResult(guid, timeout); 48 } 46 49 47 50 public void AbortEngine(Guid engine) { -
trunk/sources/HeuristicLab.Grid/IGridServer.cs
r2 r33 30 30 [OperationContract] 31 31 Guid BeginExecuteEngine(byte[] engine); 32 33 32 [OperationContract] 34 33 byte[] EndExecuteEngine(Guid engineGuid); 35 34 [OperationContract] 35 byte[] TryEndExecuteEngine(Guid engineGuid, int timeout); 36 36 [OperationContract] 37 37 void AbortEngine(Guid engineGuid);
Note: See TracChangeset
for help on using the changeset viewer.