Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid/JobManager.cs @ 400

Last change on this file since 400 was 391, checked in by gkronber, 16 years ago

improved JobManager code and fixed a typo in the JobState enum. (ticket #188)

File size: 10.1 KB
RevLine 
[265]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[219]23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
[386]33using System.Diagnostics;
[219]34
[372]35namespace HeuristicLab.Grid {
/// <summary>
/// Raised by <see cref="JobManager.EndExecuteOperation"/> when the job for an operation was given up
/// (e.g. the maximal number of restarts was reached).
/// </summary>
public class JobExecutionException : ApplicationException {
  /// <summary>Creates an exception with the default message.</summary>
  public JobExecutionException() { }

  /// <summary>Creates an exception with the given message.</summary>
  /// <param name="msg">Description of the failure.</param>
  public JobExecutionException(string msg) : base(msg) { }

  /// <summary>Creates an exception with the given message that wraps the exception that caused it.</summary>
  /// <param name="msg">Description of the failure.</param>
  /// <param name="innerException">The underlying cause.</param>
  public JobExecutionException(string msg, Exception innerException) : base(msg, innerException) { }
}
39
/// <summary>
/// Submits <see cref="ProcessingEngine"/> jobs to a grid server (<see cref="IGridServer"/>) and collects their results.
/// </summary>
/// <remarks>
/// The constructor starts two worker threads. <see cref="StartEngines"/> submits waiting jobs to the server, and
/// <see cref="GetResults"/> polls the server for the results of running jobs. Consumers call
/// <see cref="BeginExecuteOperation"/>, wait on the returned handle, and then call <see cref="EndExecuteOperation"/>.
/// </remarks>
public class JobManager {
  // maximal number of times a job is resubmitted before it is reported as failed
  private const int MAX_RESTARTS = 5;
  // maximal number of attempts for a single server call before giving up on that call
  private const int MAX_CONNECTION_RETRIES = 10;
  // pause (seconds) after each failed server call
  private const int RETRY_TIMEOUT_SEC = 60;
  // NOTE(review): not referenced anywhere in this class
  private const int CHECK_RESULTS_TIMEOUT = 3;

  // Bookkeeping for one submitted operation.
  private class Job {
    public Guid guid;                    // id returned by the server when the engine was started
    public ProcessingEngine engine;      // engine wrapping the operation to execute
    public ManualResetEvent waitHandle;  // set when the result is stored or the job was given up
    public int restarts;                 // number of resubmissions so far
  }

  // server/factory are guarded by connectionLock; every call on the proxy is serialized through it
  private IGridServer server;
  private string address;
  // jobs that still have to be submitted to the server
  private readonly object waitingQueueLock = new object();
  private readonly Queue<Job> waitingJobs = new Queue<Job>();
  // jobs submitted to the server whose result has not been fetched yet
  private readonly object runningQueueLock = new object();
  private readonly Queue<Job> runningJobs = new Queue<Job>();
  // results and erroredOperations are guarded by dictionaryLock
  private readonly Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();

  private readonly List<IOperation> erroredOperations = new List<IOperation>();
  private readonly object connectionLock = new object();
  private readonly object dictionaryLock = new object();

  // signaled whenever a job is added to runningJobs / waitingJobs, to wake the corresponding worker thread
  private readonly AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
  private readonly AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);

  private ChannelFactory<IGridServer> factory;

  /// <summary>
  /// Creates a job manager for the grid server at <paramref name="address"/> and starts its two worker threads.
  /// The connection itself is created by <see cref="Reset"/>, or lazily on the first server call.
  /// </summary>
  /// <param name="address">Endpoint address of the grid server.</param>
  public JobManager(string address) {
    this.address = address;
    // The worker loops never return; as background threads they do not keep the process alive on shutdown.
    Thread starterThread = new Thread(StartEngines);
    starterThread.IsBackground = true;
    Thread resultsGatheringThread = new Thread(GetResults);
    resultsGatheringThread.IsBackground = true;
    starterThread.Start();
    resultsGatheringThread.Start();
  }

  /// <summary>
  /// Re-creates the server connection and discards all pending jobs, stored results and error flags.
  /// The wait handles of discarded jobs are closed.
  /// </summary>
  public void Reset() {
    ResetConnection();
    // each collection is cleared under the same lock the worker threads use for it
    lock(waitingQueueLock) {
      foreach(Job j in waitingJobs) {
        j.waitHandle.Close();
      }
      waitingJobs.Clear();
    }
    lock(runningQueueLock) {
      foreach(Job j in runningJobs) {
        j.waitHandle.Close();
      }
      runningJobs.Clear();
    }
    lock(dictionaryLock) {
      results.Clear();
      erroredOperations.Clear();
    }
  }

  // Replaces the channel factory and the server proxy with fresh ones for 'address'.
  private void ResetConnection() {
    lock(connectionLock) {
      // release the previous factory (and the channel created from it); Abort is used because, unlike Close,
      // it does not throw when the communication object is faulted
      if(factory != null) factory.Abort();

      // open a new channel
      NetTcpBinding binding = new NetTcpBinding();
      binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
      binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
      binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
      binding.Security.Mode = SecurityMode.None;

      factory = new ChannelFactory<IGridServer>(binding);
      server = factory.CreateChannel(new EndpointAddress(address));
    }
  }

  /// <summary>
  /// Worker loop (runs forever). It takes jobs from the waiting queue and submits them to the server.
  /// Started jobs move to the running queue. Jobs that cannot be started are resubmitted up to
  /// <c>MAX_RESTARTS</c> times and are then flagged as errored.
  /// </summary>
  public void StartEngines() {
    try {
      while(true) {
        Job job = null;
        lock(waitingQueueLock) {
          if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
        }
        if(job == null) {
          waitingWaitHandle.WaitOne(); // no jobs waiting
          continue;
        }
        Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
        if(currentEngineGuid == Guid.Empty) {
          // couldn't start the job -> requeue or give up
          RestartOrFail(job);
        } else {
          // job started successfully
          job.guid = currentEngineGuid;
          EnqueueRunning(job);
        }
      }
    } finally {
      Debug.Assert(false);  // make sure that we are notified when this thread is stopped in debugging
    }
  }

  /// <summary>
  /// Worker loop (runs forever). It polls the server for the result of each running job.
  /// Results are stored and the job's wait handle is set. Jobs the server reports as
  /// <c>JobState.Unknown</c> are resubmitted (bounded by <c>MAX_RESTARTS</c>). All other jobs
  /// are put back into the running queue.
  /// </summary>
  public void GetResults() {
    try {
      while(true) {
        Job job = null;
        lock(runningQueueLock) {
          if(runningJobs.Count > 0) job = runningJobs.Dequeue();
        }
        if(job == null) {
          runningWaitHandle.WaitOne(); // no jobs running
          continue;
        }
        byte[] zippedResult = TryEndExecuteEngine(job.guid);
        if(zippedResult != null) { // successful
          lock(dictionaryLock) {
            // store result and notify consumer that it is ready
            results[job.engine.InitialOperation] = zippedResult;
            job.waitHandle.Set();
          }
        } else if(TryGetJobState(job.guid) == JobState.Unknown) {
          // the server lost the job (or could not be reached) -> restart it, respecting MAX_RESTARTS
          RestartOrFail(job);
        } else {
          // job still active at the server
          EnqueueRunning(job);
        }
      }
    } finally {
      Debug.Assert(false); // just to make sure that I get notified when debugging whenever this thread is killed somehow
    }
  }

  /// <summary>
  /// Queues <paramref name="operation"/> for execution on the grid.
  /// </summary>
  /// <param name="globalScope">Scope the operation is executed on.</param>
  /// <param name="operation">The operation to execute.</param>
  /// <returns>A handle that is set once the result is available or the job was given up;
  /// afterwards call <see cref="EndExecuteOperation"/>.</returns>
  public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
    Job job = new Job();
    job.engine = new ProcessingEngine(globalScope, operation);
    job.waitHandle = new ManualResetEvent(false);
    job.restarts = 0;
    EnqueueWaiting(job);
    return job.waitHandle;
  }

  // Serializes the engine with PersistenceManager and GZip-compresses it.
  private byte[] ZipEngine(ProcessingEngine engine) {
    using(MemoryStream memStream = new MemoryStream()) {
      // leaveOpen: the GZip stream must be closed (flushed) before the buffer is read
      using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
        PersistenceManager.Save(engine, stream);
      }
      return memStream.ToArray();
    }
  }

  /// <summary>
  /// Returns the engine computed by the grid for <paramref name="operation"/> and removes the stored result.
  /// Call only after the handle returned by <see cref="BeginExecuteOperation"/> has been set.
  /// </summary>
  /// <param name="operation">The operation passed to <see cref="BeginExecuteOperation"/>.</param>
  /// <returns>The deserialized result engine.</returns>
  /// <exception cref="JobExecutionException">The job was given up after the maximal number of restarts.</exception>
  /// <exception cref="KeyNotFoundException">No result is stored for <paramref name="operation"/>.</exception>
  public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
    byte[] zippedResult;
    lock(dictionaryLock) {
      // erroredOperations is written by StartEngines/GetResults under dictionaryLock
      if(erroredOperations.Remove(operation)) {
        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
      }
      zippedResult = results[operation];
      results.Remove(operation);
    }
    // restore the engine
    using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
      return (ProcessingEngine)PersistenceManager.Load(stream);
    }
  }

  // Adds a job to the submission queue and wakes the StartEngines thread.
  private void EnqueueWaiting(Job job) {
    lock(waitingQueueLock) {
      waitingJobs.Enqueue(job);
    }
    waitingWaitHandle.Set();
  }

  // Adds a job to the running queue and wakes the GetResults thread.
  private void EnqueueRunning(Job job) {
    lock(runningQueueLock) {
      runningJobs.Enqueue(job);
    }
    runningWaitHandle.Set();
  }

  // Resubmits the job, or flags its operation as errored and releases the waiting consumer
  // once MAX_RESTARTS has been reached.
  private void RestartOrFail(Job job) {
    if(job.restarts < MAX_RESTARTS) {
      job.restarts++;
      EnqueueWaiting(job);
    } else {
      lock(dictionaryLock) {
        erroredOperations.Add(job.engine.InitialOperation);
        job.waitHandle.Set();
      }
    }
  }

  // Runs 'call' against the current server proxy, retrying up to MAX_CONNECTION_RETRIES times on
  // timeouts and communication errors (the latter also re-create the connection). The proxy is read
  // inside connectionLock on every attempt, so a retry after ResetConnection uses the new channel.
  // Returns failureValue when all attempts failed.
  private T TryServerCall<T>(Func<IGridServer, T> call, T failureValue) {
    int retries = 0;
    do {
      try {
        lock(connectionLock) {
          if(server == null) ResetConnection(); // no connection created yet (Reset() not called)
          return call(server);
        }
      } catch(TimeoutException) {
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } catch(CommunicationException) {
        ResetConnection();
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    } while(retries < MAX_CONNECTION_RETRIES);
    return failureValue;
  }

  // Submits the zipped engine; returns the server-assigned id, or Guid.Empty if the server could not be reached.
  private Guid TryStartExecuteEngine(ProcessingEngine engine) {
    byte[] zippedEngine = ZipEngine(engine);
    return TryServerCall(s => s.BeginExecuteEngine(zippedEngine), Guid.Empty);
  }

  // Asks the server for the zipped result of the job; null if no result was returned or the server
  // could not be reached.
  // NOTE(review): the meaning/unit of the argument 100 is defined by IGridServer.TryEndExecuteEngine - presumably a timeout.
  private byte[] TryEndExecuteEngine(Guid engineGuid) {
    return TryServerCall<byte[]>(s => s.TryEndExecuteEngine(engineGuid, 100), null);
  }

  // Checks whether the server is still working on the job; JobState.Unknown if the server could not be reached.
  private JobState TryGetJobState(Guid engineGuid) {
    return TryServerCall(s => s.JobState(engineGuid), JobState.Unknown);
  }
}
290}
Note: See TracBrowser for help on using the repository browser.