Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid/JobManager.cs @ 402

Last change on this file since 402 was 402, checked in by gkronber, 16 years ago

moved GZip persistence into the PersistenceManager in HeuristicLab.Core because compression is needed in several plugins (CEDMA and Grid)

File size: 10.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.Windows.Forms;
32using System.Diagnostics;
33
34namespace HeuristicLab.Grid {
/// <summary>
/// Thrown by <see cref="JobManager.EndExecuteOperation"/> when a job was
/// flagged as failed instead of producing a result.
/// </summary>
public class JobExecutionException : ApplicationException {
  /// <summary>
  /// Creates the exception with the given description.
  /// </summary>
  /// <param name="msg">Text that becomes the exception's <c>Message</c>.</param>
  public JobExecutionException(string msg)
    : base(msg) {
  }
}
38
/// <summary>
/// Client-side dispatcher that sends processing engines to a grid server
/// (<see cref="IGridServer"/>) over a net.tcp channel and collects their results.
/// Two worker threads are started by the constructor: one starts waiting jobs
/// (<see cref="StartEngines"/>), the other polls running jobs for results
/// (<see cref="GetResults"/>).
/// </summary>
public class JobManager {
  // how often a job is handed back to the waiting queue before it is flagged as failed
  private const int MAX_RESTARTS = 5;
  // how often a single server call is attempted before giving up
  private const int MAX_CONNECTION_RETRIES = 10;
  // pause after a failed server call
  private const int RETRY_TIMEOUT_SEC = 60;
  // NOTE(review): not referenced anywhere in this class; kept unchanged - confirm whether it can be removed
  private const int CHECK_RESULTS_TIMEOUT = 3;
  // second argument of IGridServer.TryEndExecuteEngine; presumably a wait time in milliseconds - TODO confirm
  private const int END_EXECUTE_WAIT = 100;

  /// <summary>Bookkeeping record for one submitted operation.</summary>
  private class Job {
    // id returned by the server when the engine was started
    public Guid guid;
    // engine that is sent to the server
    public ProcessingEngine engine;
    // signalled when a result is available or the job has been flagged as failed
    public ManualResetEvent waitHandle;
    // number of times the job has been put back into the waiting queue
    public int restarts;
  }

  private IGridServer server;
  private readonly string address;
  // guards waitingJobs
  private readonly object waitingQueueLock = new object();
  private readonly Queue<Job> waitingJobs = new Queue<Job>();
  // guards runningJobs
  private readonly object runningQueueLock = new object();
  private readonly Queue<Job> runningJobs = new Queue<Job>();
  private readonly Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();

  private readonly List<IOperation> erroredOperations = new List<IOperation>();
  // guards server and factory and serializes all calls on the channel
  private readonly object connectionLock = new object();
  // guards results and erroredOperations
  private readonly object dictionaryLock = new object();

  private readonly AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
  private readonly AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);

  private ChannelFactory<IGridServer> factory;

  /// <summary>
  /// Redirects trace output to the "HeuristicLab.Grid" event log source and
  /// starts the two worker threads. No channel is created here; that happens
  /// in <see cref="Reset"/> (or after a communication failure).
  /// </summary>
  /// <param name="address">Endpoint address of the grid server.</param>
  public JobManager(string address) {
    Trace.Listeners.Clear();
    Trace.Listeners.Add(new EventLogTraceListener("HeuristicLab.Grid"));
    this.address = address;
    Thread starterThread = new Thread(StartEngines);
    Thread resultsGatheringThread = new Thread(GetResults);
    // both loops never terminate; as foreground threads they would keep the
    // host process alive after all other threads have finished
    starterThread.IsBackground = true;
    resultsGatheringThread.IsBackground = true;
    starterThread.Start();
    resultsGatheringThread.Start();
  }

  /// <summary>
  /// Creates a fresh channel and discards all waiting jobs, running jobs,
  /// stored results and error flags. The wait handles of discarded jobs are closed.
  /// </summary>
  public void Reset() {
    ResetConnection();
    // every collection is cleared under the same lock the worker threads use for it
    lock(waitingQueueLock) {
      foreach(Job j in waitingJobs) {
        j.waitHandle.Close();
      }
      waitingJobs.Clear();
    }
    lock(runningQueueLock) {
      foreach(Job j in runningJobs) {
        j.waitHandle.Close();
      }
      runningJobs.Clear();
    }
    lock(dictionaryLock) {
      results.Clear();
      erroredOperations.Clear();
    }
  }

  /// <summary>
  /// Replaces the channel to the grid server with a newly created one.
  /// </summary>
  private void ResetConnection() {
    Trace.TraceInformation("Reset connection in JobManager");
    lock(connectionLock) {
      // release the previous factory (and the channel it created) instead of leaking it;
      // Abort is used because the old channel may be faulted
      if(factory != null) factory.Abort();

      // open a new channel
      NetTcpBinding binding = new NetTcpBinding();
      binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
      binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
      binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
      binding.Security.Mode = SecurityMode.None;

      factory = new ChannelFactory<IGridServer>(binding);
      server = factory.CreateChannel(new EndpointAddress(address));
    }
  }

  /// <summary>
  /// Body of the starter thread: takes jobs from the waiting queue and starts
  /// them on the server. Jobs that could not be started are retried up to
  /// MAX_RESTARTS times and flagged as failed afterwards. Any exception ends
  /// the loop and is written to the trace.
  /// </summary>
  public void StartEngines() {
    try {
      while(true) {
        Job job = null;
        lock(waitingQueueLock) {
          if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
        }
        if(job == null) {
          waitingWaitHandle.WaitOne(); // no jobs waiting
          continue;
        }
        Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
        if(currentEngineGuid == Guid.Empty) {
          // couldn't start the job -> requeue or give up
          RestartOrFail(job);
        } else {
          // job started successfully
          job.guid = currentEngineGuid;
          lock(runningQueueLock) {
            runningJobs.Enqueue(job);
            runningWaitHandle.Set();
          }
        }
      }
    } catch(Exception e) {
      Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
    }
  }

  /// <summary>
  /// Body of the results-gathering thread: polls the server for the result of
  /// each running job. A job the server reports as JobState.Unknown is restarted
  /// (bounded by MAX_RESTARTS); a job in any other state is polled again later.
  /// Any exception ends the loop and is written to the trace.
  /// </summary>
  public void GetResults() {
    try {
      while(true) {
        Job job = null;
        lock(runningQueueLock) {
          if(runningJobs.Count > 0) job = runningJobs.Dequeue();
        }
        if(job == null) {
          runningWaitHandle.WaitOne(); // no jobs running
          continue;
        }
        byte[] zippedResult = TryEndExecuteEngine(job.guid);
        if(zippedResult != null) { // successful
          lock(dictionaryLock) {
            // store result
            results[job.engine.InitialOperation] = zippedResult;
            // notify consumer that result is ready
            job.waitHandle.Set();
          }
        } else if(TryGetJobState(job.guid) == JobState.Unknown) {
          // the server doesn't know the job (or couldn't be reached) -> restart or give up
          RestartOrFail(job);
        } else {
          // job still active at the server
          lock(runningQueueLock) {
            runningJobs.Enqueue(job);
            runningWaitHandle.Set();
          }
        }
      }
    } catch(Exception e) {
      Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
    }
  }

  /// <summary>
  /// Puts the job back into the waiting queue, or - once MAX_RESTARTS restarts
  /// have been used up - flags its operation as failed and wakes the consumer.
  /// </summary>
  private void RestartOrFail(Job job) {
    if(job.restarts < MAX_RESTARTS) {
      job.restarts++;
      lock(waitingQueueLock) {
        waitingJobs.Enqueue(job);
      }
      waitingWaitHandle.Set();
    } else {
      // max restart count reached -> give up on this job and flag error
      lock(dictionaryLock) {
        erroredOperations.Add(job.engine.InitialOperation);
        job.waitHandle.Set();
      }
    }
  }

  /// <summary>
  /// Queues an operation for execution on the grid.
  /// </summary>
  /// <param name="globalScope">Scope passed to the new ProcessingEngine.</param>
  /// <param name="operation">Operation passed to the new ProcessingEngine.</param>
  /// <returns>
  /// A handle that is signalled when the result can be fetched with
  /// <see cref="EndExecuteOperation"/> or when the job has been flagged as failed.
  /// </returns>
  public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
    Job job = new Job();
    job.engine = new ProcessingEngine(globalScope, operation);
    job.waitHandle = new ManualResetEvent(false);
    job.restarts = 0;
    lock(waitingQueueLock) {
      waitingJobs.Enqueue(job);
    }
    waitingWaitHandle.Set();
    return job.waitHandle;
  }

  /// <summary>Serializes the engine with PersistenceManager.SaveToGZip.</summary>
  private byte[] ZipEngine(ProcessingEngine engine) {
    return PersistenceManager.SaveToGZip(engine);
  }

  /// <summary>
  /// Fetches and removes the stored result of an operation and restores the
  /// engine from it.
  /// </summary>
  /// <param name="operation">Operation previously passed to <see cref="BeginExecuteOperation"/>.</param>
  /// <returns>The engine restored from the stored result.</returns>
  /// <exception cref="JobExecutionException">
  /// The operation was flagged as failed; the flag is removed by this call.
  /// </exception>
  public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
    byte[] zippedResult = null;
    // erroredOperations and results are written by the worker threads under this lock
    lock(dictionaryLock) {
      if(erroredOperations.Remove(operation)) {
        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
      }
      zippedResult = results[operation];
      results.Remove(operation);
    }
    // restore the engine
    return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
  }

  /// <summary>
  /// Runs one server call with the retry policy shared by all calls: up to
  /// MAX_CONNECTION_RETRIES attempts, a pause of RETRY_TIMEOUT_SEC seconds after
  /// each failed attempt, and a new channel after a CommunicationException.
  /// The channel is read from the field inside the lock on every attempt, so a
  /// retry after a reset uses the new channel rather than the failed one.
  /// </summary>
  /// <param name="call">The call to perform on the current channel.</param>
  /// <param name="fallback">Value returned when all attempts failed.</param>
  /// <param name="callerName">Name used in the trace warning.</param>
  private T CallServer<T>(Func<IGridServer, T> call, T fallback, string callerName) {
    for(int retries = 0; retries < MAX_CONNECTION_RETRIES; retries++) {
      try {
        lock(connectionLock) {
          return call(server);
        }
      } catch(TimeoutException) {
        // keep the channel and just try again after the pause below
      } catch(CommunicationException) {
        ResetConnection();
      }
      Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    }
    Trace.TraceWarning("Reached max connection retries in " + callerName);
    return fallback;
  }

  /// <summary>
  /// Sends the zipped engine to the server.
  /// </summary>
  /// <returns>The value returned by the server, or Guid.Empty when all attempts failed.</returns>
  private Guid TryStartExecuteEngine(ProcessingEngine engine) {
    byte[] zippedEngine = ZipEngine(engine); // zipped once, reused for every attempt
    return CallServer<Guid>(channel => channel.BeginExecuteEngine(zippedEngine), Guid.Empty, "TryStartExecuteEngine");
  }

  /// <summary>
  /// Asks the server for the result of the given engine.
  /// </summary>
  /// <returns>The value returned by the server, or null when all attempts failed.</returns>
  private byte[] TryEndExecuteEngine(Guid engineGuid) {
    return CallServer<byte[]>(channel => channel.TryEndExecuteEngine(engineGuid, END_EXECUTE_WAIT), null, "TryEndExecuteEngine");
  }

  /// <summary>
  /// Checks if the server is still working on the job.
  /// </summary>
  /// <returns>The state reported by the server, or JobState.Unknown when all attempts failed.</returns>
  private JobState TryGetJobState(Guid engineGuid) {
    return CallServer<JobState>(channel => channel.JobState(engineGuid), JobState.Unknown, "TryGetJobState");
  }
}
287}
Note: See TracBrowser for help on using the repository browser.