Changeset 315
Timestamp: 06/16/08 17:36:29
File: 1 edited
Legend:
  (no prefix) unmodified
  + added
  - removed
trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs
r281 → r315

    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
+   private List<IOperation> erroredOperations = new List<IOperation>();
    private object connectionLock = new object();
    private object dictionaryLock = new object();
…
    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
-   private const int RETRY_TIMEOUT_SEC = 10;
+   private const int RETRY_TIMEOUT_SEC = 60;
    private const int CHECK_RESULTS_TIMEOUT = 10;
…
        engines.Clear();
        results.Clear();
+       erroredOperations.Clear();
      }
    }
…
      int retryCount = 0;
      do {
-       lock(connectionLock) {
-         try {
+       try {
+         lock(connectionLock) {
            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
-           success = true;
-         } catch(TimeoutException timeoutException) {
-           if(retryCount < MAX_CONNECTION_RETRIES) {
-             Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-             retryCount++;
-           } else {
-             throw new ApplicationException("Max retries reached.", timeoutException);
-           }
-         } catch(CommunicationException communicationException) {
-           ResetConnection();
-           // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution)
-           if(retryCount < MAX_CONNECTION_RETRIES) {
-             Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-             retryCount++;
-           } else {
-             throw new ApplicationException("Max retries reached.", communicationException);
-           }
-         }
+         }
+         success = true;
+       } catch(TimeoutException timeoutException) {
+         if(retryCount++ >= MAX_CONNECTION_RETRIES) {
+           throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
+         }
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+       } catch(CommunicationException communicationException) {
+         if(retryCount++ >= MAX_CONNECTION_RETRIES) {
+           throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
+         }
+         ResetConnection();
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        }
      } while(!success);
…

    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
-     byte[] zippedResult = null;
-     lock(dictionaryLock) {
-       zippedResult = results[operation];
-       results.Remove(operation);
-     }
-     // restore the engine
-     using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
-       return (ProcessingEngine)PersistenceManager.Load(stream);
-     }
+     if(erroredOperations.Contains(operation)) {
+       erroredOperations.Remove(operation);
+       throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
+     } else {
+       byte[] zippedResult = null;
+       lock(dictionaryLock) {
+         zippedResult = results[operation];
+         results.Remove(operation);
+       }
+       // restore the engine
+       using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
+         return (ProcessingEngine)PersistenceManager.Load(stream);
+       }
+     }
    }

…
      do {
        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
-       byte[] zippedResult = null;
-       lock(connectionLock) {
-         bool success = false;
-         int retries = 0;
-         do {
-           try {
-             zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
-             success = true;
-           } catch(TimeoutException timeoutException) {
-             success = false;
-             retries++;
-             Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-           } catch(CommunicationException communicationException) {
-             ResetConnection();
-             success = false;
-             retries++;
-             Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-           }
-
-         } while(!success && retries < MAX_CONNECTION_RETRIES);
-       }
-       if(zippedResult != null) {
+       byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
+       if(zippedResult != null) { // successful
          lock(dictionaryLock) {
            // store result
            results[engines[engineGuid].InitialOperation] = zippedResult;
-
-           // signal the wait handle and clean up then return
+           // clean up and signal the wait handle then return
            engines.Remove(engineGuid);
            waithandles[engineGuid].Set();
…
          return;
        } else {
-         // check if the server is still working on the job
-         bool success = false;
-         int retries = 0;
-         JobState jobState = JobState.Unkown;
-         do {
-           try {
-             lock(connectionLock) {
-               jobState = server.JobState(engineGuid);
-             }
-             success = true;
-           } catch(TimeoutException timeoutException) {
-             retries++;
-             success = false;
-             Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-           } catch(CommunicationException communicationException) {
-             ResetConnection();
-             retries++;
-             success = false;
-             Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-           }
-         } while(!success && retries < MAX_CONNECTION_RETRIES);
+         // there was a problem -> check the state of the job and restart if necessary
+         JobState jobState = TryGetJobState(server, engineGuid);
          if(jobState == JobState.Unkown) {
-           // restart job
-           ProcessingEngine engine;
-           lock(dictionaryLock) {
-             engine = engines[engineGuid];
-           }
-           byte[] zippedEngine = ZipEngine(engine);
-           success = false;
-           retries = 0;
-           do {
-             try {
-               lock(connectionLock) {
-                 server.BeginExecuteEngine(zippedEngine);
-               }
-               success = true;
-             } catch(TimeoutException timeoutException) {
-               success = false;
-               retries++;
-               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-             } catch(CommunicationException communicationException) {
-               ResetConnection();
-               success = false;
-               retries++;
-               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-             }
-           } while(!success && retries < MAX_CONNECTION_RETRIES);
+           TryRestartJob(engineGuid);
            restartCounter++;
          }
        }
-
-       // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem
-       if(restartCounter > MAX_RESTARTS) {
-         throw new ApplicationException("Maximum number of job restarts reached.");
-       }
-     } while(true);
+     } while(restartCounter < MAX_RESTARTS);
+     lock(dictionaryLock) {
+       // the job was never finished and restarting didn't help -> stop trying to execute the job and
+       // save the faulted operation in a list to throw an exception when EndExecuteEngine is called.
+       erroredOperations.Add(engines[engineGuid].InitialOperation);
+       // clean up and signal the wait handle
+       engines.Remove(engineGuid);
+       waithandles[engineGuid].Set();
+       waithandles.Remove(engineGuid);
+     }
+   }
+
+   private void TryRestartJob(Guid engineGuid) {
+     // restart job
+     ProcessingEngine engine;
+     lock(dictionaryLock) {
+       engine = engines[engineGuid];
+     }
+     byte[] zippedEngine = ZipEngine(engine);
+     int retries = 0;
+     do {
+       try {
+         lock(connectionLock) {
+           server.BeginExecuteEngine(zippedEngine);
+         }
+         return;
+       } catch(TimeoutException timeoutException) {
+         retries++;
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+       } catch(CommunicationException communicationException) {
+         ResetConnection();
+         retries++;
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+       }
+     } while(retries < MAX_CONNECTION_RETRIES);
+   }
+
+   private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
+     int retries = 0;
+     do {
+       try {
+         lock(connectionLock) {
+           byte[] zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
+           return zippedResult;
+         }
+       } catch(TimeoutException timeoutException) {
+         retries++;
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+       } catch(CommunicationException communicationException) {
+         ResetConnection();
+         retries++;
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+       }
+     } while(retries < MAX_CONNECTION_RETRIES);
+     return null;
+   }
+
+   private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
+     // check if the server is still working on the job
+     int retries = 0;
+     do {
+       try {
+         lock(connectionLock) {
+           JobState jobState = server.JobState(engineGuid);
+           return jobState;
+         }
+       } catch(TimeoutException timeoutException) {
+         retries++;
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+       } catch(CommunicationException communicationException) {
+         ResetConnection();
+         retries++;
+         Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+       }
+     } while(retries < MAX_CONNECTION_RETRIES);
+     return JobState.Unkown;
+
    }
  }
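The three new private helpers (TryRestartJob, TryEndExecuteEngine, TryGetJobState) extracted in this changeset all share the same retry pattern: perform the WCF call while holding connectionLock, sleep RETRY_TIMEOUT_SEC seconds after a TimeoutException, additionally call ResetConnection() after a CommunicationException, and give up after MAX_CONNECTION_RETRIES attempts. Below is a minimal, self-contained sketch of that pattern for illustration only; the RetryPolicy class and its Execute method are hypothetical and are not part of JobManager.cs or this changeset.

using System;
using System.ServiceModel;
using System.Threading;

// Hypothetical helper illustrating the retry pattern shared by the new Try* methods.
public static class RetryPolicy {
  private const int MAX_CONNECTION_RETRIES = 10; // same limit as in JobManager
  private const int RETRY_TIMEOUT_SEC = 60;      // timeout raised from 10 to 60 seconds in r315

  // Runs 'call' until it succeeds or the retry limit is reached.
  // Returns default(T) (e.g. null) after the last failed attempt, which mirrors
  // TryEndExecuteEngine returning null and TryGetJobState returning JobState.Unkown.
  public static T Execute<T>(Func<T> call, Action onCommunicationError) {
    int retries = 0;
    do {
      try {
        return call();
      } catch(TimeoutException) {
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } catch(CommunicationException) {
        onCommunicationError();   // corresponds to ResetConnection() in JobManager
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    } while(retries < MAX_CONNECTION_RETRIES);
    return default(T);
  }
}

With such a helper, a method like TryEndExecuteEngine would reduce to a single RetryPolicy.Execute invocation wrapping server.TryEndExecuteEngine(engineGuid, 100) inside lock(connectionLock); the changeset instead keeps the pattern written out in each helper.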