Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid/JobManager.cs @ 387

Last change on this file since 387 was 387, checked in by gkronber, 16 years ago

changed ManualResetEvents to AutoResetEvents in JobManager (related to #199)

File size: 11.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
33using System.Diagnostics;
34
35namespace HeuristicLab.Grid {
/// <summary>
/// Submits processing engines to a remote grid server and collects their results.
/// </summary>
/// <remarks>
/// The constructor starts two worker threads: one runs <see cref="StartEngines"/> and
/// submits queued engines to the server, the other runs <see cref="GetResults"/> and
/// polls the server for finished jobs. Clients enqueue work with
/// <see cref="BeginExecuteOperation"/>, wait on the returned handle and then fetch the
/// outcome with <see cref="EndExecuteOperation"/>.
///
/// Locking rules used throughout this class (locks are never nested, except that
/// <c>connectionLock</c> may be re-entered by <c>ResetConnection</c>):
///   waitingQueueLock - guards waitingEngines
///   runningQueueLock - guards runningEngines
///   dictionaryLock   - guards engines, waithandles, results, restarts, erroredOperations
///   connectionLock   - guards factory and server, and serializes every server call
/// </remarks>
public class JobManager {
  private const int MAX_RESTARTS = 5;
  private const int MAX_CONNECTION_RETRIES = 10;
  private const int RETRY_TIMEOUT_SEC = 60;
  private const int CHECK_RESULTS_TIMEOUT = 3;

  private IGridServer server;
  private string address;
  private object waitingQueueLock = new object();
  private Queue<ProcessingEngine> waitingEngines = new Queue<ProcessingEngine>();
  private object runningQueueLock = new object();
  private Queue<Guid> runningEngines = new Queue<Guid>();

  private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
  private Dictionary<ProcessingEngine, ManualResetEvent> waithandles = new Dictionary<ProcessingEngine, ManualResetEvent>();
  private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
  private Dictionary<ProcessingEngine, int> restarts = new Dictionary<ProcessingEngine, int>();

  private List<IOperation> erroredOperations = new List<IOperation>();
  private object connectionLock = new object();
  private object dictionaryLock = new object();

  // signalled whenever a job id is added to runningEngines
  private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
  // signalled whenever an engine is added to waitingEngines
  private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);

  private ChannelFactory<IGridServer> factory;

  /// <summary>
  /// Creates a job manager for the grid server at the given address and starts the
  /// submitting and the result-gathering worker threads.
  /// </summary>
  /// <param name="address">Endpoint address of the grid server.</param>
  public JobManager(string address) {
    this.address = address;
    Thread starterThread = new Thread(StartEngines);
    Thread resultsGatheringThread = new Thread(GetResults);
    starterThread.Start();
    resultsGatheringThread.Start();
  }

  /// <summary>
  /// Opens a fresh connection to the server and discards all bookkeeping about
  /// waiting, running, finished and failed jobs. The wait handles of pending jobs
  /// are closed without being signalled.
  /// </summary>
  public void Reset() {
    ResetConnection();
    lock(dictionaryLock) {
      foreach(WaitHandle wh in waithandles.Values) wh.Close();
      waithandles.Clear();
      engines.Clear();
      results.Clear();
      erroredOperations.Clear();
      restarts.Clear();
    }
    // each queue is cleared under its own lock because the worker threads
    // only take the queue locks (not dictionaryLock) when they touch the queues
    lock(runningQueueLock) {
      runningEngines.Clear();
    }
    lock(waitingQueueLock) {
      waitingEngines.Clear();
    }
  }

  /// <summary>
  /// Replaces the current channel to the server with a newly created one and
  /// aborts the factory of the previous channel so its resources are released.
  /// </summary>
  private void ResetConnection() {
    lock(connectionLock) {
      ChannelFactory<IGridServer> previousFactory = factory;

      // open a new channel
      NetTcpBinding binding = new NetTcpBinding();
      binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
      binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
      binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
      binding.Security.Mode = SecurityMode.None;

      factory = new ChannelFactory<IGridServer>(binding);
      server = factory.CreateChannel(new EndpointAddress(address));

      // All server calls happen under connectionLock, so nothing can be in flight
      // on the old channel here. Abort (instead of Close) never blocks or throws,
      // which matters because the old channel is usually faulted at this point.
      if(previousFactory != null) previousFactory.Abort();
    }
  }

  /// <summary>
  /// Body of the submitting worker thread. Takes engines from the waiting queue and
  /// sends them to the server; an engine that was already restarted MAX_RESTARTS
  /// times is recorded as failed instead and its wait handle is signalled.
  /// Never returns normally.
  /// </summary>
  public void StartEngines() {
    try {
      while(true) {
        // check and dequeue in one critical section so that a concurrent Reset()
        // cannot empty the queue between the check and the Dequeue call
        ProcessingEngine engine = null;
        lock(waitingQueueLock) {
          if(waitingEngines.Count > 0) engine = waitingEngines.Dequeue();
        }
        if(engine == null) {
          // no engines are waiting
          waitingWaitHandle.WaitOne();
          continue;
        }

        int nRestarts = 0;
        lock(dictionaryLock) {
          int recordedRestarts;
          if(restarts.TryGetValue(engine, out recordedRestarts)) {
            nRestarts = recordedRestarts;
            restarts[engine] = recordedRestarts + 1;
          } else {
            restarts[engine] = 0;
          }
        }

        if(nRestarts < MAX_RESTARTS) {
          SubmitEngine(engine);
        } else {
          lock(dictionaryLock) {
            erroredOperations.Add(engine.InitialOperation);
            restarts.Remove(engine);
            Debug.Assert(!engines.ContainsValue(engine));
            // clean up and signal the wait handle
            SignalCompletion(engine);
          }
        }
      }
    } finally {
      Debug.Assert(false);  // make sure that we are notified when this thread is stopped in debugging
    }
  }

  /// <summary>
  /// Sends one engine to the server, retrying on connection problems. On success the
  /// returned job id is registered and the result-gathering thread is woken up. When
  /// the connection retries are exhausted the engine is put back into the waiting queue.
  /// </summary>
  private void SubmitEngine(ProcessingEngine engine) {
    byte[] zippedEngine = ZipEngine(engine);
    int retryCount = 0;
    bool done = false;
    do {
      try {
        Guid currentEngineGuid;
        lock(connectionLock) {
          // the channel does not exist yet if Reset() was never called
          if(server == null) ResetConnection();
          currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
        }
        lock(dictionaryLock) {
          engines[currentEngineGuid] = engine;
        }
        lock(runningQueueLock) {
          runningEngines.Enqueue(currentEngineGuid);
        }
        // wake up GetResults(), which blocks on this handle while the running queue is empty
        runningWaitHandle.Set();
        done = true;
      } catch(TimeoutException) {
        done = RequeueIfRetriesExhausted(engine, ref retryCount);
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } catch(CommunicationException) {
        done = RequeueIfRetriesExhausted(engine, ref retryCount);
        ResetConnection();
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    } while(!done); // connection attempts
  }

  /// <summary>
  /// Counts one failed connection attempt. Once more than MAX_CONNECTION_RETRIES
  /// attempts failed the engine is put back into the waiting queue.
  /// </summary>
  /// <returns>true if the engine was re-queued and the caller must stop retrying.</returns>
  private bool RequeueIfRetriesExhausted(ProcessingEngine engine, ref int retryCount) {
    if(retryCount++ < MAX_CONNECTION_RETRIES) return false;
    lock(waitingQueueLock) {
      waitingEngines.Enqueue(engine);
    }
    return true;
  }

  /// <summary>
  /// Signals and unregisters the wait handle of the given engine. Does nothing if the
  /// handle is no longer registered (e.g. after Reset()). Caller must hold dictionaryLock.
  /// </summary>
  private void SignalCompletion(ProcessingEngine engine) {
    ManualResetEvent waitHandle;
    if(waithandles.TryGetValue(engine, out waitHandle)) {
      waitHandle.Set();
      waithandles.Remove(engine);
    }
  }

  /// <summary>
  /// Body of the result-gathering worker thread. Polls the server for each running
  /// job; stores the result and signals the job's wait handle when one is available,
  /// re-queues the engine when the server reports the job state as unknown, and
  /// keeps polling otherwise. Never returns normally.
  /// </summary>
  public void GetResults() {
    try {
      while(true) {
        Guid engineGuid = Guid.Empty;
        lock(runningQueueLock) {
          if(runningEngines.Count > 0) engineGuid = runningEngines.Dequeue();
        }
        if(engineGuid == Guid.Empty) {
          // no running engines
          runningWaitHandle.WaitOne();
          continue;
        }

        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
        byte[] zippedResult = TryEndExecuteEngine(engineGuid);
        if(zippedResult != null) { // successful
          lock(dictionaryLock) {
            ProcessingEngine engine;
            // the job is unknown locally if Reset() ran in the meantime; drop the result then
            if(engines.TryGetValue(engineGuid, out engine)) {
              engines.Remove(engineGuid);
              restarts.Remove(engine);
              // store result
              results[engine.InitialOperation] = zippedResult;
              // clean up and signal the wait handle
              SignalCompletion(engine);
            }
          }
        } else {
          // no result -> check the state of the job and restart if necessary
          JobState jobState = TryGetJobState(engineGuid);
          if(jobState == JobState.Unkown) {
            ProcessingEngine engine;
            lock(dictionaryLock) {
              if(engines.TryGetValue(engineGuid, out engine)) engines.Remove(engineGuid);
            }
            if(engine != null) {
              lock(waitingQueueLock) {
                waitingEngines.Enqueue(engine);
              }
              waitingWaitHandle.Set();
            }
          } else {
            // job still active at the server
            lock(runningQueueLock) {
              runningEngines.Enqueue(engineGuid);
            }
          }
        }
      }
    } finally {
      Debug.Assert(false); // just to make sure that I get notified when debugging whenever this thread is killed somehow
    }
  }

  /// <summary>
  /// Queues an operation for execution on the grid.
  /// </summary>
  /// <param name="globalScope">Scope passed to the processing engine that is created for the operation.</param>
  /// <param name="operation">Operation to execute.</param>
  /// <returns>
  /// A handle that is signalled when the job has finished or has finally failed;
  /// call <see cref="EndExecuteOperation"/> afterwards.
  /// </returns>
  public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
    ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
    ManualResetEvent waitHandle = new ManualResetEvent(false);
    lock(dictionaryLock) {
      waithandles[engine] = waitHandle;
    }
    lock(waitingQueueLock) {
      waitingEngines.Enqueue(engine);
    }
    waitingWaitHandle.Set();
    // return the local reference: a worker thread may already have removed the dictionary entry
    return waitHandle;
  }

  /// <summary>
  /// Serializes the engine with the PersistenceManager and returns the gzip-compressed bytes.
  /// </summary>
  private byte[] ZipEngine(ProcessingEngine engine) {
    using(MemoryStream memStream = new MemoryStream()) {
      // leaveOpen = true: the gzip stream must be closed (flushed) before the buffer is read
      using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
        PersistenceManager.Save(engine, stream);
      }
      return memStream.ToArray();
    }
  }

  /// <summary>
  /// Returns the engine that was sent back by the server for the given operation and
  /// removes the stored result. Call only after the handle returned by
  /// <see cref="BeginExecuteOperation"/> has been signalled.
  /// </summary>
  /// <param name="operation">Operation that was passed to <see cref="BeginExecuteOperation"/>.</param>
  /// <exception cref="ApplicationException">The job was restarted too often and has been given up.</exception>
  /// <exception cref="KeyNotFoundException">No result is stored for the operation.</exception>
  public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
    byte[] zippedResult;
    lock(dictionaryLock) {
      if(erroredOperations.Remove(operation)) {
        throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
      }
      zippedResult = results[operation];
      results.Remove(operation);
    }
    // restore the engine
    using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
      return (ProcessingEngine)PersistenceManager.Load(stream);
    }
  }

  /// <summary>
  /// Asks the server for the result of a job, retrying on connection problems.
  /// </summary>
  /// <returns>
  /// Whatever the server returned (null is treated by the caller as "no result"),
  /// or null when all MAX_CONNECTION_RETRIES attempts failed.
  /// </returns>
  private byte[] TryEndExecuteEngine(Guid engineGuid) {
    int retries = 0;
    do {
      try {
        lock(connectionLock) {
          // always read the field: ResetConnection() replaces the channel after a failure
          if(server == null) ResetConnection();
          // NOTE(review): the second argument is presumably a wait time - confirm against IGridServer
          return server.TryEndExecuteEngine(engineGuid, 100);
        }
      } catch(TimeoutException) {
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } catch(CommunicationException) {
        ResetConnection();
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    } while(retries < MAX_CONNECTION_RETRIES);
    return null;
  }

  /// <summary>
  /// Asks the server for the state of a job, retrying on connection problems.
  /// </summary>
  /// <returns>
  /// The state reported by the server, or JobState.Unkown when all
  /// MAX_CONNECTION_RETRIES attempts failed.
  /// </returns>
  private JobState TryGetJobState(Guid engineGuid) {
    // check if the server is still working on the job
    int retries = 0;
    do {
      try {
        lock(connectionLock) {
          // always read the field: ResetConnection() replaces the channel after a failure
          if(server == null) ResetConnection();
          return server.JobState(engineGuid);
        }
      } catch(TimeoutException) {
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } catch(CommunicationException) {
        ResetConnection();
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    } while(retries < MAX_CONNECTION_RETRIES);
    return JobState.Unkown;
  }
}
307}
Note: See TracBrowser for help on using the repository browser.