Changeset 2055 for trunk/sources/HeuristicLab.Grid/3.2
- Timestamp: 06/17/09 18:07:15 (15 years ago)
- Location: trunk/sources/HeuristicLab.Grid/3.2
- Files: 1 added, 3 edited
Legend:
- Unmodified
- Added
- Removed
trunk/sources/HeuristicLab.Grid/3.2/HeuristicLab.Grid-3.2.csproj
r1534 -> r2055 (columns: old line, new line, marker, content)
 98   98        <DependentUpon>ClientForm.cs</DependentUpon>
 99   99      </Compile>
     100  +   <Compile Include="AsyncGridResult.cs" />
100  101      <Compile Include="Database.cs" />
101  102      <Compile Include="EngineRunner.cs" />
trunk/sources/HeuristicLab.Grid/3.2/IGridServer.cs
r1529 -> r2055 (columns: old line, new line, marker, content)
 27   27      namespace HeuristicLab.Grid {
 28   28        public enum JobState {
 29       -       Unknown,
      29  +       Unknown = 0, // default value
 30   30          Waiting,
 31   31          Busy,
trunk/sources/HeuristicLab.Grid/3.2/JobManager.cs
r1529 r2055 43 43 private const int RESULT_POLLING_TIMEOUT = 5; 44 44 45 private class Job {46 public Guid guid;47 public ProcessingEngine engine;48 public ManualResetEvent waitHandle;49 public int restarts;50 }51 52 45 private IGridServer server; 53 46 private string address; 54 47 private object waitingQueueLock = new object(); 55 private Queue< Job> waitingJobs = new Queue<Job>();48 private Queue<AsyncGridResult> waitingJobs = new Queue<AsyncGridResult>(); 56 49 private object runningQueueLock = new object(); 57 private Queue<Job> runningJobs = new Queue<Job>(); 58 private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>(); 59 60 private List<IOperation> erroredOperations = new List<IOperation>(); 50 private Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>(); 61 51 private object connectionLock = new object(); 62 private object dictionaryLock = new object();63 52 64 53 private AutoResetEvent runningWaitHandle = new AutoResetEvent(false); … … 77 66 public void Reset() { 78 67 ResetConnection(); 79 lock (dictionaryLock) {80 foreach (Job jin waitingJobs) {81 j.waitHandle.Close();68 lock (waitingQueueLock) { 69 foreach (AsyncGridResult r in waitingJobs) { 70 r.WaitHandle.Close(); 82 71 } 83 72 waitingJobs.Clear(); 84 foreach(Job j in runningJobs) { 85 j.waitHandle.Close(); 73 } 74 lock (runningQueueLock) { 75 foreach (AsyncGridResult r in runningJobs) { 76 r.WaitHandle.Close(); 86 77 } 87 78 runningJobs.Clear(); 88 results.Clear();89 erroredOperations.Clear();90 79 } 91 80 } … … 93 82 private void ResetConnection() { 94 83 Trace.TraceInformation("Reset connection in JobManager"); 95 lock (connectionLock) {84 lock (connectionLock) { 96 85 // open a new channel 97 86 NetTcpBinding binding = new NetTcpBinding(); … … 107 96 public void StartEngines() { 108 97 try { 109 while (true) {110 Jobjob = null;111 lock (waitingQueueLock) {112 if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();113 } 114 if (job==null) 
waitingWaitHandle.WaitOne(); // no jobs waiting98 while (true) { 99 AsyncGridResult job = null; 100 lock (waitingQueueLock) { 101 if (waitingJobs.Count > 0) job = waitingJobs.Dequeue(); 102 } 103 if (job == null) waitingWaitHandle.WaitOne(); // no jobs waiting 115 104 else { 116 Guid currentEngineGuid = TryStartExecuteEngine(job. engine);117 if (currentEngineGuid == Guid.Empty) {105 Guid currentEngineGuid = TryStartExecuteEngine(job.Engine); 106 if (currentEngineGuid == Guid.Empty) { 118 107 // couldn't start the job -> requeue 119 if (job.restarts < MAX_RESTARTS) {120 job. restarts++;121 lock (waitingQueueLock) waitingJobs.Enqueue(job);108 if (job.Restarts < MAX_RESTARTS) { 109 job.Restarts++; 110 lock (waitingQueueLock) waitingJobs.Enqueue(job); 122 111 waitingWaitHandle.Set(); 123 112 } else { 124 113 // max restart count reached -> give up on this job and flag error 125 lock(dictionaryLock) { 126 erroredOperations.Add(job.engine.InitialOperation); 127 job.waitHandle.Set(); 128 } 114 job.Aborted = true; 115 job.SignalFinished(); 129 116 } 130 117 } else { 131 118 // job started successfully 132 job. 
guid = currentEngineGuid;133 lock (runningQueueLock) {119 job.Guid = currentEngineGuid; 120 lock (runningQueueLock) { 134 121 runningJobs.Enqueue(job); 135 122 runningWaitHandle.Set(); … … 138 125 } 139 126 } 140 } catch(Exception e) { 141 Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace); 127 } 128 catch (Exception e) { 129 Trace.TraceError("Exception " + e + " in JobManager.StartEngines() killed the start-engine thread\n" + e.StackTrace); 142 130 } 143 131 } … … 146 134 public void GetResults() { 147 135 try { 148 while (true) {149 Jobjob = null;150 lock (runningQueueLock) {151 if (runningJobs.Count > 0) job = runningJobs.Dequeue();152 } 153 if (job == null) runningWaitHandle.WaitOne(); // no jobs running136 while (true) { 137 AsyncGridResult job = null; 138 lock (runningQueueLock) { 139 if (runningJobs.Count > 0) job = runningJobs.Dequeue(); 140 } 141 if (job == null) runningWaitHandle.WaitOne(); // no jobs running 154 142 else { 155 byte[] zippedResult = TryEndExecuteEngine(server, job.guid); 156 if(zippedResult != null) { // successful 157 lock(dictionaryLock) { 158 // store result 159 results[job.engine.InitialOperation] = zippedResult; 160 // notify consumer that result is ready 161 job.waitHandle.Set(); 162 } 143 byte[] zippedResult = TryEndExecuteEngine(server, job.Guid); 144 if (zippedResult != null) { 145 // successful => store result 146 job.ZippedResult = zippedResult; 147 // notify consumer that result is ready 148 job.SignalFinished(); 163 149 } else { 164 150 // there was a problem -> check the state of the job and restart if necessary 165 JobState jobState = TryGetJobState(server, job. guid);166 if (jobState == JobState.Unknown) {167 job. 
restarts++;168 lock (waitingQueueLock) {151 JobState jobState = TryGetJobState(server, job.Guid); 152 if (jobState == JobState.Unknown) { 153 job.Restarts++; 154 lock (waitingQueueLock) { 169 155 waitingJobs.Enqueue(job); 170 156 waitingWaitHandle.Set(); … … 172 158 } else { 173 159 // job still active at the server 174 lock (runningQueueLock) {160 lock (runningQueueLock) { 175 161 runningJobs.Enqueue(job); 176 162 runningWaitHandle.Set(); … … 181 167 } 182 168 } 183 } catch(Exception e) { 184 Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace); 185 } 186 } 187 188 public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) { 189 return BeginExecuteEngine(new ProcessingEngine(globalScope, operation)); 190 } 191 192 public WaitHandle BeginExecuteEngine(ProcessingEngine engine) { 193 Job job = new Job(); 194 job.engine = engine; 195 job.waitHandle = new ManualResetEvent(false); 196 job.restarts = 0; 197 lock(waitingQueueLock) { 198 waitingJobs.Enqueue(job); 169 } 170 catch (Exception e) { 171 Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n" + e.StackTrace); 172 } 173 } 174 175 public AsyncGridResult BeginExecuteEngine(ProcessingEngine engine) { 176 AsyncGridResult asyncResult = new AsyncGridResult(engine); 177 asyncResult.Engine = engine; 178 lock (waitingQueueLock) { 179 waitingJobs.Enqueue(asyncResult); 199 180 } 200 181 waitingWaitHandle.Set(); 201 return job.waitHandle;202 } 203 204 private byte[] ZipEngine( ProcessingEngine engine) {182 return asyncResult; 183 } 184 185 private byte[] ZipEngine(IEngine engine) { 205 186 return PersistenceManager.SaveToGZip(engine); 206 187 } 207 188 208 public ProcessingEngine EndExecuteOperation(AtomicOperation operation) { 209 if(erroredOperations.Contains(operation)) { 210 erroredOperations.Remove(operation); 189 public IEngine EndExecuteEngine(AsyncGridResult asyncResult) { 190 
if (asyncResult.Aborted) { 211 191 throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server."); 212 192 } else { 213 byte[] zippedResult = null;214 lock(dictionaryLock) {215 zippedResult = results[operation];216 results.Remove(operation);217 }218 193 // restore the engine 219 return ( ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);220 } 221 } 222 223 private Guid TryStartExecuteEngine( ProcessingEngine engine) {194 return (IEngine)PersistenceManager.RestoreFromGZip(asyncResult.ZippedResult); 195 } 196 } 197 198 private Guid TryStartExecuteEngine(IEngine engine) { 224 199 byte[] zippedEngine = ZipEngine(engine); 200 return SavelyExecute(() => server.BeginExecuteEngine(zippedEngine)); 201 } 202 203 private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) { 204 return SavelyExecute(() => { 205 byte[] zippedResult = server.TryEndExecuteEngine(engineGuid); 206 return zippedResult; 207 }); 208 } 209 210 private JobState TryGetJobState(IGridServer server, Guid engineGuid) { 211 return SavelyExecute(() => server.JobState(engineGuid)); 212 } 213 214 private TResult SavelyExecute<TResult>(Func<TResult> a) { 225 215 int retries = 0; 226 Guid guid = Guid.Empty;227 216 do { 228 217 try { 229 lock (connectionLock) {230 guid = server.BeginExecuteEngine(zippedEngine);231 } 232 return guid;233 } catch(TimeoutException) {218 lock (connectionLock) { 219 return a(); 220 } 221 } 222 catch (TimeoutException) { 234 223 retries++; 235 224 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 236 } catch(CommunicationException) { 225 } 226 catch (CommunicationException) { 237 227 ResetConnection(); 238 228 retries++; 239 229 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 240 230 } 241 } while(retries < MAX_CONNECTION_RETRIES); 242 Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine"); 243 return Guid.Empty; 244 } 245 246 private byte[] 
TryEndExecuteEngine(IGridServer server, Guid engineGuid) { 247 int retries = 0; 248 do { 249 try { 250 lock(connectionLock) { 251 byte[] zippedResult = server.TryEndExecuteEngine(engineGuid); 252 return zippedResult; 253 } 254 } catch(TimeoutException) { 255 retries++; 256 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 257 } catch(CommunicationException) { 258 ResetConnection(); 259 retries++; 260 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 261 } 262 } while(retries < MAX_CONNECTION_RETRIES); 263 Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine"); 264 return null; 265 } 266 267 private JobState TryGetJobState(IGridServer server, Guid engineGuid) { 268 // check if the server is still working on the job 269 int retries = 0; 270 do { 271 try { 272 lock(connectionLock) { 273 JobState jobState = server.JobState(engineGuid); 274 return jobState; 275 } 276 } catch(TimeoutException) { 277 retries++; 278 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 279 } catch(CommunicationException) { 280 ResetConnection(); 281 retries++; 282 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 283 } 284 } while(retries < MAX_CONNECTION_RETRIES); 285 Trace.TraceWarning("Reached max connection retries in TryGetJobState"); 286 return JobState.Unknown; 231 } while (retries < MAX_CONNECTION_RETRIES); 232 Trace.TraceWarning("Reached max connection retries"); 233 return default(TResult); 287 234 } 288 235 }
Note: See TracChangeset for help on using the changeset viewer.