Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5636

Last change on this file since 5636 was 5636, checked in by cneumuel, 13 years ago

#1233

  • updated jobstates documentation
  • enhanced ganttChart
  • fixed setting of jobstates
  • added option to force lifecycle-trigger (mainly for testing purposes)
File size: 19.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Client
37  /// </summary>
38  public class Core : MarshalByRefObject {
39
//TODO: this class should be a singleton; there is only one instance, and for now the reference is saved in TheCore
/// <summary>Most recently constructed instance; assigned by the constructor.</summary>
public static Core TheCore;

/// <summary>While false the message dispatch loop keeps running; set to true during shutdown.</summary>
public static bool abortRequested { get; set; }
public static ILog Log { get; set; }

/// <summary>Released at the end of Start(); Shutdown() blocks on it.</summary>
private Semaphore waitShutdownSem = new Semaphore(0, 1);

// Per-job bookkeeping, all keyed by job id. Writers synchronize on 'engines'.
private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

// Server communication, assigned in Start() / StartHeartbeats().
private WcfService wcfService;
private HeartbeatManager heartbeatManager;

/// <summary>Managed thread id of the thread that called Start(); KillAppDomain only does its work on that thread.</summary>
private int coreThreadId;

// Communication with the slave GUI; also used for all log output in this class.
private ISlaveCommunication ClientCom;
private ServiceHost slaveComm;
57
/// <summary>
/// Gets the executors of the jobs currently held by this core, keyed by job id.
/// The live dictionary is returned, not a copy.
/// </summary>
public Dictionary<Guid, Executor> ExecutionEngines {
  get {
    return this.engines;
  }
}
61
/// <summary>
/// Gets the jobs currently held by this core, keyed by job id.
/// The live dictionary is returned, not a copy.
/// </summary>
internal Dictionary<Guid, Job> Jobs {
  get {
    return this.jobs;
  }
}
65
/// <summary>
/// Creates the core and publishes it through the static <see cref="TheCore"/> reference.
/// </summary>
public Core() {
  Core.TheCore = this;
}
69
/// <summary>
/// Main method of the slave. Opens the GUI communication host, hooks up the
/// server connection events, starts the heartbeats and then dispatches queued
/// messages on the calling thread until <see cref="abortRequested"/> is set.
/// When the dispatch loop ends, the shutdown semaphore is released so that a
/// caller blocked in <see cref="Shutdown"/> can continue.
/// </summary>
public void Start() {
  abortRequested = false;
  // remember the dispatching thread; KillAppDomain defers to it
  coreThreadId = Thread.CurrentThread.ManagedThreadId;

  // start the client communication service (pipe between slave and slave gui)
  slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
  slaveComm.Open();

  ClientCom = SlaveClientCom.Instance.ClientCom;
  ClientCom.LogMessage("Hive Slave started");

  ConfigManager.Instance.Core = this;

  wcfService = WcfService.Instance;
  RegisterServiceEvents();

  // heartbeats are sent by the heartbeat manager; this call does not block
  StartHeartbeats();
  // blocks: dispatch messages until abortRequested
  DispatchMessageQueue();

  DeRegisterServiceEvents();
  waitShutdownSem.Release();
}
96
/// <summary>
/// Creates a new heartbeat manager with a 10 second interval and starts it.
/// </summary>
private void StartHeartbeats() {
  HeartbeatManager manager = new HeartbeatManager();
  manager.Interval = TimeSpan.FromSeconds(10);
  heartbeatManager = manager;
  heartbeatManager.StartHeartbeat();
}
102
/// <summary>
/// Takes messages from the message queue one at a time and hands each to
/// <see cref="DetermineAction"/>, until <see cref="abortRequested"/> is set.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue messages = MessageQueue.GetInstance();
  while (!abortRequested) {
    DetermineAction(messages.GetMessage());
  }
}
110
/// <summary>
/// Subscribes this core to the connection events of the WCF service.
/// </summary>
private void RegisterServiceEvents() {
  WcfService service = WcfService.Instance;
  service.Connected += WcfService_Connected;
  service.ExceptionOccured += WcfService_ExceptionOccured;
}
115
/// <summary>
/// Removes the subscriptions made by <see cref="RegisterServiceEvents"/>.
/// </summary>
private void DeRegisterServiceEvents() {
  WcfService service = WcfService.Instance;
  service.Connected -= new EventHandler(WcfService_Connected);
  service.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
}
120
/// <summary>
/// Logs the message of an exception reported by the WCF service.
/// </summary>
void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  string reason = e.Value.Message;
  ClientCom.LogMessage("Connection to server interruped with exception: " + reason);
}
124
/// <summary>
/// Logs that the WCF service reported a successful connection.
/// </summary>
void WcfService_Connected(object sender, EventArgs e) {
  const string message = "Connected successfully to Hive server";
  ClientCom.LogMessage(message);
}
128
/// <summary>
/// Analyzes a message taken from the MessageQueue and starts the corresponding action.
/// Executor callbacks (<c>ExecutorMessageContainer&lt;Guid&gt;</c>) are executed directly;
/// every other container is dispatched on its message type.
/// </summary>
/// <param name="container">The container holding the message; a null container is logged and ignored.</param>
private void DetermineAction(MessageContainer container) {
  if (container == null) {
    // Guard first: the log statement below dereferences the container.
    ClientCom.LogMessage("Received an empty MessageContainer, ignoring it");
    return;
  }

  ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

  //TODO: find a better solution
  // Only callbacks with a Guid parameter are recognized here.
  ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
  if (executorMessage != null) {
    executorMessage.execute();
    return;
  }

  switch (container.Message) {
    case MessageContainer.MessageType.CalculateJob: {
        Job job = wcfService.GetJob(container.JobId);
        JobData jobData = wcfService.GetJobData(job.Id);
        job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
        StartJobInAppDomain(job, jobData);
        break;
      }
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.StopAll:
      DoStopAll();
      break;
    case MessageContainer.MessageType.PauseAll:
      DoPauseAll();
      break;
    case MessageContainer.MessageType.AbortAll:
      DoAbortAll();
      break;
    case MessageContainer.MessageType.AbortJob:
      KillAppDomain(container.JobId);
      break;
    case MessageContainer.MessageType.StopJob:
      DoStopJob(container.JobId);
      break;
    case MessageContainer.MessageType.PauseJob:
      DoPauseJob(container.JobId);
      break;
    case MessageContainer.MessageType.Restart:
      DoStartSlave();
      break;
    case MessageContainer.MessageType.Sleep:
      Sleep();
      break;
    case MessageContainer.MessageType.SayHello:
      wcfService.Connect(ConfigManager.Instance.GetClientInfo());
      break;
    default:
      // Previously unhandled message types were dropped without any trace.
      ClientCom.LogMessage("Unknown MessageContainer: " + container);
      break;
  }
}
182
/// <summary>
/// Pauses the executor of the given job, sends the job data to the server with
/// state Paused and then kills the job's AppDomain. A job id that is not known
/// to this core is logged and ignored.
/// </summary>
/// <param name="jobId">Id of the job to pause.</param>
private void DoPauseJob(Guid jobId) {
  Job job;
  // TryGetValue instead of the indexer: the indexer throws for unknown ids.
  if (!Jobs.TryGetValue(jobId, out job) || job == null) {
    ClientCom.LogMessage("Can't find job with id " + jobId);
    return;
  }

  Executor engine = engines[job.Id];
  engine.Pause();
  JobData sJob = engine.GetFinishedJob();
  // job.Exception = engine.CurrentException; // can there be an exception if a job is paused
  job.ExecutionTime = engine.ExecutionTime;

  try {
    ClientCom.LogMessage("Sending the paused job with id: " + job.Id);
    wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
  }
  catch (Exception e) {
    // NOTE(review): the message announces storing to hdd, but nothing is stored here.
    ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
  }
  finally {
    KillAppDomain(job.Id); // kill app-domain in every case
  }
}
205
/// <summary>
/// Stops the executor of the given job, sends the job data to the server and
/// then kills the job's AppDomain. A job id that is not known to this core is
/// logged and ignored.
/// </summary>
/// <param name="guid">Id of the job to stop.</param>
private void DoStopJob(Guid guid) {
  Job job;
  // TryGetValue instead of the indexer: the indexer throws for unknown ids.
  if (!Jobs.TryGetValue(guid, out job) || job == null) {
    ClientCom.LogMessage("Can't find job with id " + guid);
    return;
  }

  Executor engine = engines[job.Id];
  engine.Stop();
  JobData sJob = engine.GetFinishedJob();
  // job.Exception = engine.CurrentException; // can there be an exception if a job is stopped regularly
  job.ExecutionTime = engine.ExecutionTime;

  try {
    ClientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
    // NOTE(review): a stopped job is reported with JobState.Paused - confirm this is the intended state.
    wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
  }
  catch (Exception e) {
    // NOTE(review): the message announces storing to hdd, but nothing is stored here.
    ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
  }
  finally {
    KillAppDomain(job.Id); // kill app-domain in every case
  }
}
228
/// <summary>
/// Aborts all jobs by killing their AppDomains; no results are sent back.
/// </summary>
private void DoAbortAll() {
  // iterate over a snapshot: KillAppDomain removes entries from 'Jobs'
  List<Guid> jobIds = new List<Guid>(Jobs.Keys);
  foreach (Guid jobId in jobIds) {
    KillAppDomain(jobId);
  }

  ClientCom.LogMessage("Aborted all jobs!");
}
244
/// <summary>
/// Pauses every job currently held by this core, one after the other.
/// </summary>
private void DoPauseAll() {
  ClientCom.LogMessage("Pause all received");

  // iterate over a snapshot: pausing a job removes it from 'Jobs'
  List<Guid> jobIds = new List<Guid>(Jobs.Keys);
  foreach (Guid jobId in jobIds) {
    DoPauseJob(jobId);
  }
}
261
/// <summary>
/// Stops every job currently held by this core, one after the other.
/// </summary>
private void DoStopAll() {
  ClientCom.LogMessage("Stop all received");

  // iterate over a snapshot: stopping a job removes it from 'Jobs'
  List<Guid> jobIds = new List<Guid>(Jobs.Keys);
  foreach (Guid jobId in jobIds) {
    DoStopJob(jobId);
  }
}
278
/// <summary>
/// Completely shuts down the slave: enqueues a ShutdownSlave message and
/// blocks until <see cref="Start"/> releases the shutdown semaphore.
/// </summary>
public void Shutdown() {
  MessageQueue.GetInstance().AddMessage(new MessageContainer(MessageContainer.MessageType.ShutdownSlave));
  waitShutdownSem.WaitOne();
}
287
/// <summary>
/// Complete shutdown, should be called before the application exits.
/// Stops the heartbeat, requests the dispatch loop to end, unloads all job
/// AppDomains, disconnects from the server and closes the GUI communication.
/// </summary>
private void ShutdownCore() {
  ClientCom.LogMessage("Shutdown Signal received");
  ClientCom.LogMessage("Stopping heartbeat");
  heartbeatManager.StopHeartBeat();
  abortRequested = true;
  ClientCom.LogMessage("Logging out");

  lock (engines) {
    ClientCom.LogMessage("engines locked");
    foreach (KeyValuePair<Guid, AppDomain> entry in appDomains) {
      AppDomain domain = entry.Value;
      ClientCom.LogMessage("Shutting down Appdomain for " + entry.Key);
      domain.UnhandledException -= AppDomain_UnhandledException;
      AppDomain.Unload(domain);
    }
  }

  WcfService.Instance.Disconnect();
  ClientCom.Shutdown();
  SlaveClientCom.Close();

  bool hostStillOpen = slaveComm.State != CommunicationState.Closed;
  if (hostStillOpen) {
    slaveComm.Close();
  }
}
314
/// <summary>
/// Resumes operation, e.g. after <see cref="Sleep"/>.
/// Only the heartbeats are restarted here.
/// </summary>
private void DoStartSlave() {
  ClientCom.LogMessage("Restart received");
  this.StartHeartbeats();
  ClientCom.LogMessage("Restart done");
}
324
/// <summary>
/// Stops the slave except for the client GUI communication: the heartbeat is
/// stopped, all jobs are stopped and the server connection is closed.
/// Primarily used by the GUI if the core is running as a Windows service.
/// </summary>
//TODO: do we need an AbortSleep?
private void Sleep() {
  ClientCom.LogMessage("Sleep received");
  this.heartbeatManager.StopHeartBeat();
  this.DoStopAll();
  WcfService.Instance.Disconnect();
  ClientCom.LogMessage("Sleep done");
}
337
/// <summary>
/// Pauses a job, which means sending it to the server and killing it locally;
/// at the moment only used when an executor is waiting for child jobs.
/// The job data is uploaded with state Paused, afterwards the job itself is
/// set to Waiting and updated on the server. The AppDomain is killed even if
/// the job is unknown.
/// </summary>
/// <param name="data">Job data to upload; its JobId identifies the job.</param>
[MethodImpl(MethodImplOptions.Synchronized)]
public void PauseWaitJob(JobData data) {
  Job waitingJob;
  if (Jobs.TryGetValue(data.JobId, out waitingJob)) {
    var slaveId = ConfigManager.Instance.GetClientInfo().Id;
    wcfService.UpdateJobData(waitingJob, data, slaveId, JobState.Paused);
    waitingJob.SetState(JobState.Waiting);
    wcfService.UpdateJob(waitingJob);
  } else {
    ClientCom.LogMessage("Can't find job with id " + data.JobId);
  }
  KillAppDomain(data.JobId);
}
355
/// <summary>
/// Fetches the finished job from its executor and submits it to the server
/// with state Finished. Afterwards the job's AppDomain is killed and the
/// heartbeat thread is woken up, whether or not the transmission succeeded.
/// Exceptions raised before the transmission are reported through
/// <see cref="ExceptionOccured"/>.
/// </summary>
/// <param name="jobId">Id of the finished job.</param>
[MethodImpl(MethodImplOptions.Synchronized)]
public void SendFinishedJob(Guid jobId) {
  try {
    ClientCom.LogMessage("Getting the finished job with id: " + jobId);

    // Look up each collaborator once instead of re-indexing the dictionaries.
    Executor engine;
    if (!engines.TryGetValue(jobId, out engine)) {
      ClientCom.LogMessage("Engine doesn't exist");
      return;
    }
    Job cJob;
    if (!jobs.TryGetValue(jobId, out cJob)) {
      ClientCom.LogMessage("Job doesn't exist");
      return;
    }

    JobData sJob = engine.GetFinishedJob();
    // cJob.Exception = engine.CurrentException; // can there be an exception if the job is sent normally. the exception should be entered in the statelog with the corresponding state (Failed)
    // Read the execution time once, after the result has been collected.
    cJob.ExecutionTime = engine.ExecutionTime;

    try {
      ClientCom.LogMessage("Sending the finished job with id: " + jobId);
      wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
      SlaveStatusInfo.JobsProcessed++;
    }
    catch (Exception e) {
      // NOTE(review): the message announces storing to hdd, but nothing is stored here.
      ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
    }
    finally {
      KillAppDomain(jobId); // kill app-domain in every case
      heartbeatManager.AwakeHeartBeatThread();
    }
  }
  catch (Exception e) {
    OnExceptionOccured(e);
  }
}
397
/// <summary>
/// Starts a job received from the server inside its own sandbox AppDomain:
/// prepares the job's plugins, creates the sandbox, registers job, AppDomain
/// and executor under the job id and starts the executor with the job data.
/// A job id that is already registered is logged and ignored.
/// </summary>
/// <param name="myJob">The job to start.</param>
/// <param name="jobData">The job data that is handed to the executor.</param>
private void StartJobInAppDomain(Job myJob, JobData jobData) {
  ClientCom.LogMessage("Received new job with id " + myJob.Id);

  // Reject duplicates before any work is done. Creating the sandbox first would
  // leave an untracked AppDomain behind, and a failure on that path would kill
  // the AppDomain of the job that is already running.
  lock (engines) {
    if (jobs.ContainsKey(myJob.Id)) {
      ClientCom.LogMessage("Job " + myJob.Id + " is already registered, ignoring the request");
      return;
    }
  }

  string pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
  string configFileName = string.Empty;

  try {
    PluginCache.Instance.PreparePlugins(myJob, out configFileName);
    ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
  }
  catch (Exception exception) {
    // NOTE(review): the failure is only logged here; nothing is reported to the server.
    ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
    return;
  }

  try {
    AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
    appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
    lock (engines) {
      // Register job and AppDomain first so that KillAppDomain can clean up
      // if anything below throws.
      jobs.Add(myJob.Id, myJob);
      appDomains.Add(myJob.Id, appDomain);
      ClientCom.LogMessage("Creating AppDomain");
      Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
      ClientCom.LogMessage("Created AppDomain");
      engine.JobId = myJob.Id;
      engine.Core = this;
      ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
      engines.Add(myJob.Id, engine);
      engine.Start(jobData.Data);
      SlaveStatusInfo.JobsFetched++;
      ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
    }
    heartbeatManager.AwakeHeartBeatThread();
  }
  catch (Exception exception) {
    ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
    ClientCom.LogMessage("Error thrown is: " + exception.ToString());
    KillAppDomain(myJob.Id);
  }
}
447
/// <summary>
/// Raised by <see cref="OnExceptionOccured"/> with the exception that was caught.
/// </summary>
public event EventHandler<EventArgs<Exception>> ExceptionOccured;

