Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Operator Architecture Refactoring/HeuristicLab.Grid/3.2/JobManager.cs @ 2498

Last change on this file since 2498 was 1529, checked in by gkronber, 16 years ago

Moved source files of plugins AdvancedOptimizationFrontEnd ... Grid into version-specific sub-folders. #576

File size: 10.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.Windows.Forms;
32using System.Diagnostics;
33
34namespace HeuristicLab.Grid {
35  public class JobExecutionException : ApplicationException {
36    public JobExecutionException(string msg) : base(msg) { }
37  }
38
39  public class JobManager {
40    private const int MAX_RESTARTS = 5;
41    private const int MAX_CONNECTION_RETRIES = 10;
42    private const int RETRY_TIMEOUT_SEC = 60;
43    private const int RESULT_POLLING_TIMEOUT = 5;
44
45    private class Job {
46      public Guid guid;
47      public ProcessingEngine engine;
48      public ManualResetEvent waitHandle;
49      public int restarts;
50    }
51
52    private IGridServer server;
53    private string address;
54    private object waitingQueueLock = new object();
55    private Queue<Job> waitingJobs = new Queue<Job>();
56    private object runningQueueLock = new object();
57    private Queue<Job> runningJobs = new Queue<Job>();
58    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
59
60    private List<IOperation> erroredOperations = new List<IOperation>();
61    private object connectionLock = new object();
62    private object dictionaryLock = new object();
63
64    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
65    private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);
66
67    private ChannelFactory<IGridServer> factory;
68
69    public JobManager(string address) {
70      this.address = address;
71      Thread starterThread = new Thread(StartEngines);
72      Thread resultsGatheringThread = new Thread(GetResults);
73      starterThread.Start();
74      resultsGatheringThread.Start();
75    }
76
77    public void Reset() {
78      ResetConnection();
79      lock(dictionaryLock) {
80        foreach(Job j in waitingJobs) {
81          j.waitHandle.Close();
82        }
83        waitingJobs.Clear();
84        foreach(Job j in runningJobs) {
85          j.waitHandle.Close();
86        }
87        runningJobs.Clear();
88        results.Clear();
89        erroredOperations.Clear();
90      }
91    }
92
93    private void ResetConnection() {
94      Trace.TraceInformation("Reset connection in JobManager");
95      lock(connectionLock) {
96        // open a new channel
97        NetTcpBinding binding = new NetTcpBinding();
98        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
99        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
100        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
101
102        factory = new ChannelFactory<IGridServer>(binding);
103        server = factory.CreateChannel(new EndpointAddress(address));
104      }
105    }
106
107    public void StartEngines() {
108      try {
109        while(true) {
110          Job job = null;
111          lock(waitingQueueLock) {
112            if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
113          }
114          if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting
115          else {
116            Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
117            if(currentEngineGuid == Guid.Empty) {
118              // couldn't start the job -> requeue
119              if(job.restarts < MAX_RESTARTS) {
120                job.restarts++;
121                lock(waitingQueueLock) waitingJobs.Enqueue(job);
122                waitingWaitHandle.Set();
123              } else {
124                // max restart count reached -> give up on this job and flag error
125                lock(dictionaryLock) {
126                  erroredOperations.Add(job.engine.InitialOperation);
127                  job.waitHandle.Set();
128                }
129              }
130            } else {
131              // job started successfully
132              job.guid = currentEngineGuid;
133              lock(runningQueueLock) {
134                runningJobs.Enqueue(job);
135                runningWaitHandle.Set();
136              }
137            }
138          }
139        }
140      } catch(Exception e) {
141        Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
142      }
143    }
144
145
146    public void GetResults() {
147      try {
148        while(true) {
149          Job job = null;
150          lock(runningQueueLock) {
151            if(runningJobs.Count > 0) job = runningJobs.Dequeue();
152          }
153          if(job == null) runningWaitHandle.WaitOne(); // no jobs running
154          else {
155            byte[] zippedResult = TryEndExecuteEngine(server, job.guid);
156            if(zippedResult != null) { // successful
157              lock(dictionaryLock) {
158                // store result
159                results[job.engine.InitialOperation] = zippedResult;
160                // notify consumer that result is ready
161                job.waitHandle.Set();
162              }
163            } else {
164              // there was a problem -> check the state of the job and restart if necessary
165              JobState jobState = TryGetJobState(server, job.guid);
166              if(jobState == JobState.Unknown) {
167                job.restarts++;
168                lock(waitingQueueLock) {
169                  waitingJobs.Enqueue(job);
170                  waitingWaitHandle.Set();
171                }
172              } else {
173                // job still active at the server
174                lock(runningQueueLock) {
175                  runningJobs.Enqueue(job);
176                  runningWaitHandle.Set();
177                }
178                Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT)); // sleep a while before trying to get the next result
179              }
180            }
181          }
182        }
183      } catch(Exception e) {
184        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
185      }
186    }
187
188    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
189      return BeginExecuteEngine(new ProcessingEngine(globalScope, operation));
190    }
191
192    public WaitHandle BeginExecuteEngine(ProcessingEngine engine) {
193      Job job = new Job();
194      job.engine = engine;
195      job.waitHandle = new ManualResetEvent(false);
196      job.restarts = 0;
197      lock(waitingQueueLock) {
198        waitingJobs.Enqueue(job);
199      }
200      waitingWaitHandle.Set();
201      return job.waitHandle;
202    }
203
204    private byte[] ZipEngine(ProcessingEngine engine) {
205      return PersistenceManager.SaveToGZip(engine);
206    }
207
208    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
209      if(erroredOperations.Contains(operation)) {
210        erroredOperations.Remove(operation);
211        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
212      } else {
213        byte[] zippedResult = null;
214        lock(dictionaryLock) {
215          zippedResult = results[operation];
216          results.Remove(operation);
217        }
218        // restore the engine
219        return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
220      }
221    }
222
223    private Guid TryStartExecuteEngine(ProcessingEngine engine) {
224      byte[] zippedEngine = ZipEngine(engine);
225      int retries = 0;
226      Guid guid = Guid.Empty;
227      do {
228        try {
229          lock(connectionLock) {
230            guid = server.BeginExecuteEngine(zippedEngine);
231          }
232          return guid;
233        } catch(TimeoutException) {
234          retries++;
235          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
236        } catch(CommunicationException) {
237          ResetConnection();
238          retries++;
239          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
240        }
241      } while(retries < MAX_CONNECTION_RETRIES);
242      Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine");
243      return Guid.Empty;
244    }
245
246    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
247      int retries = 0;
248      do {
249        try {
250          lock(connectionLock) {
251            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
252            return zippedResult;
253          }
254        } catch(TimeoutException) {
255          retries++;
256          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
257        } catch(CommunicationException) {
258          ResetConnection();
259          retries++;
260          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
261        }
262      } while(retries < MAX_CONNECTION_RETRIES);
263      Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine");
264      return null;
265    }
266
267    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
268      // check if the server is still working on the job
269      int retries = 0;
270      do {
271        try {
272          lock(connectionLock) {
273            JobState jobState = server.JobState(engineGuid);
274            return jobState;
275          }
276        } catch(TimeoutException) {
277          retries++;
278          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
279        } catch(CommunicationException) {
280          ResetConnection();
281          retries++;
282          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
283        }
284      } while(retries < MAX_CONNECTION_RETRIES);
285      Trace.TraceWarning("Reached max connection retries in TryGetJobState");
286      return JobState.Unknown;
287    }
288  }
289}
Note: See TracBrowser for help on using the repository browser.