Changeset 391 for trunk/sources/HeuristicLab.Grid/JobManager.cs
- Timestamp:
- 07/22/08 21:15:56 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Grid/JobManager.cs
r387 r391 34 34 35 35 namespace HeuristicLab.Grid { 36 public class JobExecutionException : ApplicationException { 37 public JobExecutionException(string msg) : base(msg) { } 38 } 39 36 40 public class JobManager { 37 41 private const int MAX_RESTARTS = 5; … … 40 44 private const int CHECK_RESULTS_TIMEOUT = 3; 41 45 46 private class Job { 47 public Guid guid; 48 public ProcessingEngine engine; 49 public ManualResetEvent waitHandle; 50 public int restarts; 51 } 52 42 53 private IGridServer server; 43 54 private string address; 44 55 private object waitingQueueLock = new object(); 45 private Queue< ProcessingEngine> waitingEngines = new Queue<ProcessingEngine>();56 private Queue<Job> waitingJobs = new Queue<Job>(); 46 57 private object runningQueueLock = new object(); 47 private Queue<Guid> runningEngines = new Queue<Guid>(); 48 49 private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>(); 50 private Dictionary<ProcessingEngine, ManualResetEvent> waithandles = new Dictionary<ProcessingEngine, ManualResetEvent>(); 58 private Queue<Job> runningJobs = new Queue<Job>(); 51 59 private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>(); 52 private Dictionary<ProcessingEngine, int> restarts = new Dictionary<ProcessingEngine, int>();53 60 54 61 private List<IOperation> erroredOperations = new List<IOperation>(); … … 72 79 ResetConnection(); 73 80 lock(dictionaryLock) { 74 foreach(WaitHandle wh in waithandles.Values) wh.Close(); 75 waithandles.Clear(); 76 engines.Clear(); 81 foreach(Job j in waitingJobs) { 82 j.waitHandle.Close(); 83 } 84 waitingJobs.Clear(); 85 foreach(Job j in runningJobs) { 86 j.waitHandle.Close(); 87 } 88 runningJobs.Clear(); 77 89 results.Clear(); 78 90 erroredOperations.Clear(); 79 runningEngines.Clear();80 waitingEngines.Clear();81 restarts.Clear();82 91 } 83 92 } … … 100 109 try { 101 110 while(true) { 102 bool enginesWaiting = false;111 Job job = null; 103 112 lock(waitingQueueLock) { 104 enginesWaiting = waitingEngines.Count > 0; 105 } 106 if(enginesWaiting) { 107 ProcessingEngine engine; 108 lock(waitingQueueLock) { 109 engine = waitingEngines.Dequeue(); 113 if(waitingJobs.Count > 0) job = waitingJobs.Dequeue(); 114 } 115 if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting 116 else { 117 Guid currentEngineGuid = TryStartExecuteEngine(job.engine); 118 if(currentEngineGuid == Guid.Empty) { 119 // couldn't start the job -> requeue 120 if(job.restarts < MAX_RESTARTS) { 121 job.restarts++; 122 lock(waitingQueueLock) waitingJobs.Enqueue(job); 123 waitingWaitHandle.Set(); 124 } else { 125 // max restart count reached -> give up on this job and flag error 126 lock(dictionaryLock) { 127 erroredOperations.Add(job.engine.InitialOperation); 128 job.waitHandle.Set(); 129 } 130 } 131 } else { 132 // job started successfully 133 job.guid = currentEngineGuid; 134 lock(runningQueueLock) { 135 runningJobs.Enqueue(job); 136 runningWaitHandle.Set(); 137 } 110 138 } 111 int nRestarts = 0;112 lock(dictionaryLock) {113 if(restarts.ContainsKey(engine)) {114 nRestarts = restarts[engine];115 restarts[engine] = nRestarts + 1;116 } else {117 restarts[engine] = 0;118 }119 }120 if(nRestarts < MAX_RESTARTS) {121 byte[] zippedEngine = ZipEngine(engine);122 Guid currentEngineGuid = Guid.Empty;123 bool success = false;124 int retryCount = 0;125 do {126 try {127 lock(connectionLock) {128 currentEngineGuid = server.BeginExecuteEngine(zippedEngine);129 }130 lock(dictionaryLock) {131 engines[currentEngineGuid] = engine;132 }133 lock(runningQueueLock) {134 runningEngines.Enqueue(currentEngineGuid);135 }136 137 success = true;138 } catch(TimeoutException timeoutException) {139 if(retryCount++ >= MAX_CONNECTION_RETRIES) {140 // throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);141 lock(waitingQueueLock) {142 waitingEngines.Enqueue(engine);143 }144 success = true;145 }146 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));147 } catch(CommunicationException communicationException) {148 if(retryCount++ >= MAX_CONNECTION_RETRIES) {149 // throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);150 lock(waitingQueueLock) {151 waitingEngines.Enqueue(engine);152 }153 success = true;154 }155 ResetConnection();156 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));157 }158 } while(!success); // connection attempts159 } // restarts160 else {161 lock(dictionaryLock) {162 erroredOperations.Add(engine.InitialOperation);163 restarts.Remove(engine);164 Debug.Assert(!engines.ContainsValue(engine));165 //// clean up and signal the wait handle then return166 waithandles[engine].Set();167 waithandles.Remove(engine);168 }169 }170 } else {171 // no engines are waiting172 waitingWaitHandle.WaitOne();173 139 } 174 140 } … … 177 143 } 178 144 } 145 179 146 180 147 public void GetResults() { 181 148 try { 182 149 while(true) { 183 Guid engineGuid = Guid.Empty;150 Job job = null; 184 151 lock(runningQueueLock) { 185 if(runningEngines.Count > 0) engineGuid = runningEngines.Dequeue(); 186 } 187 188 if(engineGuid != Guid.Empty) { 189 Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT)); 190 byte[] zippedResult = TryEndExecuteEngine(server, engineGuid); 152 if(runningJobs.Count > 0) job = runningJobs.Dequeue(); 153 } 154 if(job == null) runningWaitHandle.WaitOne(); // no jobs running 155 else { 156 byte[] zippedResult = TryEndExecuteEngine(server, job.guid); 191 157 if(zippedResult != null) { // successful 192 158 lock(dictionaryLock) { 193 ProcessingEngine engine = engines[engineGuid];194 engines.Remove(engineGuid);195 restarts.Remove(engine);196 159 // store result 197 results[engine.InitialOperation] = zippedResult; 198 // clean up and signal the wait handle then return 199 waithandles[engine].Set(); 200 waithandles.Remove(engine); 160 results[job.engine.InitialOperation] = zippedResult; 161 // notify consumer that result is ready 162 job.waitHandle.Set(); 201 163 } 202 164 } else { 203 165 // there was a problem -> check the state of the job and restart if necessary 204 JobState jobState = TryGetJobState(server, engineGuid); 205 if(jobState == JobState.Unkown) { 166 JobState jobState = TryGetJobState(server, job.guid); 167 if(jobState == JobState.Unknown) { 168 job.restarts++; 206 169 lock(waitingQueueLock) { 207 ProcessingEngine engine = engines[engineGuid]; 208 engines.Remove(engineGuid); 209 waitingEngines.Enqueue(engine); 170 waitingJobs.Enqueue(job); 210 171 waitingWaitHandle.Set(); 211 172 } … … 213 174 // job still active at the server 214 175 lock(runningQueueLock) { 215 runningEngines.Enqueue(engineGuid); 176 runningJobs.Enqueue(job); 177 runningWaitHandle.Set(); 216 178 } 217 179 } 218 180 } 219 } else {220 // no running engines221 runningWaitHandle.WaitOne();222 181 } 223 182 } … … 228 187 229 188 public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) { 230 ProcessingEngine engine = new ProcessingEngine(globalScope, operation); 231 waithandles[engine] = new ManualResetEvent(false); 189 Job job = new Job(); 190 job.engine = new ProcessingEngine(globalScope, operation); 191 job.waitHandle = new ManualResetEvent(false); 192 job.restarts = 0; 232 193 lock(waitingQueueLock) { 233 waiting Engines.Enqueue(engine);194 waitingJobs.Enqueue(job); 234 195 } 235 196 waitingWaitHandle.Set(); 236 return waithandles[engine];197 return job.waitHandle; 237 198 } 238 199 … … 250 211 if(erroredOperations.Contains(operation)) { 251 212 erroredOperations.Remove(operation); 252 throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");213 throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server."); 253 214 } else { 254 215 byte[] zippedResult = null; … … 262 223 } 263 224 } 225 } 226 227 private Guid TryStartExecuteEngine(ProcessingEngine engine) { 228 byte[] zippedEngine = ZipEngine(engine); 229 int retries = 0; 230 Guid guid = Guid.Empty; 231 do { 232 try { 233 lock(connectionLock) { 234 guid = server.BeginExecuteEngine(zippedEngine); 235 } 236 return guid; 237 } catch(TimeoutException) { 238 retries++; 239 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 240 } catch(CommunicationException) { 241 ResetConnection(); 242 retries++; 243 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 244 } 245 } while(retries < MAX_CONNECTION_RETRIES); 246 return Guid.Empty; 264 247 } 265 248 … … 302 285 } 303 286 } while(retries < MAX_CONNECTION_RETRIES); 304 return JobState.Unk own;287 return JobState.Unknown; 305 288 } 306 289 }
Note: See TracChangeset
for help on using the changeset viewer.