Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6101

Last change on this file since 6101 was 6101, checked in by ascheibe, 13 years ago

#1233

  • don't lock engines for so long in StartJobInAppDomain
  • move SlaveCommListener to ConsoleClient
  • delete orphaned job folders at startup
File size: 21.4 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.ServiceModel;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
using HeuristicLab.Common;
using HeuristicLab.Core;

namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server.
  /// </summary>
  /// <remarks>
  /// <see cref="Start"/> records the calling thread as the "core thread" and dispatches
  /// queued messages on it. <see cref="KillAppDomain"/> re-enqueues itself when it is called
  /// from any other thread, so app domains are only torn down on the core thread.
  /// The <c>engines</c> dictionary is also used as the monitor that guards
  /// <c>engines</c>, <c>appDomains</c> and <c>jobs</c>.
  /// </remarks>
  public class Core : MarshalByRefObject {
    /// <summary>Optional event log; if set, startup/dispatch failures in <see cref="Start"/> are written to it.</summary>
    public EventLog ServiceEventLog { get; set; }

    /// <summary>Set to true by the shutdown handling; ends the dispatch loop in <see cref="Start"/>.</summary>
    public static bool abortRequested { get; set; }
    // released at the end of Start(); Shutdown() blocks on it
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // all three dictionaries are keyed by job id and guarded by lock (engines)
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    // null while the slave is asleep (see Sleep()); always access through the heartbeat helpers
    private HeartbeatManager heartbeatManager;
    private int coreThreadId;

    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;

    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() { }

    /// <summary>
    /// Main method of the slave core: opens the communication service to the slave GUI,
    /// cleans up left-over job directories, starts the heartbeat and then dispatches
    /// messages on the calling thread until <see cref="abortRequested"/> is set.
    /// </summary>
    /// <remarks>
    /// Always unsubscribes the service events and releases the shutdown semaphore on exit,
    /// which unblocks a pending <see cref="Shutdown"/> call.
    /// </remarks>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over job directories
        PluginCache.Instance.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          }
          catch (Exception) { } // best effort: a failing event log must not mask the shutdown
        } else {
          //try to log with clientCom. if this works the user sees at least a message,
          //else an exception will be thrown anyways.
          clientCom.LogMessage("Error on startup: " + ex.ToString() +
            Environment.NewLine + "Core is going to shutdown.");
        }
      }
      finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat manager (10 second interval) unless one already exists.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Stops the heartbeat if a heartbeat manager exists; does nothing while the slave is asleep.
    /// </summary>
    private void StopHeartbeats() {
      HeartbeatManager manager = heartbeatManager; // local copy: the field may be reset concurrently
      if (manager != null) {
        manager.StopHeartBeat();
      }
    }

    /// <summary>
    /// Wakes the heartbeat thread if a heartbeat manager exists; does nothing while the slave is asleep.
    /// </summary>
    private void AwakeHeartbeats() {
      HeartbeatManager manager = heartbeatManager; // local copy: the field may be reset concurrently
      if (manager != null) {
        manager.AwakeHeartBeatThread();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them one by one until
    /// <see cref="abortRequested"/> is set. After each message the GUI status is refreshed.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      // callbacks enqueued through EnqueueExecutorMessage are simply executed on this thread
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          // fetching the job talks to the server, so it runs on a task instead of the core thread
          Task.Factory.StartNew((jobIdObj) => FetchAndStartJob((Guid)jobIdObj), container.JobId)
          .ContinueWith((t) => {
            // handle exception of task
            clientCom.LogMessage(t.Exception.ToString());
          }, TaskContinuationOptions.OnlyOnFaulted);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          SlaveStatusInfo.JobsAborted++;
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
      }
    }

    /// <summary>
    /// Fetches a job and its data from the server, marks it as calculating and starts it
    /// in its own app domain. Runs on a task thread.
    /// </summary>
    /// <param name="jobId">id of the job to calculate</param>
    /// <exception cref="JobNotFoundException">the server returned no job for the id</exception>
    /// <exception cref="JobDataNotFoundException">the server returned no job data for the id</exception>
    private void FetchAndStartJob(Guid jobId) {
      Guid? registeredId = null; // set only if THIS call added the entry to 'jobs'
      Job job = null;
      JobData jobData = null;

      try {
        job = wcfService.GetJob(jobId);
        if (job == null) throw new JobNotFoundException(jobId);
        lock (engines) {
          if (!jobs.ContainsKey(job.Id)) {
            jobs.Add(job.Id, job);
            registeredId = job.Id;
          }
        }
        jobData = wcfService.GetJobData(job.Id);
        if (jobData == null) throw new JobDataNotFoundException(jobId);
        SlaveStatusInfo.JobsFetched++;
        job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
        if (job == null) throw new JobNotFoundException(jobId);
      }
      catch (Exception) {
        // the job never got an engine; do not leave a stale entry behind
        if (registeredId.HasValue) {
          lock (engines) {
            jobs.Remove(registeredId.Value);
          }
        }
        throw; // the task continuation logs the failure
      }

      StartJobInAppDomain(job, jobData);
    }

    /// <summary>
    /// Looks up the job and its executor for a pause/stop request.
    /// </summary>
    /// <param name="jobId">id of the requested job</param>
    /// <param name="caller">name of the calling operation, used as prefix of log messages</param>
    /// <param name="job">the registered job, if any</param>
    /// <param name="engine">the executor of the job, if any</param>
    /// <returns>true if both job and executor exist and the caller can proceed</returns>
    private bool TryGetJobAndEngine(Guid jobId, string caller, out Job job, out Executor engine) {
      bool jobKnown;
      engine = null;
      lock (engines) {
        jobKnown = jobs.TryGetValue(jobId, out job);
        if (jobKnown && job != null) {
          engines.TryGetValue(job.Id, out engine);
        }
      }

      if (!jobKnown) {
        clientCom.LogMessage(caller + ": Can't find job with id " + jobId);
        return false;
      }
      if (job == null) {
        return false;
      }
      if (engine == null) {
        // The job is registered before its executor exists. There is nothing to send back,
        // so only clean up locally instead of failing with a KeyNotFoundException.
        clientCom.LogMessage(caller + ": No engine exists for job with id " + jobId + ", nothing is sent to the server");
        KillAppDomain(jobId);
        return false;
      }
      return true;
    }

    /// <summary>
    /// Returns a copy of the ids of all registered jobs. A copy is required because
    /// handling a job removes it from 'Jobs'; it is taken under the lock because
    /// task threads register jobs concurrently.
    /// </summary>
    private List<Guid> GetJobIdSnapshot() {
      lock (engines) {
        return jobs.Keys.ToList();
      }
    }

    /// <summary>
    /// Pauses the executor of a job, sends the paused job to the server and kills its app domain.
    /// </summary>
    /// <param name="jobId">id of the job to pause</param>
    private void DoPauseJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(jobId, "DoPauseJob", out job, out engine)) {
        return;
      }

      engine.Pause();
      JobData sJob = engine.GetPausedJob();
      job.ExecutionTime = engine.ExecutionTime;

      try {
        string currentException = engine.CurrentException;
        if (currentException != string.Empty) {
          wcfService.UpdateJobState(job.Id, JobState.Failed, currentException);
          SlaveStatusInfo.JobsAborted++;
        } else {
          SlaveStatusInfo.JobsProcessed++;
        }
        clientCom.LogMessage("Sending the paused job with id: " + job.Id);
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
      }
      catch (Exception e) {
        clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Stops the executor of a job, sends the result to the server with state Aborted
    /// and kills its app domain.
    /// </summary>
    /// <param name="jobId">id of the job to stop</param>
    private void DoStopJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(jobId, "DoStopJob", out job, out engine)) {
        return;
      }

      engine.Stop();
      JobData sJob = engine.GetFinishedJob();
      job.ExecutionTime = engine.ExecutionTime;

      try {
        string currentException = engine.CurrentException;
        if (currentException != string.Empty) {
          wcfService.UpdateJobState(job.Id, JobState.Failed, currentException);
        }
        SlaveStatusInfo.JobsAborted++;

        clientCom.LogMessage("Sending the stoppped job with id: " + job.Id);
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
      }
      catch (Exception e) {
        clientCom.LogMessage("Transmitting to server failed. Storing the stopped job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      foreach (Guid g in GetJobIdSnapshot()) {
        KillAppDomain(g);
      }

      clientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every registered job (see DoPauseJob)
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");

      foreach (Guid g in GetJobIdSnapshot()) {
        DoPauseJob(g);
      }
    }

    /// <summary>
    /// stops every registered job (see DoStopJob)
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");

      foreach (Guid g in GetJobIdSnapshot()) {
        DoStopJob(g);
      }
    }

    /// <summary>
    /// Completely shuts down the slave: enqueues a shutdown message and blocks
    /// until <see cref="Start"/> has left its dispatch loop.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the the application is exited
    /// </summary>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown Signal received");
      clientCom.LogMessage("Stopping heartbeat");
      StopHeartbeats(); // the manager is null if the slave was put to sleep before
      abortRequested = true;
      clientCom.LogMessage("Logging out");

      lock (engines) {
        clientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          AppDomain.Unload(kvp.Value);
        }
      }
      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      StartHeartbeats();
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received");
      StopHeartbeats(); // safe to call repeatedly, e.g. for two Sleep messages in a row
      heartbeatManager = null; // lets StartHeartbeats() create a fresh manager on restart
      DoStopAll();
      WcfService.Instance.Disconnect();
      clientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">the serialized job; its JobId identifies the job to pause</param>
    public void PauseWaitJob(JobData data) {
      Job job;
      bool jobKnown;
      lock (engines) {
        jobKnown = jobs.TryGetValue(data.JobId, out job);
      }

      if (!jobKnown) {
        clientCom.LogMessage("Can't find job with id " + data.JobId);
      } else {
        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Gets the finished job from its executor and submits it to the server.
    /// The app domain of the job is killed afterwards, whether or not the transmission succeeded.
    /// </summary>
    /// <remarks>
    /// A failed transmission is only logged here. Any other exception is reported through
    /// <see cref="ExceptionOccured"/> instead of being thrown.
    /// </remarks>
    /// <param name="jobId">id of the finished job</param>
    public void SendFinishedJob(Guid jobId) {
      try {
        clientCom.LogMessage("Getting the finished job with id: " + jobId);

        Executor engine;
        Job cJob;
        bool engineKnown;
        bool jobKnown;
        lock (engines) {
          engineKnown = engines.TryGetValue(jobId, out engine);
          jobKnown = jobs.TryGetValue(jobId, out cJob);
        }
        if (!engineKnown) {
          clientCom.LogMessage("Engine doesn't exist");
          return;
        }
        if (!jobKnown) {
          clientCom.LogMessage("Job doesn't exist");
          return;
        }

        cJob.ExecutionTime = engine.ExecutionTime;

        if (engine.Aborted) {
          SlaveStatusInfo.JobsAborted++;
        } else {
          SlaveStatusInfo.JobsProcessed++;
        }

        string currentException = engine.CurrentException;
        if (currentException != string.Empty) {
          wcfService.UpdateJobState(jobId, JobState.Failed, currentException);
        }

        JobData sJob = engine.GetFinishedJob();
        try {
          clientCom.LogMessage("Sending the finished job with id: " + jobId);
          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
        }
        catch (Exception e) {
          clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jobId);
          AwakeHeartbeats(); // no-op while the slave is asleep
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    // static: job start-up is serialized across all Core instances of the process
    private static readonly object locker = new object();

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// </summary>
    /// <remarks>
    /// Failures while preparing plugins or creating the sandbox are logged and cleaned up here;
    /// they are not rethrown. A job whose engine already exists is ignored.
    /// </remarks>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the serialized job data handed to the executor</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      clientCom.LogMessage("Received new job with id " + myJob.Id);
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());

      lock (locker) {
        if (engines.ContainsKey(myJob.Id)) {
          clientCom.LogMessage("Job with key " + myJob.Id + " already exists. Job will be ignored.");
          return;
        }

        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
        bool pluginsPrepared = false;
        string configFileName = string.Empty;

        try {
          PluginCache.Instance.PreparePlugins(myJob, out configFileName);
          clientCom.LogMessage("Plugins fetched for job " + myJob.Id);
          pluginsPrepared = true;
        }
        catch (Exception exception) {
          clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
          lock (engines) {
            jobs.Remove(myJob.Id);
          }
        }

        if (pluginsPrepared) {
          try {
            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
            appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            Executor engine;
            // engines is only locked for the registration, not while the job is started
            lock (engines) {
              appDomains.Add(myJob.Id, appDomain);
              clientCom.LogMessage("Creating AppDomain");
              engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              clientCom.LogMessage("Created AppDomain");
              engine.JobId = myJob.Id;
              engine.Core = this;
              clientCom.LogMessage("Starting Engine for job " + myJob.Id);
              engines.Add(myJob.Id, engine);
            }
            engine.Start(jobData.Data);
          }
          catch (Exception exception) {
            clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
            clientCom.LogMessage("Error thrown is: " + exception.ToString());
            KillAppDomain(myJob.Id);
          }
        }
      }
      AwakeHeartbeats(); // no-op while the slave is asleep
    }

    /// <summary>Raised by <see cref="SendFinishedJob"/> when sending a finished job fails unexpectedly.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      clientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception of a job app domain: logs it and kills the
    /// app domain of the job that belongs to the sending domain.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

      // The exception text is not a job id; the job is identified through the sending app domain.
      // NOTE(review): sender is expected to be the job's AppDomain - confirm for the sandbox setup.
      Guid? jobId = FindJobIdForAppDomain(sender as AppDomain);
      if (jobId.HasValue) {
        KillAppDomain(jobId.Value);
      } else {
        clientCom.LogMessage("Could not determine the job of the failed AppDomain, no AppDomain is shut down");
      }
    }

    /// <summary>
    /// Finds the id of the job that was started in the given app domain.
    /// </summary>
    /// <param name="domain">the app domain to look up, may be null</param>
    /// <returns>the job id, or null if the domain is unknown or already unloaded</returns>
    private Guid? FindJobIdForAppDomain(AppDomain domain) {
      if (domain == null) {
        return null;
      }

      try {
        int domainId = domain.Id;
        List<KeyValuePair<Guid, AppDomain>> registered;
        lock (engines) {
          registered = appDomains.ToList(); // copy, so the lock is not held during cross-domain calls
        }
        return registered
          .Where(kvp => kvp.Value.Id == domainId)
          .Select(kvp => (Guid?)kvp.Key)
          .FirstOrDefault();
      }
      catch (AppDomainUnloadedException) {
        // a domain that is already gone needs no further clean up from here
        return null;
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <param name="action">the callback to execute on the core thread</param>
    /// <param name="parameter">the argument passed to the callback</param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kill a appdomain with a specific id.
    /// </summary>
    /// <remarks>
    /// When called from a thread other than the core thread, the call is only enqueued and
    /// executed later by the core thread. Exceptions during the clean up are logged, not thrown.
    /// </remarks>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

            int repeat = 5;
            while (repeat > 0) {
              try {
                AppDomain.Unload(appDomain);
                repeat = 0;
              }
              catch (CannotUnloadAppDomainException) {
                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
                repeat--;
                if (repeat == 0) {
                  clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
                  // rethrown to the outer catch below, which logs it; the remaining
                  // clean up for this job is skipped in that case
                  throw;
                }
              }
            }
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
          GC.Collect();
        }
        catch (Exception ex) {
          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    }
  }
}
Note: See TracBrowser for help on using the repository browser.