Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6100

Last change on this file since 6100 was 6100, checked in by ascheibe, 13 years ago

#1233

  • Executor now sends all exceptions to the ExperimentManager, because NetNamedPipe communication is not possible inside a sandbox due to security constraints
  • count stopped and aborted jobs correctly
  • send the correct status when a job is stopped by the ExperimentManager
  • try to log unhandled exceptions to the GUI if no EventLog is available
  • don't crash if a job is sent more than once by the server
File size: 21.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.IO;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    public EventLog ServiceEventLog { get; set; }
41
42    public static bool abortRequested { get; set; }
43    private Semaphore waitShutdownSem = new Semaphore(0, 1);
44    public static ILog Log { get; set; }
45
46    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
47    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
48    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();
49
50    private WcfService wcfService;
51    private HeartbeatManager heartbeatManager;
52    private int coreThreadId;
53
54    private ISlaveCommunication clientCom;
55    private ServiceHost slaveComm;
56
57    public Dictionary<Guid, Executor> ExecutionEngines {
58      get { return engines; }
59    }
60
61    internal Dictionary<Guid, Job> Jobs {
62      get { return jobs; }
63    }
64
65    public Core() { }
66
67    /// <summary>
68    /// Main Method for the client
69    /// </summary>
70    public void Start() {
71      coreThreadId = Thread.CurrentThread.ManagedThreadId;
72      abortRequested = false;
73
74      try {
75        ConfigManager manager = ConfigManager.Instance;
76        manager.Core = this;
77
78        //start the client communication service (pipe between slave and slave gui)
79        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
80        slaveComm.Open();
81
82        clientCom = SlaveClientCom.Instance.ClientCom;
83        clientCom.LogMessage("Hive Slave started");
84
85        wcfService = WcfService.Instance;
86        RegisterServiceEvents();
87
88        StartHeartbeats(); // Start heartbeats thread       
89        DispatchMessageQueue(); // dispatch messages until abortRequested
90      }
91      catch (Exception ex) {
92        if (ServiceEventLog != null) {
93          try {
94            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
95          }
96          catch (Exception) { }
97        } else {
98          //try to log with clientCom. if this works the user sees at least a message,
99          //else an exception will be thrown anyways.
100          clientCom.LogMessage("Error on startup: " + ex.ToString() +
101            Environment.NewLine + "Core is going to shutdown.");
102        }
103      }
104      finally {
105        DeRegisterServiceEvents();
106        waitShutdownSem.Release();
107      }
108    }
109
110    private void StartHeartbeats() {
111      //Initialize the heartbeat     
112      if (heartbeatManager == null) {
113        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
114        heartbeatManager.StartHeartbeat();
115      }
116    }
117
118    private void DispatchMessageQueue() {
119      MessageQueue queue = MessageQueue.GetInstance();
120      while (!abortRequested) {
121        MessageContainer container = queue.GetMessage();
122        DetermineAction(container);
123        clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
124      }
125    }
126
127    private void RegisterServiceEvents() {
128      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
129      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
130    }
131
132    private void DeRegisterServiceEvents() {
133      WcfService.Instance.Connected -= WcfService_Connected;
134      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
135    }
136
137    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
138      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
139    }
140
141    void WcfService_Connected(object sender, EventArgs e) {
142      clientCom.LogMessage("Connected successfully to Hive server");
143    }
144
145    /// <summary>
146    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
147    /// </summary>
148    /// <param name="container">The container, containing the message</param>
149    private void DetermineAction(MessageContainer container) {
150      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);
151
152      if (container is ExecutorMessageContainer<Guid>) {
153        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
154        c.execute();
155      } else if (container is MessageContainer) {
156        switch (container.Message) {
157          case MessageContainer.MessageType.CalculateJob:
158            Task.Factory.StartNew((jobIdObj) => {
159              Guid jobId = (Guid)jobIdObj;
160              Job job = wcfService.GetJob(jobId);
161              if (job == null) throw new JobNotFoundException(jobId);
162              lock (engines) {
163                if (!jobs.ContainsKey(job.Id)) {
164                  jobs.Add(job.Id, job);
165                }
166              }
167              JobData jobData = wcfService.GetJobData(job.Id);
168              if (jobData == null) throw new JobDataNotFoundException(jobId);
169              SlaveStatusInfo.JobsFetched++;
170              job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
171              if (job == null) throw new JobNotFoundException(jobId);
172              StartJobInAppDomain(job, jobData);
173            }, container.JobId)
174            .ContinueWith((t) => {
175              // handle exception of task
176              clientCom.LogMessage(t.Exception.ToString());
177            }, TaskContinuationOptions.OnlyOnFaulted);
178            break;
179          case MessageContainer.MessageType.ShutdownSlave:
180            ShutdownCore();
181            break;
182          case MessageContainer.MessageType.StopAll:
183            DoStopAll();
184            break;
185          case MessageContainer.MessageType.PauseAll:
186            DoPauseAll();
187            break;
188          case MessageContainer.MessageType.AbortAll:
189            DoAbortAll();
190            break;
191          case MessageContainer.MessageType.AbortJob:
192            SlaveStatusInfo.JobsAborted++;
193            KillAppDomain(container.JobId);
194            break;
195          case MessageContainer.MessageType.StopJob:
196            DoStopJob(container.JobId);
197            break;
198          case MessageContainer.MessageType.PauseJob:
199            DoPauseJob(container.JobId);
200            break;
201          case MessageContainer.MessageType.Restart:
202            DoStartSlave();
203            break;
204          case MessageContainer.MessageType.Sleep:
205            Sleep();
206            break;
207          case MessageContainer.MessageType.SayHello:
208            wcfService.Connect(ConfigManager.Instance.GetClientInfo());
209            break;
210        }
211      } else {
212        clientCom.LogMessage("Unknown MessageContainer: " + container);
213      }
214    }
215
216    private void DoPauseJob(Guid jobId) {
217      if (!Jobs.ContainsKey(jobId)) {
218        clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId);
219      } else {
220        Job job = Jobs[jobId];
221
222        if (job != null) {
223          engines[job.Id].Pause();
224          JobData sJob = engines[job.Id].GetPausedJob();
225          job.ExecutionTime = engines[job.Id].ExecutionTime;
226
227          try {
228            if (engines[job.Id].CurrentException != string.Empty) {
229              wcfService.UpdateJobState(job.Id, JobState.Failed, engines[job.Id].CurrentException);
230              SlaveStatusInfo.JobsAborted++;
231            } else {
232              SlaveStatusInfo.JobsProcessed++;
233            }
234            clientCom.LogMessage("Sending the paused job with id: " + job.Id);
235            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
236          }
237          catch (Exception e) {
238            clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
239          }
240          finally {
241            KillAppDomain(job.Id); // kill app-domain in every case         
242          }
243        }
244      }
245    }
246
247    private void DoStopJob(Guid jobId) {
248      if (!Jobs.ContainsKey(jobId)) {
249        clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId);
250      } else {
251        Job job = Jobs[jobId];
252
253        if (job != null) {
254          engines[job.Id].Stop();
255          JobData sJob = engines[job.Id].GetFinishedJob();
256          job.ExecutionTime = engines[job.Id].ExecutionTime;
257
258
259          try {
260            if (engines[job.Id].CurrentException != string.Empty) {
261              wcfService.UpdateJobState(job.Id, JobState.Failed, engines[job.Id].CurrentException);
262            }
263            SlaveStatusInfo.JobsAborted++;
264
265            clientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
266            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
267          }
268          catch (Exception e) {
269            clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
270          }
271          finally {
272            KillAppDomain(job.Id); // kill app-domain in every case         
273          }
274        }
275      }
276    }
277
278    /// <summary>
279    /// aborts all running jobs, no results are sent back
280    /// </summary>
281    private void DoAbortAll() {
282      List<Guid> guids = new List<Guid>();
283      foreach (Guid job in Jobs.Keys) {
284        guids.Add(job);
285      }
286
287      foreach (Guid g in guids) {
288        KillAppDomain(g);
289      }
290
291      clientCom.LogMessage("Aborted all jobs!");
292    }
293
294    /// <summary>
295    /// wait for jobs to finish, then pause client
296    /// </summary>
297    private void DoPauseAll() {
298      clientCom.LogMessage("Pause all received");
299
300      //copy guids because there will be removed items from 'Jobs'
301      List<Guid> guids = new List<Guid>();
302      foreach (Guid job in Jobs.Keys) {
303        guids.Add(job);
304      }
305
306      foreach (Guid g in guids) {
307        DoPauseJob(g);
308      }
309    }
310
311    /// <summary>
312    /// pause slave immediately
313    /// </summary>
314    private void DoStopAll() {
315      clientCom.LogMessage("Stop all received");
316
317      //copy guids because there will be removed items from 'Jobs'
318      List<Guid> guids = new List<Guid>();
319      foreach (Guid job in Jobs.Keys) {
320        guids.Add(job);
321      }
322
323      foreach (Guid g in guids) {
324        DoStopJob(g);
325      }
326    }
327
328    /// <summary>
329    /// completly shudown slave
330    /// </summary>
331    public void Shutdown() {
332      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
333      MessageQueue.GetInstance().AddMessage(mc);
334      waitShutdownSem.WaitOne();
335    }
336
337    /// <summary>
338    /// complete shutdown, should be called before the the application is exited
339    /// </summary>
340    private void ShutdownCore() {
341      clientCom.LogMessage("Shutdown Signal received");
342      clientCom.LogMessage("Stopping heartbeat");
343      heartbeatManager.StopHeartBeat();
344      abortRequested = true;
345      clientCom.LogMessage("Logging out");
346
347
348      lock (engines) {
349        clientCom.LogMessage("engines locked");
350        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
351          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
352          appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
353          AppDomain.Unload(kvp.Value);
354        }
355      }
356      WcfService.Instance.Disconnect();
357      clientCom.Shutdown();
358      SlaveClientCom.Close();
359
360      if (slaveComm.State != CommunicationState.Closed)
361        slaveComm.Close();
362    }
363
364    /// <summary>
365    /// reinitializes everything and continues operation,
366    /// can be called after Sleep()
367    /// </summary> 
368    private void DoStartSlave() {
369      clientCom.LogMessage("Restart received");
370      StartHeartbeats();
371      clientCom.LogMessage("Restart done");
372    }
373
374    /// <summary>
375    /// stop slave, except for client gui communication,
376    /// primarily used by gui if core is running as windows service
377    /// </summary>   
378    private void Sleep() {
379      clientCom.LogMessage("Sleep received");
380      heartbeatManager.StopHeartBeat();
381      heartbeatManager = null;
382      DoStopAll();
383      WcfService.Instance.Disconnect();
384      clientCom.LogMessage("Sleep done");
385    }
386
387    /// <summary>
388    /// Pauses a job, which means sending it to the server and killing it locally;
389    /// atm only used when executor is waiting for child jobs
390    /// </summary>
391    public void PauseWaitJob(JobData data) {
392      if (!Jobs.ContainsKey(data.JobId)) {
393        clientCom.LogMessage("Can't find job with id " + data.JobId);
394      } else {
395        Job job = Jobs[data.JobId];
396        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
397        wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
398      }
399      KillAppDomain(data.JobId);
400    }
401
402    /// <summary>
403    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
404    /// once the connection gets reestablished, the job gets submitted
405    /// </summary>
406    public void SendFinishedJob(Guid jobId) {
407      try {
408        clientCom.LogMessage("Getting the finished job with id: " + jobId);
409        if (!engines.ContainsKey(jobId)) {
410          clientCom.LogMessage("Engine doesn't exist");
411          return;
412        }
413        if (!jobs.ContainsKey(jobId)) {
414          clientCom.LogMessage("Job doesn't exist");
415          return;
416        }
417        Job cJob = jobs[jobId];
418        cJob.ExecutionTime = engines[jobId].ExecutionTime;
419
420        if (engines[jobId].Aborted) {
421          SlaveStatusInfo.JobsAborted++;
422        } else {
423          SlaveStatusInfo.JobsProcessed++;
424        }
425
426        if (engines[jobId].CurrentException != string.Empty) {
427          wcfService.UpdateJobState(jobId, JobState.Failed, engines[jobId].CurrentException);
428        }
429
430        JobData sJob = engines[jobId].GetFinishedJob();
431        try {
432          clientCom.LogMessage("Sending the finished job with id: " + jobId);
433          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
434        }
435        catch (Exception e) {
436          clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
437        }
438        finally {
439          KillAppDomain(jobId);
440          heartbeatManager.AwakeHeartBeatThread();
441        }
442      }
443      catch (Exception e) {
444        OnExceptionOccured(e);
445      }
446    }
447
448    private static object locker = new object();
449
450    /// <summary>
451    /// A new Job from the wcfService has been received and will be started within a AppDomain.
452    /// </summary>   
453    private void StartJobInAppDomain(Job myJob, JobData jobData) {
454      clientCom.LogMessage("Received new job with id " + myJob.Id);
455      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
456
457      lock (locker) {
458        if (engines.ContainsKey(myJob.Id)) {
459          clientCom.LogMessage("Job with key " + myJob.Id + " already exists. Job will be ignored.");
460          return;
461        }
462
463        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
464        bool pluginsPrepared = false;
465        string configFileName = string.Empty;
466
467        try {
468          PluginCache.Instance.PreparePlugins(myJob, out configFileName);
469          clientCom.LogMessage("Plugins fetched for job " + myJob.Id);
470          pluginsPrepared = true;
471        }
472        catch (Exception exception) {
473          clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
474          lock (engines) {
475            if (jobs.ContainsKey(myJob.Id)) {
476              jobs.Remove(myJob.Id);
477            }
478          }
479        }
480
481        if (pluginsPrepared) {
482          try {
483            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
484            appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
485            lock (engines) {
486              appDomains.Add(myJob.Id, appDomain);
487              clientCom.LogMessage("Creating AppDomain");
488              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
489              clientCom.LogMessage("Created AppDomain");
490              engine.JobId = myJob.Id;
491              engine.Core = this;
492              clientCom.LogMessage("Starting Engine for job " + myJob.Id);
493              engines.Add(myJob.Id, engine);
494              engine.Start(jobData.Data);
495            }
496
497          }
498          catch (Exception exception) {
499            clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
500            clientCom.LogMessage("Error thrown is: " + exception.ToString());
501            KillAppDomain(myJob.Id);
502          }
503        }
504      }
505      heartbeatManager.AwakeHeartBeatThread();
506    }
507
508    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
509    private void OnExceptionOccured(Exception e) {
510      clientCom.LogMessage("Error: " + e.ToString());
511      var handler = ExceptionOccured;
512      if (handler != null) handler(this, new EventArgs<Exception>(e));
513    }
514
515    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
516      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
517      KillAppDomain(new Guid(e.ExceptionObject.ToString()));
518    }
519
520    /// <summary>
521    /// Enqueues messages from the executor to the message queue.
522    /// This is necessary if the core thread has to execute certain actions, e.g.
523    /// killing of an app domain.
524    /// </summary>   
525    /// <returns>true if the calling method can continue execution, else false</returns>
526    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
527      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
528      container.Callback = action;
529      container.CallbackParameter = parameter;
530      MessageQueue.GetInstance().AddMessage(container);
531    }
532
533    /// <summary>
534    /// Kill a appdomain with a specific id.
535    /// </summary>
536    /// <param name="id">the GUID of the job</param>   
537    public void KillAppDomain(Guid id) {
538      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
539        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
540        return;
541      }
542
543      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
544      lock (engines) {
545        try {
546          if (engines.ContainsKey(id)) {
547            engines[id].Dispose();
548            engines.Remove(id);
549          }
550
551          if (appDomains.ContainsKey(id)) {
552            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
553
554            int repeat = 5;
555            while (repeat > 0) {
556              try {
557                AppDomain.Unload(appDomains[id]);
558                repeat = 0;
559              }
560              catch (CannotUnloadAppDomainException) {
561                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
562                Thread.Sleep(1000);
563                repeat--;
564                if (repeat == 0) {
565                  clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
566                  throw; // rethrow and let app crash
567                }
568              }
569            }
570            appDomains.Remove(id);
571          }
572
573          jobs.Remove(id);
574          PluginCache.Instance.DeletePluginsForJob(id);
575          GC.Collect();
576        }
577        catch (Exception ex) {
578          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
579        }
580      }
581      GC.Collect();
582      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
583    }
584  }
585}
Note: See TracBrowser for help on using the repository browser.