source: trunk/sources/HeuristicLab.Grid/JobManager.cs @ 1114

Last change on this file since 1114 was 1114, checked in by gkronber, 12 years ago

merged quick fix for #462 (GridClient needs super-user permissions to write to the event-log) (r1113) from CEDMA refactoring into the trunk

File size: 10.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.Windows.Forms;
32using System.Diagnostics;
33
34namespace HeuristicLab.Grid {
35  public class JobExecutionException : ApplicationException {
36    public JobExecutionException(string msg) : base(msg) { }
37  }
38
39  public class JobManager {
40    private const int MAX_RESTARTS = 5;
41    private const int MAX_CONNECTION_RETRIES = 10;
42    private const int RETRY_TIMEOUT_SEC = 60;
43    private const int RESULT_POLLING_TIMEOUT = 5;
44
45    private class Job {
46      public Guid guid;
47      public ProcessingEngine engine;
48      public ManualResetEvent waitHandle;
49      public int restarts;
50    }
51
52    private IGridServer server;
53    private string address;
54    private object waitingQueueLock = new object();
55    private Queue<Job> waitingJobs = new Queue<Job>();
56    private object runningQueueLock = new object();
57    private Queue<Job> runningJobs = new Queue<Job>();
58    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
59
60    private List<IOperation> erroredOperations = new List<IOperation>();
61    private object connectionLock = new object();
62    private object dictionaryLock = new object();
63
64    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
65    private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);
66
67    private ChannelFactory<IGridServer> factory;
68
69    public JobManager(string address) {
70      this.address = address;
71      Thread starterThread = new Thread(StartEngines);
72      Thread resultsGatheringThread = new Thread(GetResults);
73      starterThread.Start();
74      resultsGatheringThread.Start();
75    }
76
77    public void Reset() {
78      ResetConnection();
79      lock(dictionaryLock) {
80        foreach(Job j in waitingJobs) {
81          j.waitHandle.Close();
82        }
83        waitingJobs.Clear();
84        foreach(Job j in runningJobs) {
85          j.waitHandle.Close();
86        }
87        runningJobs.Clear();
88        results.Clear();
89        erroredOperations.Clear();
90      }
91    }
92
93    private void ResetConnection() {
94      Trace.TraceInformation("Reset connection in JobManager");
95      lock(connectionLock) {
96        // open a new channel
97        NetTcpBinding binding = new NetTcpBinding();
98        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
99        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
100        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
101        binding.Security.Mode = SecurityMode.None;
102
103        factory = new ChannelFactory<IGridServer>(binding);
104        server = factory.CreateChannel(new EndpointAddress(address));
105      }
106    }
107
108    public void StartEngines() {
109      try {
110        while(true) {
111          Job job = null;
112          lock(waitingQueueLock) {
113            if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
114          }
115          if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting
116          else {
117            Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
118            if(currentEngineGuid == Guid.Empty) {
119              // couldn't start the job -> requeue
120              if(job.restarts < MAX_RESTARTS) {
121                job.restarts++;
122                lock(waitingQueueLock) waitingJobs.Enqueue(job);
123                waitingWaitHandle.Set();
124              } else {
125                // max restart count reached -> give up on this job and flag error
126                lock(dictionaryLock) {
127                  erroredOperations.Add(job.engine.InitialOperation);
128                  job.waitHandle.Set();
129                }
130              }
131            } else {
132              // job started successfully
133              job.guid = currentEngineGuid;
134              lock(runningQueueLock) {
135                runningJobs.Enqueue(job);
136                runningWaitHandle.Set();
137              }
138            }
139          }
140        }
141      } catch(Exception e) {
142        Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
143      }
144    }
145
146
147    public void GetResults() {
148      try {
149        while(true) {
150          Job job = null;
151          lock(runningQueueLock) {
152            if(runningJobs.Count > 0) job = runningJobs.Dequeue();
153          }
154          if(job == null) runningWaitHandle.WaitOne(); // no jobs running
155          else {
156            byte[] zippedResult = TryEndExecuteEngine(server, job.guid);
157            if(zippedResult != null) { // successful
158              lock(dictionaryLock) {
159                // store result
160                results[job.engine.InitialOperation] = zippedResult;
161                // notify consumer that result is ready
162                job.waitHandle.Set();
163              }
164            } else {
165              // there was a problem -> check the state of the job and restart if necessary
166              JobState jobState = TryGetJobState(server, job.guid);
167              if(jobState == JobState.Unknown) {
168                job.restarts++;
169                lock(waitingQueueLock) {
170                  waitingJobs.Enqueue(job);
171                  waitingWaitHandle.Set();
172                }
173              } else {
174                // job still active at the server
175                lock(runningQueueLock) {
176                  runningJobs.Enqueue(job);
177                  runningWaitHandle.Set();
178                }
179                Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT)); // sleep a while before trying to get the next result
180              }
181            }
182          }
183        }
184      } catch(Exception e) {
185        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
186      }
187    }
188
189    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
190      return BeginExecuteEngine(new ProcessingEngine(globalScope, operation));
191    }
192
193    public WaitHandle BeginExecuteEngine(ProcessingEngine engine) {
194      Job job = new Job();
195      job.engine = engine;
196      job.waitHandle = new ManualResetEvent(false);
197      job.restarts = 0;
198      lock(waitingQueueLock) {
199        waitingJobs.Enqueue(job);
200      }
201      waitingWaitHandle.Set();
202      return job.waitHandle;
203    }
204
205    private byte[] ZipEngine(ProcessingEngine engine) {
206      return PersistenceManager.SaveToGZip(engine);
207    }
208
209    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
210      if(erroredOperations.Contains(operation)) {
211        erroredOperations.Remove(operation);
212        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
213      } else {
214        byte[] zippedResult = null;
215        lock(dictionaryLock) {
216          zippedResult = results[operation];
217          results.Remove(operation);
218        }
219        // restore the engine
220        return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
221      }
222    }
223
224    private Guid TryStartExecuteEngine(ProcessingEngine engine) {
225      byte[] zippedEngine = ZipEngine(engine);
226      int retries = 0;
227      Guid guid = Guid.Empty;
228      do {
229        try {
230          lock(connectionLock) {
231            guid = server.BeginExecuteEngine(zippedEngine);
232          }
233          return guid;
234        } catch(TimeoutException) {
235          retries++;
236          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
237        } catch(CommunicationException) {
238          ResetConnection();
239          retries++;
240          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
241        }
242      } while(retries < MAX_CONNECTION_RETRIES);
243      Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine");
244      return Guid.Empty;
245    }
246
247    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
248      int retries = 0;
249      do {
250        try {
251          lock(connectionLock) {
252            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
253            return zippedResult;
254          }
255        } catch(TimeoutException) {
256          retries++;
257          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
258        } catch(CommunicationException) {
259          ResetConnection();
260          retries++;
261          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
262        }
263      } while(retries < MAX_CONNECTION_RETRIES);
264      Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine");
265      return null;
266    }
267
268    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
269      // check if the server is still working on the job
270      int retries = 0;
271      do {
272        try {
273          lock(connectionLock) {
274            JobState jobState = server.JobState(engineGuid);
275            return jobState;
276          }
277        } catch(TimeoutException) {
278          retries++;
279          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
280        } catch(CommunicationException) {
281          ResetConnection();
282          retries++;
283          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
284        }
285      } while(retries < MAX_CONNECTION_RETRIES);
286      Trace.TraceWarning("Reached max connection retries in TryGetJobState");
287      return JobState.Unknown;
288    }
289  }
290}
Note: See TracBrowser for help on using the repository browser.