Changeset 386
Timestamp: 07/21/08 16:40:49 (16 years ago)
Files: 1 edited
Legend: lines prefixed with + were added in r386, lines prefixed with - were removed; unprefixed lines are unchanged context.
trunk/sources/HeuristicLab.Grid/JobManager.cs (r383 → r386)
 using System.IO.Compression;
 using System.Windows.Forms;
+using System.Diagnostics;
 
 namespace HeuristicLab.Grid {
   public class JobManager {
+    private const int MAX_RESTARTS = 5;
+    private const int MAX_CONNECTION_RETRIES = 10;
+    private const int RETRY_TIMEOUT_SEC = 60;
+    private const int CHECK_RESULTS_TIMEOUT = 3;
+
     private IGridServer server;
     private string address;
+    private object waitingQueueLock = new object();
+    private Queue<ProcessingEngine> waitingEngines = new Queue<ProcessingEngine>();
+    private object runningQueueLock = new object();
+    private Queue<Guid> runningEngines = new Queue<Guid>();
+
     private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
-    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
+    private Dictionary<ProcessingEngine, ManualResetEvent> waithandles = new Dictionary<ProcessingEngine, ManualResetEvent>();
     private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
+    private Dictionary<ProcessingEngine, int> restarts = new Dictionary<ProcessingEngine, int>();
+
     private List<IOperation> erroredOperations = new List<IOperation>();
     private object connectionLock = new object();
     private object dictionaryLock = new object();
 
-    private const int MAX_RESTARTS = 5;
-    private const int MAX_CONNECTION_RETRIES = 10;
-    private const int RETRY_TIMEOUT_SEC = 60;
-    private const int CHECK_RESULTS_TIMEOUT = 10;
+    private ManualResetEvent runningWaitHandle = new ManualResetEvent(false);
+    private ManualResetEvent waitingWaitHandle = new ManualResetEvent(false);
 
     private ChannelFactory<IGridServer> factory;
…
     public JobManager(string address) {
       this.address = address;
+      Thread starterThread = new Thread(StartEngines);
+      Thread resultsGatheringThread = new Thread(GetResults);
+      starterThread.Start();
+      resultsGatheringThread.Start();
     }
…
         results.Clear();
         erroredOperations.Clear();
+        runningEngines.Clear();
+        waitingEngines.Clear();
+        restarts.Clear();
       }
     }
…
     }
 
+    public void StartEngines() {
+      try {
+        while(true) {
+          bool enginesWaiting = false;
+          lock(waitingQueueLock) {
+            enginesWaiting = waitingEngines.Count > 0;
+          }
+          if(enginesWaiting) {
+            ProcessingEngine engine;
+            lock(waitingQueueLock) {
+              engine = waitingEngines.Dequeue();
+            }
+            int nRestarts = 0;
+            lock(dictionaryLock) {
+              if(restarts.ContainsKey(engine)) {
+                nRestarts = restarts[engine];
+                restarts[engine] = nRestarts + 1;
+              } else {
+                restarts[engine] = 0;
+              }
+            }
+            if(nRestarts < MAX_RESTARTS) {
+              byte[] zippedEngine = ZipEngine(engine);
+              Guid currentEngineGuid = Guid.Empty;
+              bool success = false;
+              int retryCount = 0;
+              do {
+                try {
+                  lock(connectionLock) {
+                    currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
+                  }
+                  lock(dictionaryLock) {
+                    engines[currentEngineGuid] = engine;
+                  }
+                  lock(runningQueueLock) {
+                    runningEngines.Enqueue(currentEngineGuid);
+                  }
+
+                  success = true;
+                } catch(TimeoutException timeoutException) {
+                  if(retryCount++ >= MAX_CONNECTION_RETRIES) {
+                    // throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
+                    lock(waitingQueueLock) {
+                      waitingEngines.Enqueue(engine);
+                    }
+                    success = true;
+                  }
+                  Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+                } catch(CommunicationException communicationException) {
+                  if(retryCount++ >= MAX_CONNECTION_RETRIES) {
+                    // throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
+                    lock(waitingQueueLock) {
+                      waitingEngines.Enqueue(engine);
+                    }
+                    success = true;
+                  }
+                  ResetConnection();
+                  Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
+                }
+              } while(!success); // connection attempts
+            } // restarts
+            else {
+              lock(dictionaryLock) {
+                erroredOperations.Add(engine.InitialOperation);
+                restarts.Remove(engine);
+                Debug.Assert(!engines.ContainsValue(engine));
+                // clean up and signal the wait handle then return
+                waithandles[engine].Set();
+                waithandles.Remove(engine);
+              }
+            }
+          } else {
+            // no engines are waiting
+            waitingWaitHandle.WaitOne();
+            waitingWaitHandle.Reset();
+          }
+        }
+      } finally {
+        Debug.Assert(false); // make sure that we are notified when this thread is stopped in debugging
+      }
+    }
+
+    public void GetResults() {
+      try {
+        while(true) {
+          Guid engineGuid = Guid.Empty;
+          lock(runningQueueLock) {
+            if(runningEngines.Count > 0) engineGuid = runningEngines.Dequeue();
+          }
+
+          if(engineGuid != Guid.Empty) {
+            Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
+            byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
+            if(zippedResult != null) { // successful
+              lock(dictionaryLock) {
+                ProcessingEngine engine = engines[engineGuid];
+                engines.Remove(engineGuid);
+                restarts.Remove(engine);
+                // store result
+                results[engine.InitialOperation] = zippedResult;
+                // clean up and signal the wait handle then return
+                waithandles[engine].Set();
+                waithandles.Remove(engine);
+              }
+            } else {
+              // there was a problem -> check the state of the job and restart if necessary
+              JobState jobState = TryGetJobState(server, engineGuid);
+              if(jobState == JobState.Unkown) {
+                lock(waitingQueueLock) {
+                  ProcessingEngine engine = engines[engineGuid];
+                  engines.Remove(engineGuid);
+                  waitingEngines.Enqueue(engine);
+                  waitingWaitHandle.Set();
+                }
+              } else {
+                // job still active at the server
+                lock(runningQueueLock) {
+                  runningEngines.Enqueue(engineGuid);
+                }
+              }
+            }
+          } else {
+            // no running engines
+            runningWaitHandle.WaitOne();
+            runningWaitHandle.Reset();
+          }
+        }
+      } finally {
+        Debug.Assert(false); // just to make sure that I get notified when debugging whenever this thread is killed somehow
+      }
+    }
+
     public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
       ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
-      byte[] zippedEngine = ZipEngine(engine);
-      Guid currentEngineGuid = Guid.Empty;
-      bool success = false;
-      int retryCount = 0;
-      do {
-        try {
-          lock(connectionLock) {
-            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
-          }
-          success = true;
-        } catch(TimeoutException timeoutException) {
-          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
-            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
-          }
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        } catch(CommunicationException communicationException) {
-          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
-            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
-          }
-          ResetConnection();
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        }
-      } while(!success);
-      lock(dictionaryLock) {
-        engines[currentEngineGuid] = engine;
-        waithandles[currentEngineGuid] = new ManualResetEvent(false);
-      }
-      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
-      return waithandles[currentEngineGuid];
+      waithandles[engine] = new ManualResetEvent(false);
+      lock(waitingQueueLock) {
+        waitingEngines.Enqueue(engine);
+      }
+      waitingWaitHandle.Set();
+      return waithandles[engine];
     }
…
     }
 
-    private void TryGetResult(object state) {
-      Guid engineGuid = (Guid)state;
-      int restartCounter = 0;
-      do {
-        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
-        byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
-        if(zippedResult != null) { // successful
-          lock(dictionaryLock) {
-            // store result
-            results[engines[engineGuid].InitialOperation] = zippedResult;
-            // clean up and signal the wait handle then return
-            engines.Remove(engineGuid);
-            waithandles[engineGuid].Set();
-            waithandles.Remove(engineGuid);
-          }
-          return;
-        } else {
-          // there was a problem -> check the state of the job and restart if necessary
-          JobState jobState = TryGetJobState(server, engineGuid);
-          if(jobState == JobState.Unkown) {
-            TryRestartJob(engineGuid);
-            restartCounter++;
-          }
-        }
-      } while(restartCounter < MAX_RESTARTS);
-      lock(dictionaryLock) {
-        // the job was never finished and restarting didn't help -> stop trying to execute the job and
-        // save the faulted operation in a list to throw an exception when EndExecuteEngine is called.
-        erroredOperations.Add(engines[engineGuid].InitialOperation);
-        // clean up and signal the wait handle
-        engines.Remove(engineGuid);
-        waithandles[engineGuid].Set();
-        waithandles.Remove(engineGuid);
-      }
-    }
-
-    private void TryRestartJob(Guid engineGuid) {
-      // restart job
-      ProcessingEngine engine;
-      lock(dictionaryLock) {
-        engine = engines[engineGuid];
-      }
-      byte[] zippedEngine = ZipEngine(engine);
-      int retries = 0;
-      do {
-        try {
-          lock(connectionLock) {
-            server.BeginExecuteEngine(zippedEngine);
-          }
-          return;
-        } catch(TimeoutException) {
-          retries++;
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        } catch(CommunicationException) {
-          ResetConnection();
-          retries++;
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        }
-      } while(retries < MAX_CONNECTION_RETRIES);
-    }
-
     private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
       int retries = 0;
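In summary, r386 replaces the old per-job polling scheme (TryGetResult on a ThreadPool worker plus TryRestartJob) with two dedicated threads started in the constructor: StartEngines drains a queue of waiting engines, submits them to the grid server, and counts restarts per engine, while GetResults polls for finished jobs and re-enqueues jobs whose server-side state is unknown. BeginExecuteOperation shrinks to enqueue-and-signal, and wait handles are now keyed by engine rather than by job Guid, so a restarted engine keeps its handle. Below is a minimal, self-contained sketch of the underlying coordination pattern (a caller enqueues work and receives a per-job ManualResetEvent; a dispatcher thread drains the queue and signals completion). All names in the sketch are hypothetical stand-ins, not HeuristicLab code, and a single lock replaces the changeset's separate queue and dictionary locks for brevity.

using System;
using System.Collections.Generic;
using System.Threading;

class QueueDispatcherSketch {
  private readonly object queueLock = new object();
  private readonly Queue<string> waitingJobs = new Queue<string>();
  private readonly Dictionary<string, ManualResetEvent> waitHandles =
    new Dictionary<string, ManualResetEvent>();
  // signaled whenever new work arrives, like waitingWaitHandle in r386
  private readonly ManualResetEvent jobsWaiting = new ManualResetEvent(false);

  public QueueDispatcherSketch() {
    // like the new JobManager constructor: start the worker thread up front
    new Thread(ProcessJobs) { IsBackground = true }.Start();
  }

  // counterpart of BeginExecuteOperation: enqueue, wake the dispatcher,
  // hand the caller a handle it can block on
  public WaitHandle BeginJob(string job) {
    ManualResetEvent handle = new ManualResetEvent(false);
    lock (queueLock) {
      waitHandles[job] = handle;
      waitingJobs.Enqueue(job);
    }
    jobsWaiting.Set();
    return handle;
  }

  private void ProcessJobs() {
    while (true) {
      string job = null;
      lock (queueLock) {
        if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();
      }
      if (job != null) {
        Console.WriteLine("processing " + job); // stand-in for BeginExecuteEngine
        lock (queueLock) {
          waitHandles[job].Set();   // signal the waiting caller
          waitHandles.Remove(job);
        }
      } else {
        jobsWaiting.WaitOne();      // sleep until BeginJob signals new work
        jobsWaiting.Reset();
      }
    }
  }

  static void Main() {
    QueueDispatcherSketch dispatcher = new QueueDispatcherSketch();
    WaitHandle h = dispatcher.BeginJob("job-1");
    h.WaitOne();                    // blocks until the dispatcher signals completion
    Console.WriteLine("job-1 done");
  }
}

Note the ordering in BeginJob: the job is enqueued before jobsWaiting.Set(), so even if the dispatcher happens to Reset the event concurrently, its next pass over the queue still finds the job. The changeset relies on the same enqueue-then-signal ordering.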
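The connection-retry policy itself is unchanged by r386 but moves from BeginExecuteOperation into the StartEngines loop: sleep RETRY_TIMEOUT_SEC between attempts, rebuild the WCF channel on CommunicationException, and give up after MAX_CONNECTION_RETRIES, now re-enqueueing the engine instead of throwing (as the commented-out ApplicationException shows). A generic version of that policy might look like the following sketch; TryCall and its parameters are hypothetical and not part of the changeset.

using System;
using System.ServiceModel;   // CommunicationException
using System.Threading;

static class RetrySketch {
  private const int MAX_CONNECTION_RETRIES = 10; // same limits as JobManager
  private const int RETRY_TIMEOUT_SEC = 60;

  // Attempts a WCF call until it succeeds or the retry budget is exhausted.
  // Returns false instead of throwing, mirroring how r386 re-enqueues an
  // engine after too many failed connection attempts.
  public static bool TryCall(Action call, Action resetConnection) {
    int retries = 0;
    while (retries < MAX_CONNECTION_RETRIES) {
      try {
        call();
        return true;               // call went through
      } catch (TimeoutException) {
        retries++;                 // server slow or unreachable: wait and retry
      } catch (CommunicationException) {
        resetConnection();         // channel is faulted: recreate it first
        retries++;
      }
      Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    }
    return false;                  // caller decides, e.g. re-enqueue the job
  }
}

A caller might use it as TryCall(() => server.BeginExecuteEngine(zippedEngine), ResetConnection) and re-enqueue the engine when it returns false, which is essentially what the new StartEngines loop does inline.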