Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5137

Last change on this file since 5137 was 5137, checked in by ascheibe, 14 years ago

#1233

  • more tests for the hive slave
  • implemented a better way to write tests for the slave
  • get rid of MessageTypes for core and executor
  • various improvements in core and executor (better communication, bugfixes,...)
File size: 13.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.Threading;
27using HeuristicLab.Clients.Hive.Slave;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.Services.Hive.Common;
31using HeuristicLab.Services.Hive.Common.DataTransfer;
32
33
namespace HeuristicLab.Clients.Hive.Salve {
  // NOTE(review): "Salve" looks like a misspelling of "Slave"; it is kept because renaming
  // the namespace would break every caller of HeuristicLab.Clients.Hive.Salve.Core.

  /// <summary>
  /// The core component of the Hive Client: runs the message-dispatch loop, drives the
  /// heartbeats and hosts one sandboxed <see cref="AppDomain"/> with an <see cref="Executor"/> per job.
  /// </summary>
  /// <remarks>
  /// The <c>engines</c>, <c>appDomains</c> and <c>jobs</c> dictionaries are modified under
  /// <c>lock (engines)</c>. <see cref="KillAppDomain"/> only unloads domains on the thread that
  /// constructed the core; calls from any other thread are queued on the <see cref="MessageQueue"/>
  /// and executed later by the dispatch loop.
  /// </remarks>
  public class Core : MarshalByRefObject {
    /// <summary>When set to true, the dispatch loop started by <see cref="Start"/> exits after the current message.</summary>
    public static bool abortRequested { get; set; }
    public static ILog Log { get; set; }

    /// <summary>How many times <see cref="KillAppDomain"/> calls <see cref="AppDomain.Unload"/> before giving up.</summary>
    private const int MaxUnloadAttempts = 5;

    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    public HeartbeatManager heartbeatManager;

    // Managed id of the thread that constructed the core.
    // NOTE(review): presumably the same thread later runs Start(); queued KillAppDomain
    // requests are only executed directly when the dispatch loop runs on this thread.
    private readonly int coreThreadId;

    /// <summary>The live dictionary of running executors, keyed by job id (not a copy).</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The live dictionary of running jobs, keyed by job id (not a copy).</summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    /// <summary>Creates the core and remembers the calling thread as the core thread.</summary>
    public Core() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
    }

    /// <summary>
    /// Main method of the slave: registers this core with the <see cref="ConfigManager"/>,
    /// starts the heartbeats and dispatches queued messages until <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      abortRequested = false;
      Logger.Info("Hive Slave started");
      //TODO: implement the slave console server (start it here, close it on shutdown)

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      StartHeartbeats();
      DispatchMessageQueue(); // returns once abortRequested is set

      DeRegisterServiceEvents();

