Changeset 248 for trunk/sources
- Timestamp:
- 05/13/08 23:06:12 (17 years ago)
- Location:
- trunk/sources/HeuristicLab.DistributedEngine
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r228 r248 92 92 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 93 93 int i = 0; 94 // start all parallel jobs 94 95 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 95 96 waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation); 96 97 } 98 99 // wait until all jobs are finished 97 100 // WaitAll works only with maximally 64 waithandles 98 101 if(waithandles.Length <= 64) { … … 103 106 } 104 107 } 105 if(jobManager.Exception != null) {106 myExecutionStack.Push(compositeOperation);107 Abort();108 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); });108 // retrieve results and merge into scope-tree 109 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 110 IScope result = jobManager.EndExecuteOperation(parOperation); 111 MergeScope(parOperation.Scope, result); 109 112 } 110 113 } catch(Exception e) { 111 114 myExecutionStack.Push(compositeOperation); 112 115 Abort(); 113 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred( jobManager.Exception); });116 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); }); 114 117 } 115 118 } else { … … 117 120 myExecutionStack.Push(compositeOperation.Operations[i]); 118 121 } 122 } 123 } 124 125 private void MergeScope(IScope original, IScope result) { 126 // merge the results 127 original.Clear(); 128 foreach(IVariable variable in result.Variables) { 129 original.AddVariable(variable); 130 } 131 foreach(IScope subScope in result.SubScopes) { 132 original.AddSubScope(subScope); 133 } 134 foreach(KeyValuePair<string, string> alias in result.Aliases) { 135 original.AddAlias(alias.Key, alias.Value); 119 136 } 120 137 } -
trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs
r228 r248 18 18 private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>(); 19 19 private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>(); 20 private object locker = new object(); 20 private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>(); 21 private object connectionLock = new object(); 22 private object dictionaryLock = new object(); 23 21 24 private const int MAX_RESTARTS = 5; 22 private Exception exception; 25 private const int MAX_CONNECTION_RETRIES = 10; 26 private const int RETRY_TIMEOUT_SEC = 10; 27 private const int CHECK_RESULTS_TIMEOUT = 10; 28 23 29 private ChannelFactory<IGridServer> factory; 24 public Exception Exception {25 get { return exception; }26 }27 30 28 31 public JobManager(string address) { … … 31 34 32 35 internal void Reset() { 33 lock(locker) {34 ResetConnection();36 ResetConnection(); 37 lock(dictionaryLock) { 35 38 foreach(WaitHandle wh in waithandles.Values) wh.Close(); 36 39 waithandles.Clear(); 37 40 engineOperations.Clear(); 38 41 runningEngines.Clear(); 39 exception = null;42 results.Clear(); 40 43 } 41 44 } 42 45 43 46 private void ResetConnection() { 44 // open a new channel 45 NetTcpBinding binding = new NetTcpBinding(); 46 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 47 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 48 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 49 binding.Security.Mode = SecurityMode.None; 50 factory = new ChannelFactory<IGridServer>(binding); 51 server = factory.CreateChannel(new EndpointAddress(address)); 47 lock(connectionLock) { 48 // open a new channel 49 NetTcpBinding binding = new NetTcpBinding(); 50 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 51 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 52 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 53 binding.Security.Mode = SecurityMode.None; 54 55 factory = new ChannelFactory<IGridServer>(binding); 56 server = factory.CreateChannel(new EndpointAddress(address)); 57 } 52 58 } 53 59 … … 58 64 PersistenceManager.Save(engine, stream); 59 65 stream.Close(); 60 if(factory.State != CommunicationState.Opened) 61 ResetConnection(); 62 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 63 lock(locker) { 66 byte[] zippedEngine = memStream.ToArray(); 67 Guid currentEngineGuid = Guid.Empty; 68 bool success = false; 69 int retryCount = 0; 70 do { 71 lock(connectionLock) { 72 if(factory.State != CommunicationState.Opened) 73 ResetConnection(); 74 try { 75 currentEngineGuid = server.BeginExecuteEngine(zippedEngine); 76 success = true; 77 } catch(TimeoutException timeoutException) { 78 if(retryCount < MAX_CONNECTION_RETRIES) { 79 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 80 retryCount++; 81 } else { 82 throw new ApplicationException("Max retries reached.", timeoutException); 83 } 84 } catch(CommunicationException communicationException) { 85 // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution) 86 if(retryCount < MAX_CONNECTION_RETRIES) { 87 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 88 retryCount++; 89 } else { 90 throw new ApplicationException("Max retries reached.", communicationException); 91 } 92 } 93 } 94 } while(!success); 95 lock(dictionaryLock) { 64 96 runningEngines[currentEngineGuid] = memStream.ToArray(); 65 97 engineOperations[currentEngineGuid] = operation; 66 98 waithandles[currentEngineGuid] = new ManualResetEvent(false); 67 ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);68 99 } 100 ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid); 69 101 return waithandles[currentEngineGuid]; 102 } 103 104 public IScope EndExecuteOperation(AtomicOperation operation) { 105 byte[] zippedResult = results[operation]; 106 // restore the engine 107 GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress); 108 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 109 110 return resultEngine.InitialOperation.Scope; 70 111 } 71 112 … … 74 115 int restartCounter = 0; 75 116 do { 76 try { 77 if(factory.State != CommunicationState.Opened) ResetConnection(); 78 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 79 if(resultXml != null) { 80 // restore the engine 81 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 82 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 83 84 // merge the results 85 IScope oldScope = engineOperations[engineGuid].Scope; 86 oldScope.Clear(); 87 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 88 oldScope.AddVariable(variable); 89 } 90 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 91 oldScope.AddSubScope(subScope); 92 } 93 foreach(KeyValuePair<string, string> alias in resultEngine.InitialOperation.Scope.Aliases) { 94 oldScope.AddAlias(alias.Key, alias.Value); 95 } 96 97 lock(locker) { 98 // signal the wait handle and clean up then return 99 waithandles[engineGuid].Set(); 100 engineOperations.Remove(engineGuid); 101 waithandles.Remove(engineGuid); 102 runningEngines.Remove(engineGuid); 103 } 104 return; 105 } else { 106 // check if the server is still working on the job 107 JobState jobState = server.JobState(engineGuid); 108 if(jobState == JobState.Unkown) { 109 // restart job 110 byte[] packedEngine; 111 lock(locker) { 112 packedEngine = runningEngines[engineGuid]; 117 byte[] zippedResult = null; 118 lock(connectionLock) { 119 bool success = false; 120 int retries = 0; 121 do { 122 if(factory.State != CommunicationState.Opened) ResetConnection(); 123 try { 124 zippedResult = server.TryEndExecuteEngine(engineGuid, 100); 125 success = true; 126 } catch(TimeoutException timeoutException) { 127 success = false; 128 retries++; 129 Thread.Sleep(RETRY_TIMEOUT_SEC); 130 } catch(CommunicationException communicationException) { 131 success = false; 132 retries++; 133 Thread.Sleep(RETRY_TIMEOUT_SEC); 134 } 135 136 } while(!success && retries < MAX_CONNECTION_RETRIES); 137 } 138 if(zippedResult != null) { 139 lock(dictionaryLock) { 140 // store result 141 results[engineOperations[engineGuid]] = zippedResult; 142 143 // signal the wait handle and clean up then return 144 engineOperations.Remove(engineGuid); 145 runningEngines.Remove(engineGuid); 146 waithandles[engineGuid].Set(); 147 waithandles.Remove(engineGuid); 148 } 149 return; 150 } else { 151 // check if the server is still working on the job 152 bool success = false; 153 int retries = 0; 154 JobState jobState = JobState.Unkown; 155 do { 156 try { 157 lock(connectionLock) { 158 if(factory.State != CommunicationState.Opened) ResetConnection(); 159 jobState = server.JobState(engineGuid); 113 160 } 114 server.BeginExecuteEngine(packedEngine); 115 restartCounter++; 116 } 161 success = true; 162 } catch(TimeoutException timeoutException) { 163 retries++; 164 success = false; 165 Thread.Sleep(RETRY_TIMEOUT_SEC); 166 } catch(CommunicationException communicationException) { 167 retries++; 168 success = false; 169 Thread.Sleep(RETRY_TIMEOUT_SEC); 170 } 171 } while(!success && retries < MAX_CONNECTION_RETRIES); 172 if(jobState == JobState.Unkown) { 173 // restart job 174 byte[] packedEngine; 175 lock(dictionaryLock) { 176 packedEngine = runningEngines[engineGuid]; 177 } 178 success = false; 179 retries = 0; 180 do { 181 try { 182 lock(connectionLock) { 183 if(factory.State != CommunicationState.Opened) ResetConnection(); 184 server.BeginExecuteEngine(packedEngine); 185 } 186 success = true; 187 } catch(TimeoutException timeoutException) { 188 success = false; 189 retries++; 190 Thread.Sleep(RETRY_TIMEOUT_SEC); 191 } catch(CommunicationException communicationException) { 192 success = false; 193 retries++; 194 Thread.Sleep(RETRY_TIMEOUT_SEC); 195 } 196 } while(!success && retries < MAX_CONNECTION_RETRIES); 197 restartCounter++; 117 198 } 118 } catch(Exception e) {119 // catch all exceptions set an exception flag, signal the wait-handle and exit the routine120 this.exception = new Exception("There was a problem with parallel execution", e);121 waithandles[engineGuid].Set();122 return;123 199 } 124 200 125 201 // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem 126 202 if(restartCounter > MAX_RESTARTS) { 127 this.exception = new Exception("Maximal number of parallel job restarts reached"); 128 waithandles[engineGuid].Set(); 129 return; 130 } 131 132 Thread.Sleep(TimeSpan.FromSeconds(10.0)); 203 throw new ApplicationException("Maximum number of job restarts reached."); 204 } 205 206 Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT)); 133 207 } while(true); 134 208 }
Note: See TracChangeset
for help on using the changeset viewer.