/// <summary>
/// Logs the exception and notifies the subscribers of <see cref="ExceptionOccured"/>, if any.
/// </summary>
private void OnExceptionOccured(Exception e) {
  ClientCom.LogMessage("Error: " + e.ToString());
  // copy the delegate so a concurrent unsubscribe cannot null it between check and call
  EventHandler<EventArgs<Exception>> subscribers = ExceptionOccured;
  if (subscribers == null) {
    return;
  }
  subscribers(this, new EventArgs<Exception>(e));
}
454
/// <summary>
/// Handles an unhandled exception raised in one of the job AppDomains: logs it
/// and kills the AppDomain of the job the faulting domain belongs to.
/// </summary>
/// <param name="sender">Expected to be the AppDomain that raised the event.</param>
/// <param name="e">Carries the unhandled exception object.</param>
private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

  // The exception text is not a job id, so parsing it as a Guid always failed.
  // Resolve the job through the AppDomain registered for it instead.
  AppDomain faultedDomain = sender as AppDomain;
  Guid? jobId = null;
  if (faultedDomain != null) {
    lock (engines) {
      foreach (KeyValuePair<Guid, AppDomain> entry in appDomains) {
        // Compare by id; 'sender' may not be the same object reference.
        if (entry.Value.Id == faultedDomain.Id) {
          jobId = entry.Key;
          break;
        }
      }
    }
  }

  if (jobId.HasValue) {
    KillAppDomain(jobId.Value);
  } else {
    ClientCom.LogMessage("Could not determine which job the faulted AppDomain belongs to");
  }
}
459
/// <summary>
/// Enqueues a callback from an executor to the message queue.
/// This is necessary if the core thread has to execute certain actions, e.g.
/// killing of an app domain.
/// </summary>
/// <typeparam name="T">Type of the callback parameter.</typeparam>
/// <param name="action">Callback stored in the enqueued container.</param>
/// <param name="parameter">Argument stored alongside the callback.</param>
public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
  ExecutorMessageContainer<T> message = new ExecutorMessageContainer<T> {
    Callback = action,
    CallbackParameter = parameter
  };
  MessageQueue.GetInstance().AddMessage(message);
}
475
/// <summary>
/// Kills the AppDomain of a job and removes all bookkeeping for it.
/// When called from a thread other than the core thread, the call is only
/// enqueued and executed later by the core thread.
/// </summary>
/// <param name="id">the GUID of the job</param>
//[MethodImpl(MethodImplOptions.Synchronized)]
public void KillAppDomain(Guid id) {
  bool onCoreThread = Thread.CurrentThread.ManagedThreadId == this.coreThreadId;
  if (!onCoreThread) {
    EnqueueExecutorMessage<Guid>(KillAppDomain, id);
    return;
  }

  ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        engine.Dispose();
        engines.Remove(id);
      }

      AppDomain domain;
      if (appDomains.TryGetValue(id, out domain)) {
        domain.UnhandledException -= AppDomain_UnhandledException;

        // up to five unload attempts, one second apart
        int attemptsLeft = 5;
        do {
          try {
            AppDomain.Unload(domain);
            attemptsLeft = 0;
          }
          catch (CannotUnloadAppDomainException) {
            ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
            Thread.Sleep(1000);
            attemptsLeft--;
            if (attemptsLeft == 0) {
              // NOTE(review): this rethrow is caught by the enclosing catch below and
              // only logged; the entries in 'appDomains' and 'jobs' then stay registered.
              throw;
            }
          }
        } while (attemptsLeft > 0);
        appDomains.Remove(id);
      }

      jobs.Remove(id);
      PluginCache.Instance.DeletePluginsForJob(id);
      GC.Collect();
    }
    catch (Exception ex) {
      ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
    }
  }
  GC.Collect();
}
526  }
527}
Note: See TracBrowser for help on using the repository browser.