
source: branches/CEDMA-Refactoring-Ticket419/HeuristicLab.Grid/JobManager.cs @ 1130

Last change on this file since 1130 was 1130, checked in by gkronber, 15 years ago:
  • re-enabled security for all WCF services
  • debugged RDF queries
  • added tracing for WCF services
  • implemented a simple version of quality-based dispatching
  • extended the ontology to include a number of predefined model attributes (quality, size, evaluated solutions, etc.)

ticket: #419 (Refactor CEDMA plugins)

File size: 10.2 KB
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using HeuristicLab.Grid;
using System.Threading;
using HeuristicLab.Core;
using System.IO;
using System.Windows.Forms;
using System.Diagnostics;

namespace HeuristicLab.Grid {
  public class JobExecutionException : ApplicationException {
    public JobExecutionException(string msg) : base(msg) { }
  }

  public class JobManager {
    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
    private const int RETRY_TIMEOUT_SEC = 60;
    private const int RESULT_POLLING_TIMEOUT = 5;

    private class Job {
      public Guid guid;
      public ProcessingEngine engine;
      public ManualResetEvent waitHandle;
      public int restarts;
    }

    private IGridServer server;
    private string address;
    private object waitingQueueLock = new object();
    private Queue<Job> waitingJobs = new Queue<Job>();
    private object runningQueueLock = new object();
    private Queue<Job> runningJobs = new Queue<Job>();
    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();

    private List<IOperation> erroredOperations = new List<IOperation>();
    private object connectionLock = new object();
    private object dictionaryLock = new object();

    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
    private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);

    private ChannelFactory<IGridServer> factory;

    public JobManager(string address) {
      this.address = address;
      Thread starterThread = new Thread(StartEngines);
      Thread resultsGatheringThread = new Thread(GetResults);
      starterThread.Start();
      resultsGatheringThread.Start();
    }

    public void Reset() {
      ResetConnection();
      lock(dictionaryLock) {
        foreach(Job j in waitingJobs) {
          j.waitHandle.Close();
        }
        waitingJobs.Clear();
        foreach(Job j in runningJobs) {
          j.waitHandle.Close();
        }
        runningJobs.Clear();
        results.Clear();
        erroredOperations.Clear();
      }
    }

    private void ResetConnection() {
      Trace.TraceInformation("Reset connection in JobManager");
      lock(connectionLock) {
        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;

        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    public void StartEngines() {
      try {
        while(true) {
          Job job = null;
          lock(waitingQueueLock) {
            if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
          }
          if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting
          else {
            Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
            if(currentEngineGuid == Guid.Empty) {
              // couldn't start the job -> requeue
              if(job.restarts < MAX_RESTARTS) {
                job.restarts++;
                lock(waitingQueueLock) waitingJobs.Enqueue(job);
                waitingWaitHandle.Set();
              } else {
                // max restart count reached -> give up on this job and flag error
                lock(dictionaryLock) {
                  erroredOperations.Add(job.engine.InitialOperation);
                  job.waitHandle.Set();
                }
              }
            } else {
              // job started successfully
              job.guid = currentEngineGuid;
              lock(runningQueueLock) {
                runningJobs.Enqueue(job);
                runningWaitHandle.Set();
              }
            }
          }
        }
      } catch(Exception e) {
        Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
      }
    }


    public void GetResults() {
      try {
        while(true) {
          Job job = null;
          lock(runningQueueLock) {
            if(runningJobs.Count > 0) job = runningJobs.Dequeue();
          }
          if(job == null) runningWaitHandle.WaitOne(); // no jobs running
          else {
            byte[] zippedResult = TryEndExecuteEngine(server, job.guid);
            if(zippedResult != null) { // successful
              lock(dictionaryLock) {
                // store result
                results[job.engine.InitialOperation] = zippedResult;
                // notify consumer that result is ready
                job.waitHandle.Set();
              }
            } else {
              // there was a problem -> check the state of the job and restart if necessary
              JobState jobState = TryGetJobState(server, job.guid);
              if(jobState == JobState.Unknown) {
                job.restarts++;
                lock(waitingQueueLock) {
                  waitingJobs.Enqueue(job);
                  waitingWaitHandle.Set();
                }
              } else {
                // job still active at the server
                lock(runningQueueLock) {
                  runningJobs.Enqueue(job);
                  runningWaitHandle.Set();
                }
                Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT)); // sleep a while before trying to get the next result
              }
            }
          }
        }
      } catch(Exception e) {
        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
      }
    }

    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
      return BeginExecuteEngine(new ProcessingEngine(globalScope, operation));
    }

    public WaitHandle BeginExecuteEngine(ProcessingEngine engine) {
      Job job = new Job();
      job.engine = engine;
      job.waitHandle = new ManualResetEvent(false);
      job.restarts = 0;
      lock(waitingQueueLock) {
        waitingJobs.Enqueue(job);
      }
      waitingWaitHandle.Set();
      return job.waitHandle;
    }

    private byte[] ZipEngine(ProcessingEngine engine) {
      return PersistenceManager.SaveToGZip(engine);
    }

    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
      if(erroredOperations.Contains(operation)) {
        erroredOperations.Remove(operation);
        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
      } else {
        byte[] zippedResult = null;
        lock(dictionaryLock) {
          zippedResult = results[operation];
          results.Remove(operation);
        }
        // restore the engine
        return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
      }
    }

    private Guid TryStartExecuteEngine(ProcessingEngine engine) {
      byte[] zippedEngine = ZipEngine(engine);
      int retries = 0;
      Guid guid = Guid.Empty;
      do {
        try {
          lock(connectionLock) {
            guid = server.BeginExecuteEngine(zippedEngine);
          }
          return guid;
        } catch(TimeoutException) {
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        } catch(CommunicationException) {
          ResetConnection();
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        }
      } while(retries < MAX_CONNECTION_RETRIES);
      Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine");
      return Guid.Empty;
    }

    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
      int retries = 0;
      do {
        try {
          lock(connectionLock) {
            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
            return zippedResult;
          }
        } catch(TimeoutException) {
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        } catch(CommunicationException) {
          ResetConnection();
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        }
      } while(retries < MAX_CONNECTION_RETRIES);
      Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine");
      return null;
    }

    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
      // check if the server is still working on the job
      int retries = 0;
      do {
        try {
          lock(connectionLock) {
            JobState jobState = server.JobState(engineGuid);
            return jobState;
          }
        } catch(TimeoutException) {
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        } catch(CommunicationException) {
          ResetConnection();
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        }
      } while(retries < MAX_CONNECTION_RETRIES);
      Trace.TraceWarning("Reached max connection retries in TryGetJobState");
      return JobState.Unknown;
    }
  }
}
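
For orientation, a minimal consumer of this class might look as follows. This is only a sketch: the endpoint address and the way the scope and operation are obtained are assumptions and not part of this file; the calls themselves (the JobManager constructor, BeginExecuteOperation, EndExecuteOperation, JobExecutionException) are taken from the code above.

using System.Threading;
using HeuristicLab.Core;
using HeuristicLab.Grid;

public static class JobManagerUsageSketch {
  // Runs a single operation on the grid and returns the finished engine,
  // or null if the job failed permanently.
  public static ProcessingEngine RunOnGrid(IScope globalScope, AtomicOperation operation) {
    // Hypothetical endpoint; use whatever net.tcp address the grid server actually exposes.
    JobManager manager = new JobManager("net.tcp://gridserver:20000/GridService");

    // BeginExecuteOperation wraps the operation in a ProcessingEngine, queues it,
    // and returns a ManualResetEvent that is signalled once a result
    // (or a permanent failure) is available.
    WaitHandle finished = manager.BeginExecuteOperation(globalScope, operation);
    finished.WaitOne();

    try {
      // EndExecuteOperation returns the deserialized engine holding the executed scope,
      // or throws JobExecutionException after MAX_RESTARTS failed attempts.
      return manager.EndExecuteOperation(operation);
    } catch(JobExecutionException) {
      return null;
    }
  }
}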