Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5450

Last change on this file since 5450 was 5450, checked in by ascheibe, 13 years ago

#1233

  • added Pause/Stop/Abort mechanisms to the slave
  • added Pause to Jobs
File size: 18.7 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Services.Hive.Common;
32using HeuristicLab.Services.Hive.Common.DataTransfer;
33
34
namespace HeuristicLab.Clients.Hive.Slave {
  /// <summary>
  /// The core component of the Hive slave. It opens the communication service for the
  /// slave GUI, starts the heartbeats and then dispatches the messages of the
  /// <see cref="MessageQueue"/> on the thread that called <see cref="Start"/>.
  /// Jobs are executed by <see cref="Executor"/> instances living in one AppDomain per job.
  /// </summary>
  public class Core : MarshalByRefObject {

    /// <summary>Number of attempts made to unload an AppDomain before giving up.</summary>
    private const int MaxUnloadAttempts = 5;

    /// <summary>Pause between two unload attempts, in milliseconds.</summary>
    private const int UnloadRetryDelayMilliseconds = 1000;

    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    /// <summary>Set by <see cref="ShutdownCore"/>; ends the message loop of <see cref="Start"/>.</summary>
    public static bool abortRequested { get; set; }

    // Released when Start() leaves its message loop; Shutdown() blocks on it.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // The three dictionaries are keyed by job id; this class guards them with lock (engines).
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;

    // Managed id of the thread that runs Start(); see EnqueueExecutorMessage.
    private int coreThreadId;

    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    /// <summary>The executors of the jobs currently held by this slave, keyed by job id.</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The jobs currently held by this slave, keyed by job id.</summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    /// <summary>
    /// Creates the core and publishes it through <see cref="TheCore"/>.
    /// </summary>
    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main method of the slave. Blocks the calling thread and dispatches messages until a
    /// shutdown has been requested. The shutdown semaphore is released when the method is
    /// left, also if it is left by an exception, so that <see cref="Shutdown"/> cannot
    /// block forever.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      //start the client communication service (pipe between slave and slave gui)
      slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
      slaveComm.Open();

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      try {
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // Always undo the registration and wake up a waiting Shutdown() call; previously an
        // exception while handling a message skipped both.
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates a new heartbeat manager with an interval of ten seconds and starts it.
    /// </summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = TimeSpan.FromSeconds(10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them one after the other until
    /// <see cref="abortRequested"/> is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    /// <summary>Subscribes to the connection events of the WCF service.</summary>
    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(wcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(wcfService_ExceptionOccured);
    }

    /// <summary>Removes the subscriptions made by <see cref="RegisterServiceEvents"/>.</summary>
    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= wcfService_Connected;
      WcfService.Instance.ExceptionOccured -= wcfService_ExceptionOccured;
    }

    void wcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void wcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// </summary>
    /// <param name="container">The container holding the message</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        // Guard first: the log statement below dereferences the container.
        ClientCom.LogMessage("Unknown MessageContainer: " + container);
        return;
      }

      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      //TODO: find a better solution
      ExecutorMessageContainer<Guid> executorContainer = container as ExecutorMessageContainer<Guid>;
      if (executorContainer != null) {
        // Callback that another thread queued for execution on the core thread.
        executorContainer.execute();
        return;
      }

