Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid/JobManager.cs @ 506

Last change on this file since 506 was 502, checked in by gkronber, 16 years ago

fixed a few bugs (#197 Use SQLite backend to store waiting engines and results instead of in-memory dictionaries)

File size: 10.3 KB
RevLine 
[265]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[219]23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.Windows.Forms;
[386]32using System.Diagnostics;
[219]33
[372]34namespace HeuristicLab.Grid {
[391]35  public class JobExecutionException : ApplicationException {
36    public JobExecutionException(string msg) : base(msg) { }
37  }
38
[372]39  public class JobManager {
[386]40    private const int MAX_RESTARTS = 5;
41    private const int MAX_CONNECTION_RETRIES = 10;
42    private const int RETRY_TIMEOUT_SEC = 60;
[502]43    private const int RESULT_POLLING_TIMEOUT = 5;
[386]44
[391]45    private class Job {
46      public Guid guid;
47      public ProcessingEngine engine;
48      public ManualResetEvent waitHandle;
49      public int restarts;
50    }
51
[219]52    private IGridServer server;
53    private string address;
[386]54    private object waitingQueueLock = new object();
[391]55    private Queue<Job> waitingJobs = new Queue<Job>();
[386]56    private object runningQueueLock = new object();
[391]57    private Queue<Job> runningJobs = new Queue<Job>();
[248]58    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
[386]59
[315]60    private List<IOperation> erroredOperations = new List<IOperation>();
[248]61    private object connectionLock = new object();
62    private object dictionaryLock = new object();
63
[387]64    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
65    private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);
[248]66
[228]67    private ChannelFactory<IGridServer> factory;
[219]68
69    public JobManager(string address) {
[402]70      Trace.Listeners.Clear();
71      Trace.Listeners.Add(new EventLogTraceListener("HeuristicLab.Grid"));
[219]72      this.address = address;
[386]73      Thread starterThread = new Thread(StartEngines);
74      Thread resultsGatheringThread = new Thread(GetResults);
75      starterThread.Start();
76      resultsGatheringThread.Start();
[219]77    }
78
[372]79    public void Reset() {
[248]80      ResetConnection();
81      lock(dictionaryLock) {
[391]82        foreach(Job j in waitingJobs) {
83          j.waitHandle.Close();
84        }
85        waitingJobs.Clear();
86        foreach(Job j in runningJobs) {
87          j.waitHandle.Close();
88        }
89        runningJobs.Clear();
[248]90        results.Clear();
[315]91        erroredOperations.Clear();
[219]92      }
93    }
94
[228]95    private void ResetConnection() {
[402]96      Trace.TraceInformation("Reset connection in JobManager");
[248]97      lock(connectionLock) {
98        // open a new channel
99        NetTcpBinding binding = new NetTcpBinding();
100        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
101        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
102        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
103        binding.Security.Mode = SecurityMode.None;
104
105        factory = new ChannelFactory<IGridServer>(binding);
106        server = factory.CreateChannel(new EndpointAddress(address));
107      }
[228]108    }
109
[386]110    public void StartEngines() {
111      try {
112        while(true) {
[391]113          Job job = null;
[386]114          lock(waitingQueueLock) {
[391]115            if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
[248]116          }
[391]117          if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting
118          else {
119            Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
120            if(currentEngineGuid == Guid.Empty) {
121              // couldn't start the job -> requeue
122              if(job.restarts < MAX_RESTARTS) {
123                job.restarts++;
124                lock(waitingQueueLock) waitingJobs.Enqueue(job);
125                waitingWaitHandle.Set();
[386]126              } else {
[391]127                // max restart count reached -> give up on this job and flag error
128                lock(dictionaryLock) {
129                  erroredOperations.Add(job.engine.InitialOperation);
130                  job.waitHandle.Set();
[386]131                }
132              }
[391]133            } else {
134              // job started successfully
135              job.guid = currentEngineGuid;
136              lock(runningQueueLock) {
137                runningJobs.Enqueue(job);
138                runningWaitHandle.Set();
139              }
[386]140            }
[315]141          }
[386]142        }
[402]143      } catch(Exception e) {
144        Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
[386]145      }
146    }
147
[391]148
[386]149    public void GetResults() {
150      try {
151        while(true) {
[502]152          Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT));
[391]153          Job job = null;
[386]154          lock(runningQueueLock) {
[391]155            if(runningJobs.Count > 0) job = runningJobs.Dequeue();
[315]156          }
[391]157          if(job == null) runningWaitHandle.WaitOne(); // no jobs running
158          else {
159            byte[] zippedResult = TryEndExecuteEngine(server, job.guid);
[386]160            if(zippedResult != null) { // successful
161              lock(dictionaryLock) {
162                // store result
[391]163                results[job.engine.InitialOperation] = zippedResult;
164                // notify consumer that result is ready
165                job.waitHandle.Set();
[386]166              }
167            } else {
168              // there was a problem -> check the state of the job and restart if necessary
[391]169              JobState jobState = TryGetJobState(server, job.guid);
170              if(jobState == JobState.Unknown) {
171                job.restarts++;
[386]172                lock(waitingQueueLock) {
[391]173                  waitingJobs.Enqueue(job);
[386]174                  waitingWaitHandle.Set();
175                }
176              } else {
177                // job still active at the server
178                lock(runningQueueLock) {
[391]179                  runningJobs.Enqueue(job);
180                  runningWaitHandle.Set();
[386]181                }
182              }
183            }
184          }
[248]185        }
[402]186      } catch(Exception e) {
187        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
[219]188      }
189    }
190
[386]191    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
[414]192      return BeginExecuteEngine(new ProcessingEngine(globalScope, operation));
193    }
194
195    public WaitHandle BeginExecuteEngine(ProcessingEngine engine) {
[391]196      Job job = new Job();
[414]197      job.engine = engine;
[391]198      job.waitHandle = new ManualResetEvent(false);
199      job.restarts = 0;
[386]200      lock(waitingQueueLock) {
[391]201        waitingJobs.Enqueue(job);
[386]202      }
203      waitingWaitHandle.Set();
[391]204      return job.waitHandle;
[386]205    }
206
[257]207    private byte[] ZipEngine(ProcessingEngine engine) {
[402]208      return PersistenceManager.SaveToGZip(engine);
[257]209    }
210
[281]211    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
[315]212      if(erroredOperations.Contains(operation)) {
213        erroredOperations.Remove(operation);
[391]214        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
[315]215      } else {
216        byte[] zippedResult = null;
217        lock(dictionaryLock) {
218          zippedResult = results[operation];
219          results.Remove(operation);
220        }
221        // restore the engine
[402]222        return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
[256]223      }
[248]224    }
225
[391]226    private Guid TryStartExecuteEngine(ProcessingEngine engine) {
227      byte[] zippedEngine = ZipEngine(engine);
228      int retries = 0;
229      Guid guid = Guid.Empty;
230      do {
231        try {
232          lock(connectionLock) {
233            guid = server.BeginExecuteEngine(zippedEngine);
234          }
235          return guid;
236        } catch(TimeoutException) {
237          retries++;
238          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
239        } catch(CommunicationException) {
240          ResetConnection();
241          retries++;
242          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
243        }
244      } while(retries < MAX_CONNECTION_RETRIES);
[402]245      Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine");
[391]246      return Guid.Empty;
247    }
248
[315]249    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
250      int retries = 0;
251      do {
252        try {
253          lock(connectionLock) {
[501]254            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
[315]255            return zippedResult;
256          }
[383]257        } catch(TimeoutException) {
[315]258          retries++;
259          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
[383]260        } catch(CommunicationException) {
[315]261          ResetConnection();
262          retries++;
263          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
264        }
265      } while(retries < MAX_CONNECTION_RETRIES);
[402]266      Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine");
[315]267      return null;
268    }
269
270    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
271      // check if the server is still working on the job
272      int retries = 0;
273      do {
274        try {
275          lock(connectionLock) {
276            JobState jobState = server.JobState(engineGuid);
277            return jobState;
278          }
[383]279        } catch(TimeoutException) {
[315]280          retries++;
281          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
[383]282        } catch(CommunicationException) {
[315]283          ResetConnection();
284          retries++;
285          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
286        }
287      } while(retries < MAX_CONNECTION_RETRIES);
[402]288      Trace.TraceWarning("Reached max connection retries in TryGetJobState");
[391]289      return JobState.Unknown;
[315]290    }
[219]291  }
292}
Note: See TracBrowser for help on using the repository browser.