Changeset 219 for trunk/sources/HeuristicLab.DistributedEngine
- Timestamp:
- 05/06/08 12:22:58 (17 years ago)
- Location:
- trunk/sources/HeuristicLab.DistributedEngine
- Files:
-
- 1 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r36 r219 30 30 using System.IO; 31 31 using System.IO.Compression; 32 using HeuristicLab.PluginInfrastructure; 33 using System.Windows.Forms; 32 34 33 35 namespace HeuristicLab.DistributedEngine { 34 36 public class DistributedEngine : EngineBase, IEditable { 35 private IGridServer server; 36 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 37 private List<Guid> runningEngines = new List<Guid>(); 37 private JobManager jobManager; 38 private CompositeOperation waitingOperations; 38 39 private string serverAddress; 39 private bool cancelRequested;40 private CompositeOperation waitingOperations;41 40 public string ServerAddress { 42 41 get { return serverAddress; } … … 45 44 serverAddress = value; 46 45 } 47 }48 }49 public override bool Terminated {50 get {51 return myExecutionStack.Count == 0 && runningEngines.Count == 0 && waitingOperations==null;52 46 } 53 47 } … … 66 60 67 61 public override void Execute() { 68 NetTcpBinding binding = new NetTcpBinding(); 69 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 70 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 71 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 72 binding.Security.Mode = SecurityMode.None; 73 ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding); 74 server = factory.CreateChannel(new EndpointAddress(serverAddress)); 75 62 if(jobManager == null) this.jobManager = new JobManager(serverAddress); 63 jobManager.Reset(); 76 64 base.Execute(); 77 65 } … … 81 69 } 82 70 83 public override void Abort() { 84 lock(runningEngines) { 85 cancelRequested = true; 86 foreach(Guid engineGuid in runningEngines) { 87 server.AbortEngine(engineGuid); 71 protected override void ProcessNextOperation() { 72 IOperation operation = myExecutionStack.Pop(); 73 if(operation is AtomicOperation) { 74 AtomicOperation atomicOperation = (AtomicOperation)operation; 75 IOperation next = null; 76 try { 77 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 78 } catch(Exception ex) { 79 // push operation on stack again 80 myExecutionStack.Push(atomicOperation); 81 Abort(); 82 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 88 83 } 89 } 90 } 91 public override void Reset() { 92 base.Reset(); 93 engineOperations.Clear(); 94 runningEngines.Clear(); 95 cancelRequested = false; 96 } 97 98 protected override void ProcessNextOperation() { 99 lock(runningEngines) { 100 if(runningEngines.Count == 0 && cancelRequested) { 101 base.Abort(); 102 cancelRequested = false; 103 if(waitingOperations != null && waitingOperations.Operations.Count != 0) { 104 myExecutionStack.Push(waitingOperations); 105 waitingOperations = null; 84 if(next != null) 85 myExecutionStack.Push(next); 86 OnOperationExecuted(atomicOperation); 87 if(atomicOperation.Operator.Breakpoint) Abort(); 88 } else if(operation is CompositeOperation) { 89 CompositeOperation compositeOperation = (CompositeOperation)operation; 90 if(compositeOperation.ExecuteInParallel) { 91 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 92 int i = 0; 93 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 94 waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation); 106 95 } 107 return; 108 } 109 if(runningEngines.Count != 0) { 110 Guid engineGuid = runningEngines[0]; 111 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 112 if(resultXml != null) { 113 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 114 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 115 IScope oldScope = engineOperations[engineGuid].Scope; 116 oldScope.Clear(); 117 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 118 oldScope.AddVariable(variable); 119 } 120 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 121 oldScope.AddSubScope(subScope); 122 } 123 OnOperationExecuted(engineOperations[engineGuid]); 124 125 if(cancelRequested & resultEngine.ExecutionStack.Count != 0) { 126 if(waitingOperations == null) { 127 waitingOperations = new CompositeOperation(); 128 waitingOperations.ExecuteInParallel = false; 129 } 130 CompositeOperation task = new CompositeOperation(); 131 while(resultEngine.ExecutionStack.Count > 0) { 132 AtomicOperation oldOperation = (AtomicOperation)resultEngine.ExecutionStack.Pop(); 133 if(oldOperation.Scope == resultEngine.InitialOperation.Scope) { 134 oldOperation = new AtomicOperation(oldOperation.Operator, oldScope); 135 } 136 task.AddOperation(oldOperation); 137 } 138 waitingOperations.AddOperation(task); 139 } 140 runningEngines.Remove(engineGuid); 141 engineOperations.Remove(engineGuid); 96 WaitHandle.WaitAll(waithandles); 97 if(jobManager.Exception != null) { 98 myExecutionStack.Push(compositeOperation); 99 Abort(); 100 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); }); 142 101 } 143 return; 144 } 145 IOperation operation = myExecutionStack.Pop(); 146 if(operation is AtomicOperation) { 147 AtomicOperation atomicOperation = (AtomicOperation)operation; 148 IOperation next = null; 149 try { 150 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 151 } catch(Exception ex) { 152 // push operation on stack again 153 myExecutionStack.Push(atomicOperation); 154 Abort(); 155 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 156 } 157 if(next != null) 158 myExecutionStack.Push(next); 159 OnOperationExecuted(atomicOperation); 160 if(atomicOperation.Operator.Breakpoint) Abort(); 161 } else if(operation is CompositeOperation) { 162 CompositeOperation compositeOperation = (CompositeOperation)operation; 163 if(compositeOperation.ExecuteInParallel) { 164 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 165 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? 166 MemoryStream memStream = new MemoryStream(); 167 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 168 PersistenceManager.Save(engine, stream); 169 stream.Close(); 170 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 171 runningEngines.Add(currentEngineGuid); 172 engineOperations[currentEngineGuid] = parOperation; 173 } 174 } else { 175 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 176 myExecutionStack.Push(compositeOperation.Operations[i]); 177 } 102 } else { 103 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 104 myExecutionStack.Push(compositeOperation.Operations[i]); 178 105 } 179 106 } -
trunk/sources/HeuristicLab.DistributedEngine/HeuristicLab.DistributedEngine.csproj
r30 r219 50 50 <ItemGroup> 51 51 <Compile Include="HeuristicLabDistributedEnginePlugin.cs" /> 52 <Compile Include="JobManager.cs" /> 52 53 <Compile Include="Properties\AssemblyInfo.cs" /> 53 54 <Compile Include="DistributedEngine.cs" />
Note: See TracChangeset
for help on using the changeset viewer.