Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6107

Last change on this file since 6107 was 6107, checked in by ascheibe, 13 years ago

#1233

  • simplify PreparePlugins
  • send more exceptions to ExperimentManager
File size: 22.0 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.IO;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32
33
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server.
  /// </summary>
  /// <remarks>
  /// Commands are taken from the <see cref="MessageQueue"/> and executed on the thread that called
  /// <see cref="Start"/> (the core thread). The dictionaries of engines, AppDomains and jobs are also
  /// touched by job start-up tasks and executor callbacks and are therefore guarded by
  /// <c>lock (engines)</c>.
  /// </remarks>
  public class Core : MarshalByRefObject {
    /// <summary>Maximum number of attempts to unload a job's AppDomain before giving up.</summary>
    private const int MaxUnloadAttempts = 5;
    /// <summary>Pause between two AppDomain unload attempts.</summary>
    private static readonly TimeSpan UnloadRetryDelay = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Event log used to report startup failures; presumably only set when the slave runs as a
    /// Windows service (TODO confirm). When null, failures are reported through the GUI pipe.
    /// </summary>
    public EventLog ServiceEventLog { get; set; }

    /// <summary>Set to true to end the message dispatch loop in <see cref="Start"/>.</summary>
    public static bool abortRequested { get; set; }
    /// <summary>Released when <see cref="Start"/> returns; awaited by <see cref="Shutdown"/>.</summary>
    private Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // Running executors, their sandbox AppDomains and the job metadata, all keyed by job id.
    // All three dictionaries are guarded by lock (engines).
    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    // Set to null by Sleep(); always read into a local and null-check before use.
    private HeartbeatManager heartbeatManager;
    private int coreThreadId;

    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;

    // Serializes the start-up of jobs in StartJobInAppDomain.
    private static object locker = new object();

    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() { }

    /// <summary>
    /// Main method of the slave. Opens the slave/GUI communication service, hooks the WCF service
    /// events and starts the heartbeats. It then dispatches queued messages on the calling thread
    /// until <see cref="abortRequested"/> is set.
    /// </summary>
    /// <remarks>
    /// Blocks until the message loop ends. The shutdown semaphore is released in every case so that
    /// <see cref="Shutdown"/> can return. If startup fails before the GUI pipe exists and no
    /// <see cref="ServiceEventLog"/> is set, the original exception is rethrown.
    /// </remarks>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        // start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over job directories
        PluginCache.Instance.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          }
          catch (Exception) {
            // best effort: the event log is the last place to report to
          }
        } else if (clientCom != null) {
          // try to log with clientCom, so the user sees at least a message
          clientCom.LogMessage("Error on startup: " + ex.ToString() +
            Environment.NewLine + "Core is going to shutdown.");
        } else {
          // Startup failed before the GUI pipe was available and there is no event log:
          // rethrow the original error instead of masking it with a NullReferenceException.
          throw;
        }
      }
      finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>Creates and starts the heartbeat manager unless one is already running.</summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>Wakes the heartbeat thread if heartbeats are currently running (not asleep).</summary>
    private void AwakeHeartbeat() {
      HeartbeatManager manager = heartbeatManager;
      if (manager != null) {
        manager.AwakeHeartBeatThread();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and executes them until <see cref="abortRequested"/>
    /// is set, reporting the slave status to the GUI after each message.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// Runs on the core thread.
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        // callbacks enqueued via EnqueueExecutorMessage, e.g. KillAppDomain from a non-core thread
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          BeginCalculateJob(container.JobId);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          SlaveStatusInfo.JobsAborted++;
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
      }
    }

    /// <summary>
    /// Fetches and starts a job on a thread-pool task so the core thread is not blocked.
    /// Failures are handled by <see cref="HandleFailedJobStart"/>.
    /// </summary>
    private void BeginCalculateJob(Guid jobId) {
      Task.Factory.StartNew(() => FetchAndStartJob(jobId))
        .ContinueWith(t => HandleFailedJobStart(t, jobId), TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Fetches a job and its data from the server, marks it as calculating and starts it in its
    /// own AppDomain.
    /// </summary>
    /// <exception cref="JobNotFoundException">the server does not know the job</exception>
    /// <exception cref="JobDataNotFoundException">the server has no data for the job</exception>
    private void FetchAndStartJob(Guid jobId) {
      Job job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      lock (engines) {
        if (!jobs.ContainsKey(job.Id)) {
          jobs.Add(job.Id, job);
        }
      }
      JobData jobData = wcfService.GetJobData(job.Id);
      if (jobData == null) throw new JobDataNotFoundException(jobId);
      SlaveStatusInfo.JobsFetched++;
      job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
      if (job == null) throw new JobNotFoundException(jobId);
      StartJobInAppDomain(job, jobData);
    }

    /// <summary>
    /// Continuation for a faulted <see cref="FetchAndStartJob"/> task. It reports the job as
    /// failed and forgets the job locally.
    /// </summary>
    /// <remarks>
    /// Should not throw. An exception here would fault the continuation task, which nobody observes.
    /// On .NET 4.0, unobserved task exceptions can terminate the process when the task is finalized.
    /// </remarks>
    private void HandleFailedJobStart(Task task, Guid jobId) {
      try {
        clientCom.LogMessage(task.Exception.ToString());
        wcfService.UpdateJobState(jobId, JobState.Failed, task.Exception.ToString());
        SlaveStatusInfo.JobsAborted++;
      }
      catch (Exception ex) {
        OnExceptionOccured(ex);
      }
      finally {
        // Drop the entry FetchAndStartJob may have added, unless an engine runs for this id.
        // Otherwise later Pause/Stop commands would find a job that has no engine.
        lock (engines) {
          if (!engines.ContainsKey(jobId)) {
            jobs.Remove(jobId);
          }
        }
      }
    }

    /// <summary>
    /// Looks up a job and its executor under the lock. Logs and returns false if either is
    /// unknown, e.g. when a command arrives while the job is still being fetched and has no
    /// engine yet.
    /// </summary>
    private bool TryGetJobAndEngine(Guid jobId, string caller, out Job job, out Executor engine) {
      bool jobKnown;
      bool engineKnown;
      lock (engines) {
        jobKnown = jobs.TryGetValue(jobId, out job) && job != null;
        engineKnown = engines.TryGetValue(jobId, out engine);
      }
      if (!jobKnown) {
        clientCom.LogMessage(caller + ": Can't find job with id " + jobId);
        return false;
      }
      if (!engineKnown) {
        clientCom.LogMessage(caller + ": Job with id " + jobId + " has no engine (yet)");
        return false;
      }
      return true;
    }

    /// <summary>Whether the executor reports an exception message (null or empty means none).</summary>
    private static bool HasException(Executor engine) {
      return !string.IsNullOrEmpty(engine.CurrentException);
    }

    /// <summary>
    /// Pauses a job, sends its paused state to the server and unloads its AppDomain.
    /// If the engine reports an exception, the job is marked as failed first.
    /// </summary>
    private void DoPauseJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(jobId, "DoPauseJob", out job, out engine)) return;

      try {
        engine.Pause();
        JobData sJob = engine.GetPausedJob();
        job.ExecutionTime = engine.ExecutionTime;

        try {
          if (HasException(engine)) {
            wcfService.UpdateJobState(job.Id, JobState.Failed, engine.CurrentException);
            SlaveStatusInfo.JobsAborted++;
          } else {
            SlaveStatusInfo.JobsProcessed++;
          }
          clientCom.LogMessage("Sending the paused job with id: " + job.Id);
          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        }
        catch (Exception e) {
          // NOTE(review): nothing is actually written to disk here despite the message.
          clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      }
      catch (Exception ex) {
        // The engine could not be paused. Report the error but keep the core's dispatch loop alive.
        OnExceptionOccured(ex);
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Stops a job, sends its final state to the server as aborted and unloads its AppDomain.
    /// If the engine reports an exception, the job is marked as failed first.
    /// </summary>
    private void DoStopJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(jobId, "DoStopJob", out job, out engine)) return;

      try {
        engine.Stop();
        JobData sJob = engine.GetFinishedJob();
        job.ExecutionTime = engine.ExecutionTime;

        try {
          if (HasException(engine)) {
            wcfService.UpdateJobState(job.Id, JobState.Failed, engine.CurrentException);
          }
          SlaveStatusInfo.JobsAborted++;

          clientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
        }
        catch (Exception e) {
          // NOTE(review): nothing is actually written to disk here despite the message.
          clientCom.LogMessage("Transmitting to server failed. Storing the stopped job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      }
      catch (Exception ex) {
        // The engine could not be stopped. Report the error but keep the core's dispatch loop alive.
        OnExceptionOccured(ex);
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>Copies the current job ids so that jobs can be removed while iterating.</summary>
    private List<Guid> SnapshotJobIds() {
      lock (engines) {
        return new List<Guid>(jobs.Keys);
      }
    }

    /// <summary>
    /// Aborts all running jobs; no results are sent back.
    /// </summary>
    private void DoAbortAll() {
      foreach (Guid g in SnapshotJobIds()) {
        KillAppDomain(g);
      }

      clientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// Pauses every job immediately and sends its paused state to the server (see DoPauseJob).
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");

      foreach (Guid g in SnapshotJobIds()) {
        DoPauseJob(g);
      }
    }

    /// <summary>
    /// Stops every job immediately and sends its state to the server as aborted (see DoStopJob).
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");

      foreach (Guid g in SnapshotJobIds()) {
        DoStopJob(g);
      }
    }

    /// <summary>
    /// Completely shuts down the slave. Enqueues a shutdown message and blocks until
    /// <see cref="Start"/> has returned.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown; should be called before the application exits. Stops the heartbeats,
    /// ends the dispatch loop and unloads all job AppDomains. It then disconnects from the server
    /// and closes the GUI communication.
    /// </summary>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown Signal received");
      clientCom.LogMessage("Stopping heartbeat");
      HeartbeatManager manager = heartbeatManager;
      if (manager != null) { // null after Sleep()
        manager.StopHeartBeat();
      }
      abortRequested = true;
      clientCom.LogMessage("Logging out");

      lock (engines) {
        clientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // one stuck AppDomain must not prevent disconnecting from the server
            clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
        appDomains.Clear();
      }
      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State == CommunicationState.Faulted) {
        slaveComm.Abort(); // Close() throws on a faulted communication object
      } else if (slaveComm.State != CommunicationState.Closed) {
        slaveComm.Close();
      }
    }

    /// <summary>
    /// Reinitializes everything and continues operation;
    /// can be called after Sleep().
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      StartHeartbeats();
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// Stops the slave except for the client GUI communication: heartbeats are stopped, all jobs
    /// are stopped and the server connection is closed. The GUI uses this primarily when the core
    /// runs as a Windows service.
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received");
      HeartbeatManager manager = heartbeatManager;
      if (manager != null) { // already null if Sleep was received twice
        manager.StopHeartBeat();
        heartbeatManager = null;
      }
      DoStopAll();
      WcfService.Instance.Disconnect();
      clientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally.
    /// Currently only used when the executor is waiting for child jobs.
    /// </summary>
    /// <remarks>
    /// The AppDomain is killed even if talking to the server fails; such an exception still
    /// propagates to the caller.
    /// </remarks>
    public void PauseWaitJob(JobData data) {
      try {
        Job job;
        lock (engines) {
          jobs.TryGetValue(data.JobId, out job);
        }
        if (job == null) {
          clientCom.LogMessage("Can't find job with id " + data.JobId);
        } else {
          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
        }
      }
      finally {
        KillAppDomain(data.JobId);
      }
    }

    /// <summary>
    /// Sends the finished job to the server and unloads its AppDomain. If the engine reports an
    /// exception, the job is marked as failed first.
    /// </summary>
    /// <remarks>
    /// If the transmission fails, the failure is only logged; the job is not persisted locally.
    /// Any other error is reported via <see cref="ExceptionOccured"/>.
    /// </remarks>
    public void SendFinishedJob(Guid jobId) {
      try {
        clientCom.LogMessage("Getting the finished job with id: " + jobId);
        Executor engine;
        Job cJob;
        lock (engines) {
          engines.TryGetValue(jobId, out engine);
          jobs.TryGetValue(jobId, out cJob);
        }
        if (engine == null) {
          clientCom.LogMessage("Engine doesn't exist");
          return;
        }
        if (cJob == null) {
          clientCom.LogMessage("Job doesn't exist");
          return;
        }

        try {
          cJob.ExecutionTime = engine.ExecutionTime;

          if (engine.Aborted) {
            SlaveStatusInfo.JobsAborted++;
          } else {
            SlaveStatusInfo.JobsProcessed++;
          }

          if (HasException(engine)) {
            wcfService.UpdateJobState(jobId, JobState.Failed, engine.CurrentException);
          }

          JobData sJob = engine.GetFinishedJob();
          try {
            clientCom.LogMessage("Sending the finished job with id: " + jobId);
            wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          }
          catch (Exception e) {
            // NOTE(review): nothing is actually written to disk here despite the message.
            clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
          }
        }
        finally {
          // release the AppDomain even if updating the job state failed
          KillAppDomain(jobId);
          AwakeHeartbeat();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Starts a newly received job in its own sandbox AppDomain. Prepares the job's plugins,
    /// creates the AppDomain and an <see cref="Executor"/> inside it, then starts the executor.
    /// On failure the job is reported as failed to the server.
    /// </summary>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      clientCom.LogMessage("Received new job with id " + myJob.Id);
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());

      lock (locker) {
        bool engineExists;
        lock (engines) {
          engineExists = engines.ContainsKey(myJob.Id);
        }
        if (engineExists) {
          clientCom.LogMessage("Job with key " + myJob.Id + " already exists. Job will be ignored.");
          return;
        }

        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
        bool pluginsPrepared = false;
        string configFileName = string.Empty;

        try {
          PluginCache.Instance.PreparePlugins(myJob, out configFileName);
          clientCom.LogMessage("Plugins fetched for job " + myJob.Id);
          pluginsPrepared = true;
        }
        catch (Exception exception) {
          clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
          wcfService.UpdateJobState(myJob.Id, JobState.Failed, exception.ToString());
          SlaveStatusInfo.JobsAborted++;

          lock (engines) {
            jobs.Remove(myJob.Id);
          }
        }

        if (pluginsPrepared) {
          try {
            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
            appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            Executor engine;
            lock (engines) {
              appDomains.Add(myJob.Id, appDomain);
              clientCom.LogMessage("Creating AppDomain");
              engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              clientCom.LogMessage("Created AppDomain");
              engine.JobId = myJob.Id;
              engine.Core = this;
              clientCom.LogMessage("Starting Engine for job " + myJob.Id);
              engines.Add(myJob.Id, engine);
            }
            engine.Start(jobData.Data);
          }
          catch (Exception exception) {
            clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
            clientCom.LogMessage("Error thrown is: " + exception.ToString());

            // prefer the executor's own error message if the engine got far enough to report one
            Executor failedEngine;
            lock (engines) {
              engines.TryGetValue(myJob.Id, out failedEngine);
            }
            if (failedEngine != null && HasException(failedEngine)) {
              wcfService.UpdateJobState(myJob.Id, JobState.Failed, failedEngine.CurrentException);
            } else {
              wcfService.UpdateJobState(myJob.Id, JobState.Failed, exception.ToString());
            }
            SlaveStatusInfo.JobsAborted++;

            KillAppDomain(myJob.Id);
          }
        }
      }
      AwakeHeartbeat();
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    /// <summary>Logs the exception to the GUI and raises <see cref="ExceptionOccured"/>.</summary>
    private void OnExceptionOccured(Exception e) {
      clientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Logs an unhandled exception of a job's AppDomain and kills that AppDomain if its job can be
    /// identified.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
      // The exception text is not a job id (parsing it always failed), so derive the id from the
      // sending AppDomain instead.
      Guid jobId;
      if (TryGetJobIdOfAppDomain(sender, out jobId)) {
        KillAppDomain(jobId);
      } else {
        clientCom.LogMessage("Could not determine the job of the failed AppDomain");
      }
    }

    /// <summary>
    /// Tries to recover the job id from the AppDomain that raised an UnhandledException event.
    /// </summary>
    /// <remarks>
    /// Assumes sender is the AppDomain and that CreateAndInitSandbox uses its first argument
    /// (the job id string) as the AppDomain's friendly name. TODO confirm.
    /// </remarks>
    private static bool TryGetJobIdOfAppDomain(object sender, out Guid jobId) {
      jobId = Guid.Empty;
      AppDomain domain = sender as AppDomain;
      if (domain == null) return false;
      try {
        return Guid.TryParse(domain.FriendlyName, out jobId);
      }
      catch (AppDomainUnloadedException) {
        return false;
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <param name="action">callback executed later on the core thread</param>
    /// <param name="parameter">argument passed to <paramref name="action"/></param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kills the AppDomain of a job. Disposes the executor, unloads the AppDomain (with retries)
    /// and forgets the job and its plugin files.
    /// </summary>
    /// <remarks>
    /// When called from a thread other than the core thread, the call is enqueued and executed
    /// later on the core thread. Errors while unloading are logged, not thrown.
    /// </remarks>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            UnloadWithRetries(appDomain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          // includes the CannotUnloadAppDomainException of the last unload attempt
          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    }

    /// <summary>
    /// Unloads an AppDomain. Retries up to <see cref="MaxUnloadAttempts"/> times, pausing
    /// <see cref="UnloadRetryDelay"/> between attempts while the runtime refuses to unload it.
    /// </summary>
    /// <exception cref="CannotUnloadAppDomainException">if the last attempt fails</exception>
    private void UnloadWithRetries(AppDomain appDomain) {
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= MaxUnloadAttempts) {
            clientCom.LogMessage("Could not unload AppDomain, giving up.");
            throw;
          }
          clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(UnloadRetryDelay);
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.