Changeset 219
- Timestamp:
- 05/06/08 12:22:58 (17 years ago)
- Location:
- trunk/sources
- Files:
-
- 1 added
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r36 r219 30 30 using System.IO; 31 31 using System.IO.Compression; 32 using HeuristicLab.PluginInfrastructure; 33 using System.Windows.Forms; 32 34 33 35 namespace HeuristicLab.DistributedEngine { 34 36 public class DistributedEngine : EngineBase, IEditable { 35 private IGridServer server; 36 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 37 private List<Guid> runningEngines = new List<Guid>(); 37 private JobManager jobManager; 38 private CompositeOperation waitingOperations; 38 39 private string serverAddress; 39 private bool cancelRequested;40 private CompositeOperation waitingOperations;41 40 public string ServerAddress { 42 41 get { return serverAddress; } … … 45 44 serverAddress = value; 46 45 } 47 }48 }49 public override bool Terminated {50 get {51 return myExecutionStack.Count == 0 && runningEngines.Count == 0 && waitingOperations==null;52 46 } 53 47 } … … 66 60 67 61 public override void Execute() { 68 NetTcpBinding binding = new NetTcpBinding(); 69 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 70 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 71 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 72 binding.Security.Mode = SecurityMode.None; 73 ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding); 74 server = factory.CreateChannel(new EndpointAddress(serverAddress)); 75 62 if(jobManager == null) this.jobManager = new JobManager(serverAddress); 63 jobManager.Reset(); 76 64 base.Execute(); 77 65 } … … 81 69 } 82 70 83 public override void Abort() { 84 lock(runningEngines) { 85 cancelRequested = true; 86 foreach(Guid engineGuid in runningEngines) { 87 server.AbortEngine(engineGuid); 71 protected override void ProcessNextOperation() { 72 IOperation operation = myExecutionStack.Pop(); 73 if(operation is AtomicOperation) { 74 AtomicOperation atomicOperation = (AtomicOperation)operation; 75 IOperation next = null; 76 try { 77 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 78 } catch(Exception ex) { 79 // push operation on stack again 80 myExecutionStack.Push(atomicOperation); 81 Abort(); 82 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 88 83 } 89 } 90 } 91 public override void Reset() { 92 base.Reset(); 93 engineOperations.Clear(); 94 runningEngines.Clear(); 95 cancelRequested = false; 96 } 97 98 protected override void ProcessNextOperation() { 99 lock(runningEngines) { 100 if(runningEngines.Count == 0 && cancelRequested) { 101 base.Abort(); 102 cancelRequested = false; 103 if(waitingOperations != null && waitingOperations.Operations.Count != 0) { 104 myExecutionStack.Push(waitingOperations); 105 waitingOperations = null; 84 if(next != null) 85 myExecutionStack.Push(next); 86 OnOperationExecuted(atomicOperation); 87 if(atomicOperation.Operator.Breakpoint) Abort(); 88 } else if(operation is CompositeOperation) { 89 CompositeOperation compositeOperation = (CompositeOperation)operation; 90 if(compositeOperation.ExecuteInParallel) { 91 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 92 int i = 0; 93 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 94 waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation); 106 95 } 107 return; 108 } 109 if(runningEngines.Count != 0) { 110 Guid engineGuid = runningEngines[0]; 111 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 112 if(resultXml != null) { 113 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 114 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 115 IScope oldScope = engineOperations[engineGuid].Scope; 116 oldScope.Clear(); 117 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 118 oldScope.AddVariable(variable); 119 } 120 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 121 oldScope.AddSubScope(subScope); 122 } 123 OnOperationExecuted(engineOperations[engineGuid]); 124 125 if(cancelRequested & resultEngine.ExecutionStack.Count != 0) { 126 if(waitingOperations == null) { 127 waitingOperations = new CompositeOperation(); 128 waitingOperations.ExecuteInParallel = false; 129 } 130 CompositeOperation task = new CompositeOperation(); 131 while(resultEngine.ExecutionStack.Count > 0) { 132 AtomicOperation oldOperation = (AtomicOperation)resultEngine.ExecutionStack.Pop(); 133 if(oldOperation.Scope == resultEngine.InitialOperation.Scope) { 134 oldOperation = new AtomicOperation(oldOperation.Operator, oldScope); 135 } 136 task.AddOperation(oldOperation); 137 } 138 waitingOperations.AddOperation(task); 139 } 140 runningEngines.Remove(engineGuid); 141 engineOperations.Remove(engineGuid); 96 WaitHandle.WaitAll(waithandles); 97 if(jobManager.Exception != null) { 98 myExecutionStack.Push(compositeOperation); 99 Abort(); 100 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); }); 142 101 } 143 return; 144 } 145 IOperation operation = myExecutionStack.Pop(); 146 if(operation is AtomicOperation) { 147 AtomicOperation atomicOperation = (AtomicOperation)operation; 148 IOperation next = null; 149 try { 150 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 151 } catch(Exception ex) { 152 // push operation on stack again 153 myExecutionStack.Push(atomicOperation); 154 Abort(); 155 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 156 } 157 if(next != null) 158 myExecutionStack.Push(next); 159 OnOperationExecuted(atomicOperation); 160 if(atomicOperation.Operator.Breakpoint) Abort(); 161 } else if(operation is CompositeOperation) { 162 CompositeOperation compositeOperation = (CompositeOperation)operation; 163 if(compositeOperation.ExecuteInParallel) { 164 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 165 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? 166 MemoryStream memStream = new MemoryStream(); 167 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 168 PersistenceManager.Save(engine, stream); 169 stream.Close(); 170 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 171 runningEngines.Add(currentEngineGuid); 172 engineOperations[currentEngineGuid] = parOperation; 173 } 174 } else { 175 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 176 myExecutionStack.Push(compositeOperation.Operations[i]); 177 } 102 } else { 103 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 104 myExecutionStack.Push(compositeOperation.Operations[i]); 178 105 } 179 106 } -
trunk/sources/HeuristicLab.DistributedEngine/HeuristicLab.DistributedEngine.csproj
r30 r219 50 50 <ItemGroup> 51 51 <Compile Include="HeuristicLabDistributedEnginePlugin.cs" /> 52 <Compile Include="JobManager.cs" /> 52 53 <Compile Include="Properties\AssemblyInfo.cs" /> 53 54 <Compile Include="DistributedEngine.cs" /> -
trunk/sources/HeuristicLab.Grid/ClientForm.cs
r115 r219 46 46 private ProcessingEngine currentEngine; 47 47 private string clientUrl; 48 private object locker = new object();49 48 50 49 public ClientForm() { … … 62 61 63 62 // windows XP returns the external ip on index 0 while windows vista returns the external ip on index 2 64 if 65 clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[2] + ":" + clientPort.Text + "/Grid/Client";63 if(System.Environment.OSVersion.Version.Major >= 6) { 64 clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[2] + ":" + clientPort.Text + "/Grid/Client"; 66 65 } else { 67 clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[0] + ":" + clientPort.Text + "/Grid/Client";66 clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[0] + ":" + clientPort.Text + "/Grid/Client"; 68 67 } 69 68 … … 87 86 statusTextBox.Text = "Waiting for engine"; 88 87 89 } catch 88 } catch(CommunicationException ex) { 90 89 MessageBox.Show("Exception while connecting to the server: " + ex.Message); 91 90 clientHost.Abort(); … … 106 105 107 106 private void fetchOperationTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { 108 lock(locker) {109 byte[] engineXml;110 fetchOperationTimer.Stop();111 if(engineStore.TryTakeEngine( clientUrl,out currentGuid, out engineXml)) {107 byte[] engineXml; 108 fetchOperationTimer.Stop(); 109 try { 110 if(engineStore.TryTakeEngine(out currentGuid, out engineXml)) { 112 111 currentEngine = RestoreEngine(engineXml); 113 112 if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine"; … … 126 125 fetchOperationTimer.Start(); 127 126 } 127 } catch(Exception ex) { 128 currentEngine = null; 129 currentGuid = Guid.Empty; 130 fetchOperationTimer.Interval = 5000; 131 fetchOperationTimer.Start(); 128 132 } 129 133 } 130 134 public void Abort(Guid guid) { 131 lock(locker) { 132 if(!IsRunningEngine(guid)) return; 133 currentEngine.Abort(); 134 } 135 throw new NotSupportedException(); 135 136 } 136 137 public bool IsRunningEngine(Guid guid) { 137 return currentGuid == guid;138 throw new NotSupportedException(); 138 139 } 139 140 private ProcessingEngine RestoreEngine(byte[] engine) { -
trunk/sources/HeuristicLab.Grid/EngineStore.cs
r35 r219 31 31 private List<Guid> engineList; 32 32 private Dictionary<Guid, byte[]> waitingEngines; 33 private Dictionary<Guid, byte[]> runningEngines;34 33 private Dictionary<Guid, ManualResetEvent> waitHandles; 35 34 private Dictionary<Guid, byte[]> results; 36 private Dictionary<Guid, string> runningClients;35 private Dictionary<Guid, DateTime> resultDate; 37 36 private object bigLock; 38 37 private ChannelFactory<IClient> clientChannelFactory; … … 45 44 public int RunningJobs { 46 45 get { 47 return runningEngines.Count;46 return waitHandles.Count; 48 47 } 49 48 } … … 58 57 engineList = new List<Guid>(); 59 58 waitingEngines = new Dictionary<Guid, byte[]>(); 60 runningEngines = new Dictionary<Guid, byte[]>();61 runningClients = new Dictionary<Guid, string>();62 59 waitHandles = new Dictionary<Guid, ManualResetEvent>(); 63 60 results = new Dictionary<Guid, byte[]>(); 61 resultDate = new Dictionary<Guid, DateTime>(); 64 62 bigLock = new object(); 65 63 … … 73 71 } 74 72 75 public bool TryTakeEngine( string clientUrl,out Guid guid, out byte[] engine) {73 public bool TryTakeEngine(out Guid guid, out byte[] engine) { 76 74 lock(bigLock) { 77 75 if(engineList.Count == 0) { … … 84 82 engine = waitingEngines[guid]; 85 83 waitingEngines.Remove(guid); 86 runningEngines[guid] = engine;87 runningClients[guid] = clientUrl;88 84 return true; 89 85 } … … 93 89 public void StoreResult(Guid guid, byte[] result) { 94 90 lock(bigLock) { 95 if(!runningEngines.ContainsKey(guid)) return; // ignore result when the engine is not known to be running 96 97 runningEngines.Remove(guid); 98 runningClients.Remove(guid); 91 // clear old results 92 List<Guid> expiredResults = FindExpiredResults(DateTime.Now.AddHours(-1.0)); 93 foreach(Guid expiredGuid in expiredResults) { 94 results.Remove(expiredGuid); 95 waitHandles.Remove(expiredGuid); 96 resultDate.Remove(expiredGuid); 97 } 98 // add the new result 99 99 results[guid] = result; 100 resultDate[guid] = DateTime.Now; 100 101 waitHandles[guid].Set(); 101 102 } 103 } 104 105 private List<Guid> FindExpiredResults(DateTime expirationDate) { 106 List<Guid> expiredResults = new List<Guid>(); 107 foreach(Guid guid in results.Keys) { 108 if(resultDate[guid] < expirationDate) { 109 expiredResults.Add(guid); 110 } 111 } 112 return expiredResults; 102 113 } 103 114 … … 113 124 return GetResult(guid, System.Threading.Timeout.Infinite); 114 125 } 126 115 127 internal byte[] GetResult(Guid guid, int timeout) { 116 128 lock(bigLock) { 117 if(waitHandles.ContainsKey(guid)) { 129 // result already available 130 if(results.ContainsKey(guid)) { 131 // if the wait-handle for this result is still alive then close and remove it 132 if(waitHandles.ContainsKey(guid)) { 133 ManualResetEvent waitHandle = waitHandles[guid]; 134 waitHandle.Close(); 135 waitHandles.Remove(guid); 136 } 137 return results[guid]; 138 } else { 139 // result not yet available, if there is also no wait-handle for that result then we will never have a result and can return null 140 if(!waitHandles.ContainsKey(guid)) return null; 141 142 // otherwise we have a wait-handle and can wait for the result 118 143 ManualResetEvent waitHandle = waitHandles[guid]; 144 // wait 119 145 if(waitHandle.WaitOne(timeout, true)) { 146 // ok got the result in within the wait time => close and remove the wait-hande and return the result 120 147 waitHandle.Close(); 121 148 waitHandles.Remove(guid); 122 149 byte[] result = results[guid]; 123 results.Remove(guid);124 150 return result; 125 151 } else { 152 // no result yet return without result 126 153 return null; 127 154 } 128 } else {129 return null;130 155 } 131 156 } … … 133 158 134 159 internal void AbortEngine(Guid guid) { 135 string clientUrl = "";136 160 lock(bigLock) { 137 if(runningClients.ContainsKey(guid)) { 138 clientUrl = runningClients[guid]; 139 IClient client = clientChannelFactory.CreateChannel(new EndpointAddress(clientUrl)); 140 client.Abort(guid); 141 } else if(waitingEngines.ContainsKey(guid)) { 161 if(waitingEngines.ContainsKey(guid)) { 142 162 byte[] engine = waitingEngines[guid]; 143 163 waitingEngines.Remove(guid); … … 148 168 } 149 169 } 170 171 internal JobState JobState(Guid guid) { 172 lock(bigLock) { 173 if(waitingEngines.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Waiting; 174 else if(waitHandles.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Busy; 175 else if(results.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Finished; 176 else return HeuristicLab.Grid.JobState.Unkown; 177 } 178 } 150 179 } 151 180 } -
trunk/sources/HeuristicLab.Grid/GridServer.cs
r33 r219 35 35 } 36 36 37 public JobState JobState(Guid guid) { 38 return engineStore.JobState(guid); 39 } 40 37 41 public Guid BeginExecuteEngine(byte[] engine) { 38 42 Guid guid = Guid.NewGuid(); -
trunk/sources/HeuristicLab.Grid/IEngineStore.cs
r32 r219 30 30 interface IEngineStore { 31 31 [OperationContract] 32 bool TryTakeEngine( string clientUrl,out Guid guid, out byte[] engine);32 bool TryTakeEngine(out Guid guid, out byte[] engine); 33 33 34 34 [OperationContract] -
trunk/sources/HeuristicLab.Grid/IGridServer.cs
r33 r219 26 26 27 27 namespace HeuristicLab.Grid { 28 public enum JobState { 29 Unkown, 30 Waiting, 31 Busy, 32 Finished 33 } 34 35 28 36 [ServiceContract(Namespace = "http://HeuristicLab.Grid")] 29 37 public interface IGridServer { 38 [OperationContract] 39 JobState JobState(Guid guid); 30 40 [OperationContract] 31 41 Guid BeginExecuteEngine(byte[] engine); -
trunk/sources/HeuristicLab.Grid/ProcessingEngine.cs
r27 r219 68 68 myExecutionStack.Push(atomicOperation); 69 69 Abort(); 70 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });71 70 } 72 71 if(next != null) 73 72 myExecutionStack.Push(next); 74 OnOperationExecuted(atomicOperation);75 73 if(atomicOperation.Operator.Breakpoint) Abort(); 76 74 } else if(operation is CompositeOperation) {
Note: See TracChangeset
for help on using the changeset viewer.