Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid/JobManager.cs @ 386

Last change on this file since 386 was 386, checked in by gkronber, 16 years ago

worked on #199

File size: 11.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
33using System.Diagnostics;
34
35namespace HeuristicLab.Grid {
36  public class JobManager {
// Number of restarts of a single engine that StartEngines tolerates before reporting the operation as errored.
private const int MAX_RESTARTS = 5;
// Number of failed server calls that are retried before a call is given up.
private const int MAX_CONNECTION_RETRIES = 10;
// Pause (in seconds) between two attempts to call the server.
private const int RETRY_TIMEOUT_SEC = 60;
// Pause (in seconds) before the result of a running engine is polled.
private const int CHECK_RESULTS_TIMEOUT = 3;

// Current channel to the grid server and the address it is created for.
// The channel is replaced by ResetConnection and must only be used while holding connectionLock.
private IGridServer server;
private readonly string address;

// Engines that still have to be sent to the server (guarded by waitingQueueLock).
private readonly object waitingQueueLock = new object();
private readonly Queue<ProcessingEngine> waitingEngines = new Queue<ProcessingEngine>();
// Ids of engines that were sent to the server and whose results are outstanding (guarded by runningQueueLock).
private readonly object runningQueueLock = new object();
private readonly Queue<Guid> runningEngines = new Queue<Guid>();

// Book-keeping shared between the worker threads and the public API (guarded by dictionaryLock).
private readonly Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
private readonly Dictionary<ProcessingEngine, ManualResetEvent> waithandles = new Dictionary<ProcessingEngine, ManualResetEvent>();
private readonly Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
private readonly Dictionary<ProcessingEngine, int> restarts = new Dictionary<ProcessingEngine, int>();

// Initial operations of engines that exceeded MAX_RESTARTS.
private readonly List<IOperation> erroredOperations = new List<IOperation>();
private readonly object connectionLock = new object();
private readonly object dictionaryLock = new object();

// Wake-up signals for the results-gathering thread and the starter thread respectively.
private readonly ManualResetEvent runningWaitHandle = new ManualResetEvent(false);
private readonly ManualResetEvent waitingWaitHandle = new ManualResetEvent(false);

// Factory of the current channel; replaced together with the channel in ResetConnection.
private ChannelFactory<IGridServer> factory;
62
/// <summary>
/// Creates a job manager for the grid server at the given address and
/// launches its two worker threads (engine submission and result gathering).
/// </summary>
/// <remarks>
/// No channel to the server is opened here; the constructor does not call ResetConnection.
/// </remarks>
/// <param name="address">Endpoint address of the grid server.</param>
public JobManager(string address) {
  this.address = address;

  // submits waiting engines to the server
  Thread submitter = new Thread(new ThreadStart(StartEngines));
  submitter.Start();

  // polls the server for results of submitted engines
  Thread collector = new Thread(new ThreadStart(GetResults));
  collector.Start();
}
70
/// <summary>
/// Opens a fresh connection to the server and discards all queued engines,
/// running job ids, stored results, restart counters and wait handles.
/// </summary>
/// <remarks>
/// Each collection is cleared under the lock that the worker threads use for it.
/// The locks are taken one after the other (never nested) to rule out lock-order deadlocks.
/// NOTE(review): wait handles are closed without being signaled, exactly as before;
/// confirm that no caller can still be blocked on one of them when Reset is called.
/// </remarks>
public void Reset() {
  ResetConnection();
  // empty the work queues first so that the worker threads do not pick up
  // entries whose book-keeping is removed right afterwards
  lock(waitingQueueLock) {
    waitingEngines.Clear();
  }
  lock(runningQueueLock) {
    runningEngines.Clear();
  }
  lock(dictionaryLock) {
    foreach(WaitHandle wh in waithandles.Values) wh.Close();
    waithandles.Clear();
    engines.Clear();
    results.Clear();
    erroredOperations.Clear();
    restarts.Clear();
  }
}
84
/// <summary>
/// Replaces the current server channel by a newly created one for the configured address.
/// </summary>
/// <remarks>
/// The previously used channel factory is aborted after the new channel has been created,
/// so that repeated resets do not accumulate abandoned factories and channels.
/// </remarks>
private void ResetConnection() {
  lock(connectionLock) {
    ChannelFactory<IGridServer> previousFactory = factory;

    // open a new channel
    NetTcpBinding binding = new NetTcpBinding();
    binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    binding.Security.Mode = SecurityMode.None;

    factory = new ChannelFactory<IGridServer>(binding);
    server = factory.CreateChannel(new EndpointAddress(address));

    // Abort (instead of Close) because the old channel is usually broken when a reset
    // is requested; Abort releases the resources immediately and does not throw.
    if(previousFactory != null) previousFactory.Abort();
  }
}
98
/// <summary>
/// Endless worker loop of the starter thread: takes engines from the waiting queue,
/// sends them to the grid server and registers the returned job id in the running queue.
/// </summary>
/// <remarks>
/// An engine that has already been restarted MAX_RESTARTS times is not sent again;
/// its initial operation is recorded in erroredOperations and its wait handle is signaled.
/// When sending fails more than MAX_CONNECTION_RETRIES times the engine is put back
/// into the waiting queue, which counts as another restart on the next pass.
/// </remarks>
public void StartEngines() {
  try {
    while(true) {
      // test and dequeue under a single lock so that a concurrent Reset()
      // cannot empty the queue between the check and the Dequeue call
      ProcessingEngine engine = default(ProcessingEngine);
      bool engineAvailable = false;
      lock(waitingQueueLock) {
        if(waitingEngines.Count > 0) {
          engine = waitingEngines.Dequeue();
          engineAvailable = true;
        }
      }
      if(!engineAvailable) {
        // no engines are waiting
        waitingWaitHandle.WaitOne();
        waitingWaitHandle.Reset();
        continue;
      }

      // look up how often this engine has been started before and count this start
      int nRestarts;
      lock(dictionaryLock) {
        if(restarts.TryGetValue(engine, out nRestarts)) {
          restarts[engine] = nRestarts + 1;
        } else {
          nRestarts = 0;
          restarts[engine] = 0;
        }
      }

      if(nRestarts >= MAX_RESTARTS) {
        lock(dictionaryLock) {
          erroredOperations.Add(engine.InitialOperation);
          restarts.Remove(engine);
          Debug.Assert(!engines.ContainsValue(engine));
          // clean up and signal the wait handle
          waithandles[engine].Set();
          waithandles.Remove(engine);
        }
        continue;
      }

      byte[] zippedEngine = ZipEngine(engine);
      bool done = false;
      int retryCount = 0;
      do {
        try {
          Guid currentEngineGuid;
          lock(connectionLock) {
            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
          }
          lock(dictionaryLock) {
            engines[currentEngineGuid] = engine;
          }
          lock(runningQueueLock) {
            runningEngines.Enqueue(currentEngineGuid);
          }
          // wake up the results-gathering thread; without this signal it would
          // stay blocked on runningWaitHandle and never poll the new job
          runningWaitHandle.Set();

          done = true;
        } catch(TimeoutException) {
          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
            // give up for now; the engine is retried later as a restart
            lock(waitingQueueLock) {
              waitingEngines.Enqueue(engine);
            }
            done = true;
          }
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        } catch(CommunicationException) {
          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
            // give up for now; the engine is retried later as a restart
            lock(waitingQueueLock) {
              waitingEngines.Enqueue(engine);
            }
            done = true;
          }
          ResetConnection();
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        }
      } while(!done); // connection attempts
    }
  } finally {
    Debug.Assert(false);  // make sure that we are notified when this thread is stopped in debugging
  }
}
180
/// <summary>
/// Endless worker loop of the results-gathering thread: polls the server for the result
/// of each running job, stores finished results and re-schedules jobs the server lost.
/// </summary>
/// <remarks>
/// A job whose result is not available yet is put back at the end of the running queue
/// as long as the server reports a state other than JobState.Unkown; otherwise its engine
/// is moved back to the waiting queue to be started again.
/// </remarks>
public void GetResults() {
  try {
    while(true) {
      Guid engineGuid = Guid.Empty;
      lock(runningQueueLock) {
        if(runningEngines.Count > 0) engineGuid = runningEngines.Dequeue();
      }

      if(engineGuid == Guid.Empty) {
        // no running engines -> wait for a signal, but re-check the queue after a
        // bounded time so that a missing signal cannot block this thread forever
        runningWaitHandle.WaitOne(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT), false);
        runningWaitHandle.Reset();
        continue;
      }

      Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
      byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
      if(zippedResult != null) { // successful
        lock(dictionaryLock) {
          ProcessingEngine engine;
          // the entry is missing when Reset() was called in the meantime -> drop the stale result
          if(engines.TryGetValue(engineGuid, out engine)) {
            engines.Remove(engineGuid);
            restarts.Remove(engine);
            // store result
            results[engine.InitialOperation] = zippedResult;
            // clean up and signal the wait handle
            waithandles[engine].Set();
            waithandles.Remove(engine);
          }
        }
      } else {
        // there was a problem -> check the state of the job and restart if necessary
        JobState jobState = TryGetJobState(server, engineGuid);
        if(jobState == JobState.Unkown) {
          ProcessingEngine engine;
          bool known;
          // the engines dictionary is guarded by dictionaryLock (not by the queue lock)
          lock(dictionaryLock) {
            known = engines.TryGetValue(engineGuid, out engine);
            if(known) engines.Remove(engineGuid);
          }
          if(known) {
            lock(waitingQueueLock) {
              waitingEngines.Enqueue(engine);
            }
            waitingWaitHandle.Set();
          }
        } else {
          // job still active at the server
          lock(runningQueueLock) {
            runningEngines.Enqueue(engineGuid);
          }
        }
      }
    }
  } finally {
    Debug.Assert(false); // just to make sure that I get notified when debugging whenever this thread is killed somehow
  }
}
230
/// <summary>
/// Wraps the operation in a new processing engine and queues it for execution on the grid.
/// </summary>
/// <param name="globalScope">Scope passed to the new ProcessingEngine.</param>
/// <param name="operation">Operation passed to the new ProcessingEngine.</param>
/// <returns>
/// A wait handle that is signaled by the worker threads once a result has been stored
/// or the engine has been given up after too many restarts.
/// </returns>
public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
  ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
  ManualResetEvent completed = new ManualResetEvent(false);
  // the worker threads modify waithandles under dictionaryLock, so register under the same lock
  lock(dictionaryLock) {
    waithandles[engine] = completed;
  }
  lock(waitingQueueLock) {
    waitingEngines.Enqueue(engine);
  }
  waitingWaitHandle.Set();
  // return the local reference; the dictionary entry may already have been removed by a worker thread
  return completed;
}
240
/// <summary>
/// Serializes the engine with PersistenceManager.Save into a GZip-compressed byte array.
/// </summary>
/// <param name="engine">Engine to serialize.</param>
/// <returns>The compressed serialized engine.</returns>
private byte[] ZipEngine(ProcessingEngine engine) {
  using(MemoryStream memStream = new MemoryStream()) {
    // leaveOpen = true keeps memStream usable after the GZip stream is disposed;
    // disposing the GZip stream flushes the remaining compressed data into memStream
    using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
      PersistenceManager.Save(engine, stream);
    }
    return memStream.ToArray();
  }
}
250
/// <summary>
/// Fetches the stored result for the given operation and restores the processing engine from it.
/// </summary>
/// <param name="operation">Operation that was passed to BeginExecuteOperation.</param>
/// <returns>The engine deserialized from the stored, GZip-compressed result.</returns>
/// <exception cref="ApplicationException">
/// The operation was recorded as errored because its engine exceeded the maximal number of restarts.
/// </exception>
public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
  byte[] zippedResult;
  // erroredOperations and results are both modified by the worker threads under dictionaryLock
  lock(dictionaryLock) {
    // Remove returns true only if the operation was recorded as errored
    if(erroredOperations.Remove(operation)) {
      throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
    }
    zippedResult = results[operation];
    results.Remove(operation);
  }
  // restore the engine
  using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
    return (ProcessingEngine)PersistenceManager.Load(stream);
  }
}
267
/// <summary>
/// Asks the server for the result of the given job, retrying on timeouts and communication errors.
/// </summary>
/// <param name="server">Channel used for the first attempt; replaced by the current channel after a connection reset.</param>
/// <param name="engineGuid">Id of the job as returned by the server when it was started.</param>
/// <returns>
/// Whatever the server returns for the job, or null when MAX_CONNECTION_RETRIES attempts failed.
/// </returns>
private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
  int retries = 0;
  do {
    try {
      lock(connectionLock) {
        // NOTE(review): 100 is presumably a wait timeout for the server call - confirm against IGridServer
        return server.TryEndExecuteEngine(engineGuid, 100);
      }
    } catch(TimeoutException) {
      retries++;
      Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    } catch(CommunicationException) {
      ResetConnection();
      // continue with the newly created channel; the one passed in is broken
      lock(connectionLock) {
        server = this.server;
      }
      retries++;
      Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    }
  } while(retries < MAX_CONNECTION_RETRIES);
  return null;
}
287
/// <summary>
/// Asks the server for the state of the given job, retrying on timeouts and communication errors.
/// </summary>
/// <param name="server">Channel used for the first attempt; replaced by the current channel after a connection reset.</param>
/// <param name="engineGuid">Id of the job as returned by the server when it was started.</param>
/// <returns>
/// The state reported by the server, or JobState.Unkown when MAX_CONNECTION_RETRIES attempts failed.
/// </returns>
private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
  // check if the server is still working on the job
  int retries = 0;
  do {
    try {
      lock(connectionLock) {
        return server.JobState(engineGuid);
      }
    } catch(TimeoutException) {
      retries++;
      Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    } catch(CommunicationException) {
      ResetConnection();
      // continue with the newly created channel; the one passed in is broken
      lock(connectionLock) {
        server = this.server;
      }
      retries++;
      Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    }
  } while(retries < MAX_CONNECTION_RETRIES);
  return JobState.Unkown;
}
308  }
309}
Note: See TracBrowser for help on using the repository browser.