- Timestamp:
- 03/01/08 16:19:11 (17 years ago)
- Location:
- trunk/sources
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r34 r35 37 37 private List<Guid> runningEngines = new List<Guid>(); 38 38 private string serverAddress; 39 private bool cancelRequested; 39 40 public string ServerAddress { 40 41 get { return serverAddress; } … … 50 51 } 51 52 } 52 53 53 public override object Clone(IDictionary<Guid, object> clonedObjects) { 54 54 DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects); … … 81 81 82 82 public override void Abort() { 83 base.Abort(); 84 foreach(Guid engineGuid in runningEngines) { 85 server.AbortEngine(engineGuid); 83 lock(runningEngines) { 84 cancelRequested = true; 85 foreach(Guid engineGuid in runningEngines) { 86 server.AbortEngine(engineGuid); 87 } 86 88 } 89 } 90 public override void Reset() { 91 base.Reset(); 92 engineOperations.Clear(); 93 runningEngines.Clear(); 94 cancelRequested = false; 87 95 } 88 96 89 97 protected override void ProcessNextOperation() { 90 if(runningEngines.Count != 0) { 91 Guid engineGuid = runningEngines[0]; 92 byte[] resultXml = server.TryEndExecuteEngine(engineGuid,100); 93 if(resultXml != null) { 94 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 95 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 96 IScope oldScope = engineOperations[engineGuid].Scope; 97 oldScope.Clear(); 98 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 99 oldScope.AddVariable(variable); 98 lock(runningEngines) { 99 if(runningEngines.Count == 0 && cancelRequested) { 100 base.Abort(); 101 cancelRequested = false; 102 return; 103 } 104 if(runningEngines.Count != 0) { 105 Guid engineGuid = runningEngines[0]; 106 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 107 if(resultXml != null) { 108 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 109 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 110 IScope oldScope = engineOperations[engineGuid].Scope; 111 oldScope.Clear(); 112 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 113 oldScope.AddVariable(variable); 114 } 115 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 116 oldScope.AddSubScope(subScope); 117 } 118 OnOperationExecuted(engineOperations[engineGuid]); 119 if(resultEngine.ExecutionStack.Count != 0) { 120 foreach(IOperation op in resultEngine.ExecutionStack) { 121 myExecutionStack.Push(op); 122 } 123 } 124 runningEngines.Remove(engineGuid); 125 engineOperations.Remove(engineGuid); 100 126 } 101 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 102 oldScope.AddSubScope(subScope); 127 return; 128 } 129 IOperation operation = myExecutionStack.Pop(); 130 if(operation is AtomicOperation) { 131 AtomicOperation atomicOperation = (AtomicOperation)operation; 132 IOperation next = null; 133 try { 134 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 135 } catch(Exception ex) { 136 // push operation on stack again 137 myExecutionStack.Push(atomicOperation); 138 Abort(); 139 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 103 140 } 104 runningEngines.Remove(engineGuid); 105 engineOperations.Remove(engineGuid); 106 } 107 108 if(Canceled) { 109 // write back not finished tasks 110 //CompositeOperation remaining = new CompositeOperation(); 111 //remaining.ExecuteInParallel = true; 112 //for(int i = 0; i < list.tasks.Length; i++) { 113 // if(list.tasks[i].Count > 0) { 114 // CompositeOperation task = new CompositeOperation(); 115 // while(list.tasks[i].Count > 0) 116 // task.AddOperation(list.tasks[i].Pop()); 117 // remaining.AddOperation(task); 118 // } 119 //} 120 //if(remaining.Operations.Count > 0) 121 // stack.Push(remaining); 122 } 123 return; 124 } 125 IOperation operation = myExecutionStack.Pop(); 126 if(operation is AtomicOperation) { 127 AtomicOperation atomicOperation = (AtomicOperation)operation; 128 IOperation next = null; 129 try { 130 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 131 } catch(Exception ex) { 132 // push operation on stack again 133 myExecutionStack.Push(atomicOperation); 134 Abort(); 135 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 136 } 137 if(next != null) 138 myExecutionStack.Push(next); 139 OnOperationExecuted(atomicOperation); 140 if(atomicOperation.Operator.Breakpoint) Abort(); 141 } else if(operation is CompositeOperation) { 142 CompositeOperation compositeOperation = (CompositeOperation)operation; 143 if(compositeOperation.ExecuteInParallel) { 144 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 145 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? 146 MemoryStream memStream = new MemoryStream(); 147 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 148 PersistenceManager.Save(engine, stream); 149 stream.Close(); 150 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 151 runningEngines.Add(currentEngineGuid); 152 engineOperations[currentEngineGuid] = parOperation; 141 if(next != null) 142 myExecutionStack.Push(next); 143 OnOperationExecuted(atomicOperation); 144 if(atomicOperation.Operator.Breakpoint) Abort(); 145 } else if(operation is CompositeOperation) { 146 CompositeOperation compositeOperation = (CompositeOperation)operation; 147 if(compositeOperation.ExecuteInParallel) { 148 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 149 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? 150 MemoryStream memStream = new MemoryStream(); 151 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 152 PersistenceManager.Save(engine, stream); 153 stream.Close(); 154 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 155 runningEngines.Add(currentEngineGuid); 156 engineOperations[currentEngineGuid] = parOperation; 157 } 158 } else { 159 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 160 myExecutionStack.Push(compositeOperation.Operations[i]); 153 161 } 154 } else {155 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)156 myExecutionStack.Push(compositeOperation.Operations[i]);157 162 } 158 163 } -
trunk/sources/HeuristicLab.Grid/ClientForm.cs
r34 r35 46 46 private ProcessingEngine currentEngine; 47 47 private string clientUrl; 48 private object locker = new object(); 48 49 49 50 public ClientForm() { … … 96 97 97 98 private void fetchOperationTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { 98 byte[] engineXml; 99 fetchOperationTimer.Stop(); 100 if (engineStore.TryTakeEngine(clientUrl, out currentGuid, out engineXml)) { 101 currentEngine = RestoreEngine(engineXml); 102 if (InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine"; 103 currentEngine.Finished += delegate(object src, EventArgs args) { 104 byte[] resultXml = SaveEngine(currentEngine); 105 engineStore.StoreResult(currentGuid, resultXml); 106 currentGuid = Guid.Empty; 107 currentEngine = null; 108 fetchOperationTimer.Interval = 100; 99 lock(locker) { 100 byte[] engineXml; 101 fetchOperationTimer.Stop(); 102 if(engineStore.TryTakeEngine(clientUrl, out currentGuid, out engineXml)) { 103 currentEngine = RestoreEngine(engineXml); 104 if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine"; 105 currentEngine.Finished += delegate(object src, EventArgs args) { 106 byte[] resultXml = SaveEngine(currentEngine); 107 engineStore.StoreResult(currentGuid, resultXml); 108 currentGuid = Guid.Empty; 109 currentEngine = null; 110 fetchOperationTimer.Interval = 100; 111 fetchOperationTimer.Start(); 112 }; 113 currentEngine.Execute(); 114 } else { 115 if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Waiting for engine"; }); } else statusTextBox.Text = "Waiting for engine"; 116 fetchOperationTimer.Interval = 5000; 109 117 fetchOperationTimer.Start(); 110 }; 111 currentEngine.Execute(); 112 } else { 113 if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Waiting for engine"; }); } else statusTextBox.Text = "Waiting for engine"; 114 fetchOperationTimer.Interval = 5000; 115 fetchOperationTimer.Start(); 118 } 116 119 } 117 120 } 118 121 public void Abort(Guid guid) { 119 if(!IsRunningEngine(guid)) return; 120 currentEngine.Abort(); 122 lock(locker) { 123 if(!IsRunningEngine(guid)) return; 124 currentEngine.Abort(); 125 } 121 126 } 122 127 public bool IsRunningEngine(Guid guid) { -
trunk/sources/HeuristicLab.Grid/EngineStore.cs
r34 r35 117 117 if(waitHandles.ContainsKey(guid)) { 118 118 ManualResetEvent waitHandle = waitHandles[guid]; 119 if(waitHandle.WaitOne(timeout, false)) {119 if(waitHandle.WaitOne(timeout, true)) { 120 120 waitHandle.Close(); 121 121 waitHandles.Remove(guid); … … 143 143 waitingEngines.Remove(guid); 144 144 engineList.Remove(guid); 145 waitHandles[guid].Set(); 145 146 results.Add(guid, engine); 146 147 }
Note: See TracChangeset
for help on using the changeset viewer.