      switch (container.Message) {
        //Pull a Job from the Server
        case MessageContainer.MessageType.CalculateJob:
          Job myJob = wcfService.GetJob(container.JobId);
          //TODO: handle in own thread!!
          JobData jobData = wcfService.GetJobData(myJob.Id);
          StartJobInAppDomain(myJob, jobData);
          break;

        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
      }
    }

    /// <summary>
    /// Pauses the executor of a job, sends the job to the server and kills its AppDomain.
    /// Unknown job ids are logged and ignored.
    /// </summary>
    /// <param name="guid">the id of the job</param>
    private void DoPauseJob(Guid guid) {
      InterruptAndSendJob(guid, "paused", delegate(Executor engine) { engine.Pause(); });
    }

    /// <summary>
    /// Stops the executor of a job, sends the job to the server and kills its AppDomain.
    /// Unknown job ids are logged and ignored.
    /// </summary>
    /// <param name="guid">the id of the job</param>
    private void DoStopJob(Guid guid) {
      InterruptAndSendJob(guid, "stopped", delegate(Executor engine) { engine.Stop(); });
    }

    /// <summary>
    /// Shared implementation of <see cref="DoPauseJob"/> and <see cref="DoStopJob"/>:
    /// interrupts the executor, copies its exception and execution time to the job, sends
    /// the job to the server and kills the AppDomain, whether or not sending succeeded.
    /// </summary>
    /// <param name="jobId">the id of the job</param>
    /// <param name="stateName">word describing the interruption in log messages, e.g. "paused"</param>
    /// <param name="interrupt">the call that interrupts the executor</param>
    private void InterruptAndSendJob(Guid jobId, string stateName, Action<Executor> interrupt) {
      Job job;
      if (!jobs.TryGetValue(jobId, out job) || job == null) {
        // The indexer used before threw a KeyNotFoundException here.
        ClientCom.LogMessage("Can't find job with id " + jobId);
        return;
      }

      Executor engine;
      if (!engines.TryGetValue(job.Id, out engine)) {
        // Nothing to interrupt; remove what is left of the job.
        ClientCom.LogMessage("Engine doesn't exist");
        KillAppDomain(job.Id);
        return;
      }

      interrupt(engine);
      JobData jobData = engine.GetFinishedJob();
      job.Exception = engine.CurrentException;
      job.ExecutionTime = engine.ExecutionTime;

      try {
        ClientCom.LogMessage("Sending the " + stateName + " job with id: " + job.Id);
        wcfService.UpdateJob(job, jobData);
        SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
      }
      catch (Exception e) {
        ClientCom.LogMessage("Transmitting to server failed. Storing the " + stateName + " job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Returns a copy of the current job ids, so that jobs can be removed while iterating.
    /// </summary>
    private List<Guid> SnapshotJobIds() {
      return new List<Guid>(Jobs.Keys);
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      foreach (Guid id in SnapshotJobIds()) {
        KillAppDomain(id);
      }

      ClientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every job of this slave, see <see cref="DoPauseJob"/>
    /// </summary>
    private void DoPauseAll() {
      ClientCom.LogMessage("Pause all received");

      foreach (Guid id in SnapshotJobIds()) {
        DoPauseJob(id);
      }
    }

    /// <summary>
    /// stops every job of this slave, see <see cref="DoStopJob"/>
    /// </summary>
    private void DoStopAll() {
      ClientCom.LogMessage("Stop all received");

      foreach (Guid id in SnapshotJobIds()) {
        DoStopJob(id);
      }
    }

    /// <summary>
    /// Completely shuts down the slave: enqueues a shutdown message for the core thread and
    /// blocks until <see cref="Start"/> has left its message loop.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the application is exited
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      heartbeatManager.StopHeartBeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // One domain that cannot be unloaded must not prevent the rest of the shutdown.
            ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// Handles a restart request by starting the heartbeats again,
    /// can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      ClientCom.LogMessage("Restart received");
      // NOTE(review): a heartbeat manager that is still running is replaced without being
      // stopped; confirm that Restart is only sent after the heartbeats have been stopped.
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      ClientCom.LogMessage("Sleep received");
      heartbeatManager.StopHeartBeat();
      DoStopAll();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs.
    /// The AppDomain is killed even if sending the job fails; the exception is passed on.
    /// </summary>
    /// <param name="data">the data of the job to pause</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      try {
        Job job;
        if (Jobs.TryGetValue(data.JobId, out job)) {
          job.JobState = JobState.WaitingForChildJobs;
          wcfService.UpdateJob(job, data);
        } else {
          ClientCom.LogMessage("Can't find job with id " + data.JobId);
        }
      }
      finally {
        KillAppDomain(data.JobId);
      }
    }

    /// <summary>
    /// Fetches the finished job from its executor and submits it to the server. A failed
    /// transmission is logged. The AppDomain of the job is killed in either case.
    /// </summary>
    /// <param name="jobId">the id of the finished job, a boxed <see cref="Guid"/></param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        ClientCom.LogMessage("Getting the finished job with id: " + jId);

        Executor engine;
        if (!engines.TryGetValue(jId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }

        Job cJob;
        if (!jobs.TryGetValue(jId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }

        JobData sJob = engine.GetFinishedJob();
        cJob.Exception = engine.CurrentException;
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jId);
          wcfService.UpdateJob(cJob, sJob);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within an AppDomain.
    /// A job whose id is already registered on this slave is not started a second time.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the data of the job; its Data is handed to the executor</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);

      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          // Checked before any work is done: a sandbox created for a job that is already
          // registered was never added to appDomains and therefore never unloaded.
          ClientCom.LogMessage("Job " + myJob.Id + " is already running on this slave");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());

      try {
        PluginCache.Instance.PreparePlugins(myJob, jobData);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        return; // without plugins the job cannot be started
      }

      try {
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        lock (engines) {
          if (!jobs.ContainsKey(myJob.Id)) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        ClientCom.LogMessage("Error thrown is: " + exception.ToString());
        KillAppDomain(myJob.Id);
      }
    }

    /// <summary>Raised by <see cref="SendFinishedJob"/> when it fails unexpectedly.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    /// <summary>Logs the exception and raises <see cref="ExceptionOccured"/>.</summary>
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception of a job AppDomain: logs it and kills the AppDomain
    /// of the job it belongs to. The job is found through the sending AppDomain; the text
    /// of the exception, which was parsed as a Guid before, never is a job id.
    /// </summary>
    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

      Guid jobId;
      if (TryFindJobId(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        ClientCom.LogMessage("Could not determine the job of the failed AppDomain");
      }
    }

    /// <summary>
    /// Searches <see cref="appDomains"/> for the given AppDomain.
    /// </summary>
    /// <param name="domain">the AppDomain to look for, may be null</param>
    /// <param name="jobId">the id of the job registered for the AppDomain, Guid.Empty if none was found</param>
    /// <returns>true if the AppDomain is registered for a job, else false</returns>
    private bool TryFindJobId(AppDomain domain, out Guid jobId) {
      jobId = Guid.Empty;
      if (domain == null) return false;

      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> entry in appDomains) {
          try {
            // Compare the ids as well, in case the sender is not the stored reference.
            if (ReferenceEquals(entry.Value, domain) || entry.Value.Id == domain.Id) {
              jobId = entry.Key;
              return true;
            }
          }
          catch (AppDomainUnloadedException) {
            // An AppDomain that has already been unloaded cannot be the one we look for.
          }
        }
      }
      return false;
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the parameter of the action</typeparam>
    /// <param name="action">the action the core thread has to execute</param>
    /// <param name="parameter">the parameter passed to the action</param>
    /// <returns>true if the calling method can continue execution, else false</returns>
    private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      if (Thread.CurrentThread.ManagedThreadId == this.coreThreadId) {
        return true; // already on the core thread, the caller does the work itself
      }

      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
      return false;
    }

    /// <summary>
    /// Kill an appdomain with a specific id. Called from another thread than the core
    /// thread, the request is only enqueued and executed later by the core thread.
    /// Errors are logged and not passed on to the caller.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (!EnqueueExecutorMessage<Guid>(KillAppDomain, id)) {
        return;
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain domain;
          if (appDomains.TryGetValue(id, out domain)) {
            domain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            UnloadWithRetry(domain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          // Also reached when the AppDomain could not be unloaded; its entries in
          // appDomains and jobs are kept in that case.
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }

    /// <summary>
    /// Unloads an AppDomain and tries again after a pause while the runtime reports that it
    /// cannot be unloaded. The exception of the last attempt is passed on.
    /// </summary>
    /// <param name="domain">the AppDomain to unload</param>
    private void UnloadWithRetry(AppDomain domain) {
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(domain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(UnloadRetryDelayMilliseconds);
          if (attempt == MaxUnloadAttempts) {
            throw; // give up, the caller logs the failure
          }
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.