Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Grid/3.2/JobManager.cs @ 2058

Last change on this file since 2058 was 2058, checked in by gkronber, 15 years ago

Refactored JobManager and added a plugin that contains a bridge between Grid and Hive. The bridge makes it possible to use the execution engine service of Hive as a grid server. This way, CEDMA job execution and DistributedEngine job execution can use either Hive or Grid as the backend. #642 (Hive backend for CEDMA)

File size: 6.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.Windows.Forms;
32using System.Diagnostics;
33
34namespace HeuristicLab.Grid {
/// <summary>
/// Signals that a job could not be executed on the grid. Within JobManager it is thrown by
/// EndExecuteEngine for a job that was aborted after reaching the maximal number of restarts.
/// </summary>
public class JobExecutionException : ApplicationException {
  /// <summary>Creates a new instance with the default exception message.</summary>
  public JobExecutionException() : base() { }

  /// <summary>Creates a new instance with the given message.</summary>
  /// <param name="msg">Description of why the job failed.</param>
  public JobExecutionException(string msg) : base(msg) { }

  /// <summary>Creates a new instance that wraps the exception that caused the failure.</summary>
  /// <param name="msg">Description of why the job failed.</param>
  /// <param name="innerException">The underlying cause; exposed as <see cref="Exception.InnerException"/>.</param>
  public JobExecutionException(string msg, Exception innerException) : base(msg, innerException) { }
}
38
/// <summary>
/// Executes engines asynchronously on an <see cref="IGridServer"/>.
/// Jobs pass through two queues. The "waiting" queue holds jobs that still have to be submitted
/// to the server. The "running" queue holds jobs that were submitted and whose results are polled.
/// Each queue is served by a worker thread started in the constructor.
/// </summary>
public class JobManager {
  // Number of times a job is resubmitted before it is flagged as aborted.
  private const int MAX_RESTARTS = 5;
  // Delay in seconds (see TimeSpan.FromSeconds below) after polling a job that is still running.
  private const int RESULT_POLLING_TIMEOUT = 5;

  private readonly IGridServer server;
  private readonly object waitingQueueLock = new object();
  private readonly Queue<AsyncGridResult> waitingJobs = new Queue<AsyncGridResult>();
  private readonly object runningQueueLock = new object();
  private readonly Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>();
  // Set whenever a job is added to the running queue; wakes the results-gathering thread.
  private readonly AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
  // Set whenever a job is added to the waiting queue; wakes the start-engine thread.
  private readonly AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);

  /// <summary>
  /// Creates a job manager for the given server and starts its two worker threads
  /// (<see cref="StartEngines"/> and <see cref="GetResults"/>).
  /// </summary>
  /// <param name="server">The grid server that executes the engines.</param>
  /// <exception cref="ArgumentNullException"><paramref name="server"/> is null.</exception>
  public JobManager(IGridServer server) {
    if (server == null) throw new ArgumentNullException("server");
    this.server = server;
    // Both worker loops never return on their own. As foreground threads they would keep
    // the process alive forever, so they are run as background threads.
    Thread starterThread = new Thread(StartEngines);
    starterThread.IsBackground = true;
    Thread resultsGatheringThread = new Thread(GetResults);
    resultsGatheringThread.IsBackground = true;
    starterThread.Start();
    resultsGatheringThread.Start();
  }

  /// <summary>
  /// Removes all jobs from the waiting and running queues and closes their wait handles.
  /// The removed jobs are not signalled as finished.
  /// NOTE(review): a job that a worker thread has already dequeued is in neither queue at that
  /// moment, so it is not affected and may be re-enqueued afterwards. Also, consumers blocked on a
  /// closed handle are not notified. Confirm both are intended.
  /// </summary>
  public void Reset() {
    lock (waitingQueueLock) {
      foreach (AsyncGridResult r in waitingJobs) {
        r.WaitHandle.Close();
      }
      waitingJobs.Clear();
    }
    lock (runningQueueLock) {
      foreach (AsyncGridResult r in runningJobs) {
        r.WaitHandle.Close();
      }
      runningJobs.Clear();
    }
  }

  /// <summary>
  /// Worker loop that submits waiting jobs to the server.
  /// A successfully started job moves to the running queue. A job the server refuses
  /// (empty Guid) is requeued until <c>MAX_RESTARTS</c> is reached and is then aborted.
  /// The loop only ends when an exception escapes; that exception is traced.
  /// </summary>
  public void StartEngines() {
    try {
      while (true) {
        AsyncGridResult job = null;
        lock (waitingQueueLock) {
          if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();
        }
        if (job == null) {
          // No jobs waiting. The AutoResetEvent stays set if a job was enqueued meanwhile,
          // so no wake-up is lost between the check above and this wait.
          waitingWaitHandle.WaitOne();
          continue;
        }
        Guid currentEngineGuid = server.BeginExecuteEngine(ZipEngine(job.Engine));
        if (currentEngineGuid == Guid.Empty) {
          // Couldn't start the job -> retry, or give up once the restart limit is reached.
          RequeueOrAbort(job);
        } else {
          // Job started successfully -> remember its server id and poll for the result.
          job.Guid = currentEngineGuid;
          EnqueueRunning(job);
        }
      }
    }
    catch (Exception e) {
      Trace.TraceError("Exception " + e + " in JobManager.StartEngines() killed the start-engine thread\n" + e.StackTrace);
    }
  }

  /// <summary>
  /// Worker loop that polls the server for the results of running jobs.
  /// A finished job receives its zipped result and is signalled. A job the server no longer
  /// knows (<c>JobState.Unknown</c>) is resubmitted, subject to the same <c>MAX_RESTARTS</c>
  /// limit as in <see cref="StartEngines"/>. Any other job is put back into the running queue.
  /// The loop only ends when an exception escapes; that exception is traced.
  /// NOTE(review): the polling delay is applied once per still-running job, so a full pass over
  /// N running jobs takes roughly N * RESULT_POLLING_TIMEOUT seconds.
  /// </summary>
  public void GetResults() {
    try {
      while (true) {
        AsyncGridResult job = null;
        lock (runningQueueLock) {
          if (runningJobs.Count > 0) job = runningJobs.Dequeue();
        }
        if (job == null) {
          runningWaitHandle.WaitOne(); // no jobs running
          continue;
        }
        byte[] zippedResult = server.TryEndExecuteEngine(job.Guid);
        if (zippedResult != null) {
          // Successful -> store the result and notify the consumer that it is ready.
          job.ZippedResult = zippedResult;
          job.SignalFinished();
        } else if (server.JobState(job.Guid) == JobState.Unknown) {
          // The server lost the job -> resubmit it. This path must respect MAX_RESTARTS too;
          // otherwise a job that keeps getting lost would be restarted forever.
          RequeueOrAbort(job);
        } else {
          // Job still active at the server -> poll it again later.
          EnqueueRunning(job);
          Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT)); // sleep a while before trying to get the next result
        }
      }
    }
    catch (Exception e) {
      Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n" + e.StackTrace);
    }
  }

  // Puts a job back into the waiting queue, or aborts and signals it once MAX_RESTARTS is reached.
  private void RequeueOrAbort(AsyncGridResult job) {
    if (job.Restarts < MAX_RESTARTS) {
      job.Restarts++;
      lock (waitingQueueLock) {
        waitingJobs.Enqueue(job);
      }
      waitingWaitHandle.Set();
    } else {
      // Max restart count reached -> give up on this job and flag the error for EndExecuteEngine.
      job.Aborted = true;
      job.SignalFinished();
    }
  }

  // Adds a job to the running queue and wakes the results-gathering thread.
  private void EnqueueRunning(AsyncGridResult job) {
    lock (runningQueueLock) {
      runningJobs.Enqueue(job);
    }
    runningWaitHandle.Set();
  }

  /// <summary>
  /// Queues an engine for execution on the grid and returns immediately.
  /// </summary>
  /// <param name="engine">The engine to execute.</param>
  /// <returns>The handle to pass to <see cref="EndExecuteEngine"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="engine"/> is null.</exception>
  public AsyncGridResult BeginExecuteEngine(ProcessingEngine engine) {
    // Fail fast on the caller's thread. Any exception on the start-engine thread would end its loop.
    if (engine == null) throw new ArgumentNullException("engine");
    AsyncGridResult asyncResult = new AsyncGridResult(engine);
    asyncResult.Engine = engine;
    lock (waitingQueueLock) {
      waitingJobs.Enqueue(asyncResult);
    }
    waitingWaitHandle.Set();
    return asyncResult;
  }

  // Serializes an engine into the gzip-compressed form sent to the server.
  private byte[] ZipEngine(IEngine engine) {
    return PersistenceManager.SaveToGZip(engine);
  }

  /// <summary>
  /// Returns the executed engine, restored from the job's zipped result.
  /// NOTE(review): assumes the job has already finished (presumably the caller waited on the
  /// result's WaitHandle first). Otherwise the zipped result is not yet set. TODO confirm.
  /// </summary>
  /// <param name="asyncResult">The handle returned by <see cref="BeginExecuteEngine"/>.</param>
  /// <returns>The engine state after execution.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
  /// <exception cref="JobExecutionException">The job was aborted after too many restarts.</exception>
  public IEngine EndExecuteEngine(AsyncGridResult asyncResult) {
    if (asyncResult == null) throw new ArgumentNullException("asyncResult");
    if (asyncResult.Aborted) {
      throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
    }
    // restore the engine
    return (IEngine)PersistenceManager.RestoreFromGZip(asyncResult.ZippedResult);
  }
}
179}
Note: See TracBrowser for help on using the repository browser.