      Logger.Info("Program shutdown");
    }

    /// <summary>Creates the heartbeat manager with a 10 second interval and starts it.</summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = TimeSpan.FromSeconds(10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>Takes messages from the <see cref="MessageQueue"/> one by one and handles them until <see cref="abortRequested"/> is set.</summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        DetermineAction(queue.GetMessage());
      }
    }

    private void RegisterServiceEvents() {
      //TODO
    }

    private void DeRegisterServiceEvents() {
      //TODO
    }

    /// <summary>
    /// Analyzes a message from the MessageQueue and starts the corresponding action.
    /// </summary>
    /// <param name="container">The container holding the message; a null container is logged and ignored.</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        Logger.Warn("Received a null MessageContainer, ignoring it");
        return;
      }
      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      // Callbacks queued by EnqueueExecutorMessage<Guid> (e.g. KillAppDomain requests from executor threads).
      //TODO: find a better solution
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          AbortJobLocally(container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.AquireJob:
          FetchAndStartJob();
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;

        default:
          Logger.Warn("Unknown MessageContainer: " + container);
          break;
      }
    }

    /// <summary>Aborts the executor of the given job if one is registered; otherwise logs an error.</summary>
    private void AbortJobLocally(Guid jobId) {
      Executor engine;
      bool found;
      lock (engines) {
        found = engines.TryGetValue(jobId, out engine);
      }
      if (!found) {
        Logger.Error("AbortJob: Engine doesn't exist");
        return;
      }
      try {
        engine.Abort();
      }
      catch (AppDomainUnloadedException) {
        // appdomain already unloaded. Finishing job probably ongoing
      }
    }

    /// <summary>Pulls a job and its data from the server and starts it in a new app domain.</summary>
    private void FetchAndStartJob() {
      Job myJob = wcfService.AquireJob();
      if (myJob == null) {
        // NOTE(review): assumes the service returns null when no job is available - confirm.
        Logger.Info("AquireJob: no job received from the server");
        return;
      }
      //TODO: handle in own thread!!
      JobData jobData = wcfService.GetJobData(myJob.Id);
      StartJobInAppDomain(myJob, jobData);
    }

    /// <summary>
    /// Stops the heartbeats, ends the dispatch loop, unloads every job app domain and
    /// disconnects from the server. A domain that fails to unload is logged and skipped
    /// so that the remaining domains are still unloaded and the disconnect still happens.
    /// </summary>
    public void ShutdownCore() {
      Logger.Info("Shutdown Signal received");
      Logger.Debug("Stopping heartbeat");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;
      Logger.Debug("Logging out");

      lock (engines) {
        Logger.Debug("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          TryUnloadAppDomain(kvp.Value);
        }
      }
      WcfService.Instance.Disconnect();
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server (state WaitingForChildJobs) and
    /// killing it locally; atm only used when the executor is waiting for child jobs.
    /// </summary>
    /// <param name="data">The serialized job data to send; its JobId selects the job.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseJob(JobData data) {
      Job job;
      bool known;
      lock (engines) {
        known = jobs.TryGetValue(data.JobId, out job);
      }

      if (known) {
        job.JobState = JobState.WaitingForChildJobs;
        wcfService.UpdateJob(job, data);
      } else {
        Logger.Error("Can't find job with id " + data.JobId);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Collects the finished job from its executor and submits it to the server.
    /// The job's app domain is killed afterwards in every case, including when collecting
    /// the result or transmitting it fails.
    /// </summary>
    /// <remarks>
    /// TODO: persisting the job to disk when the server is unreachable is not implemented;
    /// a failed transmission is only logged.
    /// </remarks>
    /// <param name="jobId">The job id, boxed as object (a <see cref="Guid"/>).</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        Logger.Info("Getting the finished job with id: " + jId);

        Executor engine;
        Job cJob;
        lock (engines) {
          if (!engines.TryGetValue(jId, out engine)) {
            Logger.Info("Engine doesn't exist");
            return;
          }
          if (!jobs.TryGetValue(jId, out cJob)) {
            Logger.Info("Job doesn't exist");
            return;
          }
        }

        try {
          JobData sJob = engine.GetFinishedJob();
          cJob.Exception = engine.CurrentException;
          cJob.ExecutionTime = engine.ExecutionTime;

          try {
            Logger.Info("Sending the finished job with id: " + jId);
            wcfService.UpdateJob(cJob, sJob);
            SlaveStatusInfo.JobsProcessed++;
          }
          catch (Exception e) {
            Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
          }
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case, also if collecting the result failed
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Starts a job received from the server: prepares its plugins, creates a sandbox
    /// app domain, creates an <see cref="Executor"/> inside it and starts it.
    /// A job id that is already running is ignored.
    /// </summary>
    /// <param name="myJob">The job to start.</param>
    /// <param name="jobData">The job's serialized data, handed to the executor.</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      Logger.Info("Received new job with id " + myJob.Id);

      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          // Preparing plugins or a sandbox again would interfere with the running job.
          Logger.Warn("Job with id " + myJob.Id + " is already running, ignoring it");
          return;
        }
      }

      string pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      try {
        PluginCache.Instance.PreparePlugins(myJob, jobData);
        Logger.Debug("Plugins fetched for job " + myJob.Id);
      }
      catch (Exception exception) {
        Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        return;
      }

      AppDomain appDomain = null;
      bool registered = false; // true once appDomain is stored in appDomains (KillAppDomain can then unload it)
      try {
        appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        lock (engines) {
          if (!jobs.ContainsKey(myJob.Id)) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            registered = true;
            Logger.Debug("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            Logger.Debug("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.core = this;
            Logger.Debug("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        if (!registered) {
          // Defensive: the job appeared between the check above and here; don't leak the new sandbox.
          Logger.Warn("Job with id " + myJob.Id + " is already running, discarding the new AppDomain");
          TryUnloadAppDomain(appDomain);
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        Logger.Error("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        Logger.Error("Error thrown is: ", exception);
        if (!registered && appDomain != null) {
          // KillAppDomain only unloads domains stored in appDomains.
          TryUnloadAppDomain(appDomain);
        }
        KillAppDomain(myJob.Id);
      }
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    /// <summary>Logs the exception and raises <see cref="ExceptionOccured"/>.</summary>
    private void OnExceptionOccured(Exception e) {
      Logger.Error("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception in a job's app domain by killing that domain.
    /// The exception text is not a job id, so the job is identified from the sending domain.
    /// </summary>
    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
      Guid jobId;
      if (TryGetJobIdOfAppDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        Logger.Error("Could not determine the job of the failed AppDomain, it is not unloaded");
      }
    }

    /// <summary>
    /// Derives the job id from a sandbox's friendly name. StartJobInAppDomain passes
    /// myJob.Id.ToString() as the first argument of CreateAndInitSandbox.
    /// NOTE(review): assumes that argument becomes the domain's FriendlyName and that the
    /// event sender is the failing AppDomain - confirm against SandboxManager.
    /// </summary>
    /// <returns>true if <paramref name="jobId"/> could be determined.</returns>
    private static bool TryGetJobIdOfAppDomain(AppDomain domain, out Guid jobId) {
      jobId = Guid.Empty;
      if (domain == null) return false;
      try {
        return Guid.TryParse(domain.FriendlyName, out jobId);
      }
      catch (AppDomainUnloadedException) {
        return false;
      }
    }

    /// <summary>
    /// Detaches this core's unhandled-exception handler from <paramref name="domain"/> and unloads it.
    /// Failures are logged instead of thrown.
    /// </summary>
    private void TryUnloadAppDomain(AppDomain domain) {
      try {
        domain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(domain);
      }
      catch (Exception ex) {
        Logger.Error("Exception when unloading the appdomain: ", ex);
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">Type of the callback parameter.</typeparam>
    /// <param name="action">The callback the core thread should run.</param>
    /// <param name="parameter">The argument passed to <paramref name="action"/>.</param>
    /// <returns>true if the calling method is on the core thread and can continue, false if the call was queued.</returns>
    private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      if (Thread.CurrentThread.ManagedThreadId == coreThreadId) {
        return true;
      }
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T> {
        Callback = action,
        CallbackParameter = parameter
      };
      MessageQueue.GetInstance().AddMessage(container);
      return false;
    }

    /// <summary>
    /// Kills the app domain of a job: disposes its executor, unloads the domain and removes
    /// the job and its plugins. Off the core thread the request is queued and this returns immediately.
    /// </summary>
    /// <remarks>
    /// If the domain still cannot be unloaded after <see cref="MaxUnloadAttempts"/> attempts, the
    /// failure is logged (not rethrown) and the job stays registered in appDomains/jobs.
    /// </remarks>
    /// <param name="id">the GUID of the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (!EnqueueExecutorMessage<Guid>(KillAppDomain, id)) {
        return;
      }

      Logger.Debug("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            // Remove first so a failing Dispose (e.g. domain already gone) cannot block the unload below.
            engines.Remove(id);
            try {
              engine.Dispose();
            }
            catch (Exception ex) {
              Logger.Error("Exception when disposing the executor: ", ex);
            }
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            UnloadWithRetries(appDomain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          Logger.Error("Exception when unloading the appdomain: ", ex);
        }
      }
      // NOTE(review): presumably forces collection of memory released by the unloaded domain.
      GC.Collect();
    }

    /// <summary>
    /// Calls <see cref="AppDomain.Unload"/> up to <see cref="MaxUnloadAttempts"/> times, waiting one
    /// second between attempts; rethrows the last <see cref="CannotUnloadAppDomainException"/>.
    /// </summary>
    private static void UnloadWithRetries(AppDomain appDomain) {
      int attempt = 1;
      while (true) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= MaxUnloadAttempts) {
            throw;
          }
          Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(1000);
          attempt++;
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.