Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6175

Last change on this file since 6175 was 6175, checked in by ascheibe, 13 years ago

#1233 temporary switch to privileged sandboxing until communication between core and executor works with sandboxing

File size: 22.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.IO;
26using System.Linq;
27using System.ServiceModel;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
31using HeuristicLab.Common;
32using HeuristicLab.Core;
33
34
35namespace HeuristicLab.Clients.Hive.SlaveCore {
36  /// <summary>
37  /// The core component of the Hive Slave.
38  /// Handles commands sent from the Hive Server.
39  /// </summary>
40  public class Core : MarshalByRefObject {
41    public EventLog ServiceEventLog { get; set; }
42
43    public static bool abortRequested { get; set; }
44    private Semaphore waitShutdownSem = new Semaphore(0, 1);
45    public static ILog Log { get; set; }
46
47    private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>();
48    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
49
50    private WcfService wcfService;
51    private HeartbeatManager heartbeatManager;
52    private int coreThreadId;
53
54    private ISlaveCommunication clientCom;
55    private ServiceHost slaveComm;
56
57    public Dictionary<Guid, Executor> Executors {
58      get { return executors; }
59    }
60
61    public Core() { }
62
63    /// <summary>
64    /// Main Method for the client
65    /// </summary>
66    public void Start() {
67      coreThreadId = Thread.CurrentThread.ManagedThreadId;
68      abortRequested = false;
69
70      try {
71        ConfigManager manager = ConfigManager.Instance;
72        manager.Core = this;
73
74        //start the client communication service (pipe between slave and slave gui)
75        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
76        slaveComm.Open();
77        clientCom = SlaveClientCom.Instance.ClientCom;
78
79        // delete all left over job directories
80        PluginCache.Instance.CleanPluginTemp();
81        clientCom.LogMessage("Hive Slave started");
82
83        wcfService = WcfService.Instance;
84        RegisterServiceEvents();
85
86        StartHeartbeats(); // Start heartbeats thread       
87        DispatchMessageQueue(); // dispatch messages until abortRequested
88      }
89      catch (Exception ex) {
90        if (ServiceEventLog != null) {
91          try {
92            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
93          }
94          catch (Exception) { }
95        } else {
96          //try to log with clientCom. if this works the user sees at least a message,
97          //else an exception will be thrown anyways.
98          clientCom.LogMessage("Uncaught exception: " + ex.ToString() +
99            Environment.NewLine + "Core is going to shutdown.");
100        }
101        ShutdownCore();
102      }
103      finally {
104        DeRegisterServiceEvents();
105        waitShutdownSem.Release();
106      }
107    }
108
109    private void StartHeartbeats() {
110      //Initialize the heartbeat     
111      if (heartbeatManager == null) {
112        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
113        heartbeatManager.StartHeartbeat();
114      }
115    }
116
117    private void DispatchMessageQueue() {
118      MessageQueue queue = MessageQueue.GetInstance();
119      while (!abortRequested) {
120        MessageContainer container = queue.GetMessage();
121        DetermineAction(container);
122        clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
123      }
124    }
125
126    private void RegisterServiceEvents() {
127      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
128      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
129    }
130
131    private void DeRegisterServiceEvents() {
132      WcfService.Instance.Connected -= WcfService_Connected;
133      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
134    }
135
136    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
137      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
138    }
139
140    void WcfService_Connected(object sender, EventArgs e) {
141      clientCom.LogMessage("Connected successfully to Hive server");
142    }
143
144    /// <summary>
145    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
146    /// </summary>
147    /// <param name="container">The container, containing the message</param>
148    private void DetermineAction(MessageContainer container) {
149      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);
150
151      if (container is ExecutorMessageContainer<Guid>) {
152        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
153        c.execute();
154      } else if (container is MessageContainer) {
155        switch (container.Message) {
156          case MessageContainer.MessageType.CalculateJob:
157            Task.Factory.StartNew((jobIdObj) => {
158              Guid jobId = (Guid)jobIdObj;
159              Job job = wcfService.GetJob(jobId);
160              if (job == null) throw new JobNotFoundException(jobId);
161              JobData jobData = wcfService.GetJobData(job.Id);
162              if (jobData == null) throw new JobDataNotFoundException(jobId);
163              SlaveStatusInfo.JobsFetched++;
164              job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
165              if (job == null) throw new JobNotFoundException(jobId);
166              StartJobInAppDomain(job, jobData);
167            }, container.JobId)
168            .ContinueWith((t) => {
169              // handle exception of task
170              clientCom.LogMessage(t.Exception.ToString());
171              wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString());
172              SlaveStatusInfo.JobsAborted++;
173            }, TaskContinuationOptions.OnlyOnFaulted);
174            break;
175          case MessageContainer.MessageType.ShutdownSlave:
176            ShutdownCore();
177            break;
178          case MessageContainer.MessageType.StopAll:
179            DoStopAll();
180            break;
181          case MessageContainer.MessageType.PauseAll:
182            DoPauseAll();
183            break;
184          case MessageContainer.MessageType.AbortAll:
185            DoAbortAll();
186            break;
187          case MessageContainer.MessageType.AbortJob:
188            SlaveStatusInfo.JobsAborted++;
189            KillAppDomain(container.JobId);
190            break;
191          case MessageContainer.MessageType.StopJob:
192            DoStopJob(container.JobId);
193            break;
194          case MessageContainer.MessageType.PauseJob:
195            DoPauseJob(container.JobId);
196            break;
197          case MessageContainer.MessageType.Restart:
198            DoStartSlave();
199            break;
200          case MessageContainer.MessageType.Sleep:
201            Sleep();
202            break;
203          case MessageContainer.MessageType.SayHello:
204            wcfService.Connect(ConfigManager.Instance.GetClientInfo());
205            break;
206        }
207      } else {
208        clientCom.LogMessage("Unknown MessageContainer: " + container);
209      }
210    }
211
212    private void DoPauseJob(Guid jobId) {
213      if (!executors.ContainsKey(jobId)) {
214        clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId);
215      } else {
216        Job job = wcfService.GetJob(jobId);
217
218        if (job != null && executors.ContainsKey(job.Id)) {
219          executors[job.Id].Pause();
220          JobData sJob = executors[job.Id].GetPausedJob();
221          job.ExecutionTime = executors[job.Id].ExecutionTime;
222
223          try {
224            if (executors[job.Id].CurrentException != string.Empty) {
225              wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
226              SlaveStatusInfo.JobsAborted++;
227            } else {
228              SlaveStatusInfo.JobsProcessed++;
229            }
230            clientCom.LogMessage("Sending the paused job with id: " + job.Id);
231            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
232          }
233          catch (Exception e) {
234            clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
235          }
236          finally {
237            KillAppDomain(job.Id); // kill app-domain in every case         
238          }
239        }
240      }
241    }
242
243    private void DoStopJob(Guid jobId) {
244      if (!executors.ContainsKey(jobId)) {
245        clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId);
246      } else {
247        Job job = wcfService.GetJob(jobId);
248
249        if (job != null) {
250          executors[job.Id].Stop();
251          JobData sJob = executors[job.Id].GetFinishedJob();
252          job.ExecutionTime = executors[job.Id].ExecutionTime;
253
254          try {
255            if (executors[job.Id].CurrentException != string.Empty) {
256              wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
257            }
258            SlaveStatusInfo.JobsAborted++;
259
260            clientCom.LogMessage("Sending the stopped job with id: " + job.Id);
261            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
262          }
263          catch (Exception e) {
264            clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
265          }
266          finally {
267            KillAppDomain(job.Id); // kill app-domain in every case         
268          }
269        }
270      }
271    }
272
273    /// <summary>
274    /// aborts all running jobs, no results are sent back
275    /// </summary>
276    private void DoAbortAll() {
277      List<Guid> jobIds;
278      lock (executors) {
279        jobIds = new List<Guid>(executors.Keys);
280      }
281      foreach (Guid jobId in jobIds) {
282        KillAppDomain(jobId);
283      }
284      clientCom.LogMessage("Aborted all jobs!");
285    }
286
287    /// <summary>
288    /// wait for jobs to finish, then pause client
289    /// </summary>
290    private void DoPauseAll() {
291      clientCom.LogMessage("Pause all received");
292
293      //copy guids because there will be removed items from 'Jobs'
294      List<Guid> jobIds;
295      lock (executors) {
296        jobIds = new List<Guid>(Executors.Keys);
297      }
298
299      foreach (Guid jobId in jobIds) {
300        DoPauseJob(jobId);
301      }
302    }
303
304    /// <summary>
305    /// pause slave immediately
306    /// </summary>
307    private void DoStopAll() {
308      clientCom.LogMessage("Stop all received");
309
310      //copy guids because there will be removed items from 'Jobs'
311      List<Guid> jobIds;
312      lock (executors) {
313        jobIds = new List<Guid>(executors.Keys);
314      }
315
316      foreach (Guid jobId in jobIds) {
317        DoStopJob(jobId);
318      }
319    }
320
321    /// <summary>
322    /// completly shudown slave
323    /// </summary>
324    public void Shutdown() {
325      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
326      MessageQueue.GetInstance().AddMessage(mc);
327      waitShutdownSem.WaitOne();
328    }
329
330    /// <summary>
331    /// complete shutdown, should be called before the the application is exited
332    /// </summary>
333    private void ShutdownCore() {
334      clientCom.LogMessage("Shutdown Signal received");
335      clientCom.LogMessage("Stopping heartbeat");
336      heartbeatManager.StopHeartBeat();
337      abortRequested = true;
338      clientCom.LogMessage("Logging out");
339
340
341      lock (executors) {
342        clientCom.LogMessage("executors locked");
343        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
344          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
345          appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
346          AppDomain.Unload(kvp.Value);
347        }
348      }
349      WcfService.Instance.Disconnect();
350      clientCom.Shutdown();
351      SlaveClientCom.Close();
352
353      if (slaveComm.State != CommunicationState.Closed)
354        slaveComm.Close();
355    }
356
357    /// <summary>
358    /// reinitializes everything and continues operation,
359    /// can be called after Sleep()
360    /// </summary> 
361    private void DoStartSlave() {
362      clientCom.LogMessage("Restart received");
363      StartHeartbeats();
364      clientCom.LogMessage("Restart done");
365    }
366
367    /// <summary>
368    /// stop slave, except for client gui communication,
369    /// primarily used by gui if core is running as windows service
370    /// </summary>   
371    private void Sleep() {
372      clientCom.LogMessage("Sleep received");
373      heartbeatManager.StopHeartBeat();
374      heartbeatManager = null;
375      DoStopAll();
376      WcfService.Instance.Disconnect();
377      clientCom.LogMessage("Sleep done");
378    }
379
380    /// <summary>
381    /// Pauses a job, which means sending it to the server and killing it locally;
382    /// atm only used when executor is waiting for child jobs
383    /// </summary>
384    public void PauseWaitJob(JobData data) {
385      if (!Executors.ContainsKey(data.JobId)) {
386        clientCom.LogMessage("Can't find job with id " + data.JobId);
387      } else {
388        Job job = wcfService.GetJob(data.JobId);
389        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
390        wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
391      }
392      KillAppDomain(data.JobId);
393    }
394
395    /// <summary>
396    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
397    /// once the connection gets reestablished, the job gets submitted
398    /// </summary>
399    public void SendFinishedJob(Guid jobId) {
400      try {
401        clientCom.LogMessage("Getting the finished job with id: " + jobId);
402        if (!executors.ContainsKey(jobId)) {
403          clientCom.LogMessage("Executor doesn't exist");
404          return;
405        }
406        if (!executors.ContainsKey(jobId)) {
407          clientCom.LogMessage("Job doesn't exist");
408          return;
409        }
410        Job job = wcfService.GetJob(jobId);
411        job.ExecutionTime = executors[jobId].ExecutionTime;
412
413        if (executors[jobId].Aborted) {
414          SlaveStatusInfo.JobsAborted++;
415        } else {
416          SlaveStatusInfo.JobsProcessed++;
417        }
418
419        if (executors[jobId].CurrentException != string.Empty) {
420          wcfService.UpdateJobState(jobId, JobState.Failed, executors[jobId].CurrentException);
421        }
422
423        JobData sJob = executors[jobId].GetFinishedJob();
424        try {
425          clientCom.LogMessage("Sending the finished job with id: " + jobId);
426          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
427        }
428        catch (Exception e) {
429          clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
430        }
431        finally {
432          KillAppDomain(jobId);
433          heartbeatManager.AwakeHeartBeatThread();
434        }
435      }
436      catch (Exception e) {
437        OnExceptionOccured(e);
438      }
439    }
440
441    private static object startInAppDomainLocker = new object();
442
443    /// <summary>
444    /// A new Job from the wcfService has been received and will be started within a AppDomain.
445    /// </summary>   
446    private void StartJobInAppDomain(Job job, JobData jobData) {
447      clientCom.LogMessage("Received new job with id " + job.Id);
448      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
449
450      lock (startInAppDomainLocker) {
451        if (executors.ContainsKey(job.Id)) {
452          clientCom.LogMessage("Job with key " + job.Id + " already exists. Job will be ignored.");
453          return;
454        }
455
456        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());
457        bool pluginsPrepared = false;
458        string configFileName = string.Empty;
459
460        try {
461          PluginCache.Instance.PreparePlugins(job, out configFileName);
462          clientCom.LogMessage("Plugins fetched for job " + job.Id);
463          pluginsPrepared = true;
464        }
465        catch (Exception exception) {
466          clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));
467          wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
468          SlaveStatusInfo.JobsAborted++;
469        }
470
471        if (pluginsPrepared) {
472          try {
473            //TODO: switch back to unprivileged sandbox
474            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
475            appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
476            Executor executor;
477            appDomains.Add(job.Id, appDomain);
478            clientCom.LogMessage("Creating AppDomain");
479            executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
480            clientCom.LogMessage("Created AppDomain");
481            executor.Core = this;
482            executor.JobId = job.Id;
483            executor.CoresNeeded = job.CoresNeeded;
484            executor.MemoryNeeded = job.MemoryNeeded;
485            clientCom.LogMessage("Starting Executor for job " + job.Id);
486            executor.Start(jobData.Data);
487            lock (executors) {
488              executors.Add(job.Id, executor);
489            }
490          }
491          catch (Exception exception) {
492            clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id);
493            clientCom.LogMessage("Error thrown is: " + exception.ToString());
494
495            if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) {
496              wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
497            } else {
498              wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
499            }
500            SlaveStatusInfo.JobsAborted++;
501
502            KillAppDomain(job.Id);
503          }
504        }
505      }
506      heartbeatManager.AwakeHeartBeatThread();
507    }
508
509    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
510    private void OnExceptionOccured(Exception e) {
511      clientCom.LogMessage("Error: " + e.ToString());
512      var handler = ExceptionOccured;
513      if (handler != null) handler(this, new EventArgs<Exception>(e));
514    }
515
516    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
517      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
518      KillAppDomain(new Guid(e.ExceptionObject.ToString()));
519    }
520
521    /// <summary>
522    /// Enqueues messages from the executor to the message queue.
523    /// This is necessary if the core thread has to execute certain actions, e.g.
524    /// killing of an app domain.
525    /// </summary>   
526    /// <returns>true if the calling method can continue execution, else false</returns>
527    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
528      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
529      container.Callback = action;
530      container.CallbackParameter = parameter;
531      MessageQueue.GetInstance().AddMessage(container);
532    }
533
534    /// <summary>
535    /// Kill a appdomain with a specific id.
536    /// </summary>
537    /// <param name="id">the GUID of the job</param>   
538    public void KillAppDomain(Guid id) {
539      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
540        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
541        return;
542      }
543
544      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
545      lock (executors) {
546        try {
547          if (executors.ContainsKey(id)) {
548            executors[id].Dispose();
549            executors.Remove(id);
550          }
551
552          if (appDomains.ContainsKey(id)) {
553            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
554
555            int repeat = 5;
556            while (repeat > 0) {
557              try {
558                AppDomain.Unload(appDomains[id]);
559                repeat = 0;
560              }
561              catch (CannotUnloadAppDomainException) {
562                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
563                Thread.Sleep(1000);
564                repeat--;
565                if (repeat == 0) {
566                  clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
567                  throw; // rethrow and let app crash
568                }
569              }
570            }
571            appDomains.Remove(id);
572          }
573
574          PluginCache.Instance.DeletePluginsForJob(id);
575          GC.Collect();
576        }
577        catch (Exception ex) {
578          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
579        }
580      }
581      GC.Collect();
582      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
583    }
584
585    public override object InitializeLifetimeService() {
586      return null; // avoid destruction of proxy object after 5 minutes
587    }
588
589    public int GetCoresNeeded() {
590      lock (executors) {
591        return executors.Sum(x => x.Value.CoresNeeded);
592      }
593    }
594  }
595}
Note: See TracBrowser for help on using the repository browser.