Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive_Milestone3/sources/HeuristicLab.Grid/3.2/JobManager.cs @ 5511

Last change on this file since 5511 was 2073, checked in by gkronber, 16 years ago

Fixed bugs in preparation of engines for execution on hive. Used HL.Tracing instead of trace statements. #642 (Hive backend for CEDMA)

File size: 6.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.Windows.Forms;
32using System.Diagnostics;
33
34namespace HeuristicLab.Grid {
35  public class JobExecutionException : ApplicationException {
36    public JobExecutionException(string msg) : base(msg) { }
37  }
38
39  public class JobManager {
40    private const int MAX_RESTARTS = 5;
41    private const int RESULT_POLLING_TIMEOUT = 5;
42
43    private IGridServer server;
44    private object waitingQueueLock = new object();
45    private Queue<AsyncGridResult> waitingJobs = new Queue<AsyncGridResult>();
46    private object runningQueueLock = new object();
47    private Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>();
48    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
49    private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);
50
51
52    public JobManager(IGridServer server) {
53      this.server = server;
54      Thread starterThread = new Thread(StartEngines);
55      Thread resultsGatheringThread = new Thread(GetResults);
56      starterThread.Start();
57      resultsGatheringThread.Start();
58    }
59
60    public void Reset() {
61      lock (waitingQueueLock) {
62        foreach (AsyncGridResult r in waitingJobs) {
63          r.WaitHandle.Close();
64        }
65        waitingJobs.Clear();
66      }
67      lock (runningQueueLock) {
68        foreach (AsyncGridResult r in runningJobs) {
69          r.WaitHandle.Close();
70        }
71        runningJobs.Clear();
72      }
73    }
74
75
76    public void StartEngines() {
77      try {
78        while (true) {
79          AsyncGridResult job = null;
80          lock (waitingQueueLock) {
81            if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();
82          }
83          if (job == null) waitingWaitHandle.WaitOne(); // no jobs waiting
84          else {
85            Guid currentEngineGuid = server.BeginExecuteEngine(ZipEngine(job.Engine));
86            if (currentEngineGuid == Guid.Empty) {
87              // couldn't start the job -> requeue
88              if (job.Restarts < MAX_RESTARTS) {
89                job.Restarts++;
90                lock (waitingQueueLock) waitingJobs.Enqueue(job);
91                waitingWaitHandle.Set();
92              } else {
93                // max restart count reached -> give up on this job and flag error
94                job.Aborted = true;
95                job.SignalFinished();
96              }
97            } else {
98              // job started successfully
99              job.Guid = currentEngineGuid;
100              lock (runningQueueLock) {
101                runningJobs.Enqueue(job);
102                runningWaitHandle.Set();
103              }
104            }
105          }
106        }
107      }
108      catch (Exception e) {
109        HeuristicLab.Tracing.Logger.Error("Exception " + e + " in JobManager.StartEngines() killed the start-engine thread\n" + e.StackTrace);
110      }
111    }
112
113
114    public void GetResults() {
115      try {
116        while (true) {
117          AsyncGridResult job = null;
118          lock (runningQueueLock) {
119            if (runningJobs.Count > 0) job = runningJobs.Dequeue();
120          }
121          if (job == null) runningWaitHandle.WaitOne(); // no jobs running
122          else {
123            byte[] zippedResult = server.TryEndExecuteEngine(job.Guid);
124            if (zippedResult != null) {
125              // successful => store result
126              job.ZippedResult = zippedResult;
127              // notify consumer that result is ready
128              job.SignalFinished();
129            } else {
130              // there was a problem -> check the state of the job and restart if necessary
131              JobState jobState = server.JobState(job.Guid);
132              if (jobState == JobState.Unknown) {
133                job.Restarts++;
134                lock (waitingQueueLock) {
135                  waitingJobs.Enqueue(job);
136                  waitingWaitHandle.Set();
137                }
138              } else {
139                // job still active at the server
140                lock (runningQueueLock) {
141                  runningJobs.Enqueue(job);
142                  runningWaitHandle.Set();
143                }
144                Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT)); // sleep a while before trying to get the next result
145              }
146            }
147          }
148        }
149      }
150      catch (Exception e) {
151        HeuristicLab.Tracing.Logger.Error("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n" + e.StackTrace);
152      }
153    }
154
155    public AsyncGridResult BeginExecuteEngine(ProcessingEngine engine) {
156      AsyncGridResult asyncResult = new AsyncGridResult(engine);
157      asyncResult.Engine = engine;
158      lock (waitingQueueLock) {
159        waitingJobs.Enqueue(asyncResult);
160      }
161      waitingWaitHandle.Set();
162      return asyncResult;
163    }
164
165    private byte[] ZipEngine(IEngine engine) {
166      return PersistenceManager.SaveToGZip(engine);
167    }
168
169    public IEngine EndExecuteEngine(AsyncGridResult asyncResult) {
170      if (asyncResult.Aborted) {
171        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
172      } else {
173        // restore the engine
174        return (IEngine)PersistenceManager.RestoreFromGZip(asyncResult.ZippedResult);
175      }
176    }
177  }
178}
Note: See TracBrowser for help on using the repository browser.