Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6202

Last change on this file since 6202 was 6202, checked in by cneumuel, 13 years ago

#1233

  • stability fixes for slave
File size: 23.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.IO;
26using System.Linq;
27using System.ServiceModel;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
31using HeuristicLab.Common;
32using HeuristicLab.Core;
33
34
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server.
  /// </summary>
  /// <remarks>
  /// Messages are processed on the thread that calls <see cref="Start"/> (the "core thread").
  /// <see cref="KillAppDomain"/> only does its work on that thread; calls from other threads
  /// are re-enqueued to the message queue.
  /// </remarks>
  public class Core : MarshalByRefObject {
    public EventLog ServiceEventLog { get; set; }

    public static bool abortRequested { get; set; }
    // released by Start() when it returns; Shutdown() blocks on it
    private Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // the following three dictionaries are keyed by job id; they are mutated under lock (executors)
    private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();

    // signalizes if the Executor.Start method has properly finished. only then the appdomain may be unloaded
    private Dictionary<Guid, Semaphore> semaphores = new Dictionary<Guid, Semaphore>();

    private WcfService wcfService;
    // null while the slave is sleeping (see Sleep) and before StartHeartbeats has run
    private HeartbeatManager heartbeatManager;
    // managed thread id of the thread running Start(); see KillAppDomain
    private int coreThreadId;

    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;

    public Dictionary<Guid, Executor> Executors {
      get { return executors; }
    }

    public Core() { }

    /// <summary>
    /// Main Method for the client.
    /// Opens the slave/GUI communication service, starts the heartbeats and then dispatches
    /// messages until <see cref="abortRequested"/> is set. The calling thread becomes the core thread.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over job directories
        PluginCache.Instance.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          }
          catch (Exception) { }
        } else {
          //try to log with clientCom. if this works the user sees at least a message,
          //else an exception will be thrown anyways.
          clientCom.LogMessage("Uncaught exception: " + ex.ToString() +
            Environment.NewLine + "Core is going to shutdown.");
        }
        ShutdownCore();
      }
      finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat manager (10 second interval) unless it is already running.
    /// </summary>
    private void StartHeartbeats() {
      //Initialize the heartbeat
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until <see cref="abortRequested"/> is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        // after a ShutdownSlave message the client communication has already been shut down
        if (!abortRequested) {
          clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      if (container is ExecutorMessageContainer<Guid>) {
        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
        c.execute();
      } else if (container is MessageContainer) {
        switch (container.Message) {
          case MessageContainer.MessageType.CalculateJob:
            // fetching and starting the job may take long, so it runs on a worker task
            Task.Factory.StartNew((jobIdObj) => {
              Guid jobId = (Guid)jobIdObj;
              Job job = wcfService.GetJob(jobId);
              if (job == null) throw new JobNotFoundException(jobId);
              JobData jobData = wcfService.GetJobData(job.Id);
              if (jobData == null) throw new JobDataNotFoundException(jobId);
              SlaveStatusInfo.JobsFetched++;
              job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
              if (job == null) throw new JobNotFoundException(jobId);
              StartJobInAppDomain(job, jobData);
            }, container.JobId)
            .ContinueWith((t) => {
              // handle exception of task. The continuation itself must not fault: on .NET 4.0 an
              // unobserved faulted task is rethrown by the finalizer and terminates the process.
              try {
                clientCom.LogMessage(t.Exception.ToString());
                wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString());
                SlaveStatusInfo.JobsAborted++;
              }
              catch (Exception ex) {
                try {
                  clientCom.LogMessage("Could not report the failure of job " + container.JobId + ": " + ex.ToString());
                }
                catch (Exception) { } // even logging failed; nothing left to do here
              }
            }, TaskContinuationOptions.OnlyOnFaulted);
            break;
          case MessageContainer.MessageType.ShutdownSlave:
            ShutdownCore();
            break;
          case MessageContainer.MessageType.StopAll:
            DoStopAll();
            break;
          case MessageContainer.MessageType.PauseAll:
            DoPauseAll();
            break;
          case MessageContainer.MessageType.AbortAll:
            DoAbortAll();
            break;
          case MessageContainer.MessageType.AbortJob:
            SlaveStatusInfo.JobsAborted++;
            KillAppDomain(container.JobId);
            break;
          case MessageContainer.MessageType.StopJob:
            DoStopJob(container.JobId);
            break;
          case MessageContainer.MessageType.PauseJob:
            DoPauseJob(container.JobId);
            break;
          case MessageContainer.MessageType.Restart:
            DoStartSlave();
            break;
          case MessageContainer.MessageType.Sleep:
            Sleep();
            break;
          case MessageContainer.MessageType.SayHello:
            wcfService.Connect(ConfigManager.Instance.GetClientInfo());
            break;
        }
      } else {
        clientCom.LogMessage("Unknown MessageContainer: " + container);
      }
    }

    /// <summary>
    /// Pauses the executor of the given job, sends the paused job data to the server and
    /// afterwards kills the job's appdomain (also when pausing or transmitting fails).
    /// </summary>
    /// <param name="jobId">id of the job to pause</param>
    private void DoPauseJob(Guid jobId) {
      if (!executors.ContainsKey(jobId)) {
        clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId);
        return;
      }

      Job job = wcfService.GetJob(jobId);
      if (job == null || !executors.ContainsKey(job.Id)) return;

      Executor executor = executors[job.Id];
      try {
        // pausing is inside the try so that a failing executor still gets its appdomain killed
        executor.Pause();
        JobData sJob = executor.GetPausedJob();
        job.ExecutionTime = executor.ExecutionTime;

        if (executor.CurrentException != string.Empty) {
          wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
          SlaveStatusInfo.JobsAborted++;
        } else {
          SlaveStatusInfo.JobsProcessed++;
        }
        clientCom.LogMessage("Sending the paused job with id: " + job.Id);
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
      }
      catch (Exception e) {
        clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Stops the executor of the given job, sends the job data to the server as aborted and
    /// afterwards kills the job's appdomain (also when stopping or transmitting fails).
    /// </summary>
    /// <param name="jobId">id of the job to stop</param>
    private void DoStopJob(Guid jobId) {
      if (!executors.ContainsKey(jobId)) {
        clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId);
        return;
      }

      Job job = wcfService.GetJob(jobId);
      if (job == null) return;

      Executor executor = executors[job.Id];
      try {
        // stopping is inside the try so that a failing executor still gets its appdomain killed
        executor.Stop();
        JobData sJob = executor.GetFinishedJob();
        job.ExecutionTime = executor.ExecutionTime;

        if (executor.CurrentException != string.Empty) {
          wcfService.UpdateJobState(job.Id, JobState.Failed, executor.CurrentException);
        }
        SlaveStatusInfo.JobsAborted++;

        clientCom.LogMessage("Sending the stopped job with id: " + job.Id);
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
      }
      catch (Exception e) {
        clientCom.LogMessage("Transmitting to server failed. Storing the stopped job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      List<Guid> jobIds;
      lock (executors) {
        jobIds = new List<Guid>(executors.Keys);
      }
      foreach (Guid jobId in jobIds) {
        KillAppDomain(jobId);
      }
      clientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// Pauses every job (see DoPauseJob): the current state of each job is sent to the server
    /// and its appdomain is killed.
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");

      //copy guids because there will be removed items from 'Jobs'
      List<Guid> jobIds;
      lock (executors) {
        jobIds = new List<Guid>(Executors.Keys);
      }

      foreach (Guid jobId in jobIds) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// Stops every job immediately (see DoStopJob): the state of each job is sent to the server
    /// as aborted and its appdomain is killed.
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");

      //copy guids because there will be removed items from 'Jobs'
      List<Guid> jobIds;
      lock (executors) {
        jobIds = new List<Guid>(executors.Keys);
      }

      foreach (Guid jobId in jobIds) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// Completely shuts down the slave: enqueues a ShutdownSlave message and blocks until
    /// <see cref="Start"/> has returned.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the application is exited
    /// </summary>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown Signal received");
      clientCom.LogMessage("Stopping heartbeat");
      // the heartbeat manager is null while sleeping or if startup failed before it was created
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;
      clientCom.LogMessage("Logging out");

      lock (executors) {
        clientCom.LogMessage("executors locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (CannotUnloadAppDomainException ex) {
            // keep going: one stuck appdomain must not prevent the rest of the shutdown
            clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      StartHeartbeats();
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received");
      // already null if the slave is sleeping
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
        heartbeatManager = null;
      }
      DoStopAll();
      WcfService.Instance.Disconnect();
      clientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">the serialized job; its JobId identifies the executor</param>
    public void PauseWaitJob(JobData data) {
      if (!Executors.ContainsKey(data.JobId)) {
        clientCom.LogMessage("Can't find job with id " + data.JobId);
      } else {
        Job job = wcfService.GetJob(data.JobId);
        if (job == null) {
          clientCom.LogMessage("Can't find job with id " + data.JobId);
        } else {
          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
        }
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Serializes the finished job and submits it to the server, then kills the job's appdomain.
    /// If transmitting fails, the failure is only logged. Other exceptions are reported via
    /// <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">id of the finished job</param>
    public void SendFinishedJob(Guid jobId) {
      try {
        clientCom.LogMessage("Getting the finished job with id: " + jobId);
        if (!executors.ContainsKey(jobId)) {
          clientCom.LogMessage("Executor doesn't exist");
          return;
        }
        Executor executor = executors[jobId];

        try {
          Job job = wcfService.GetJob(jobId);
          if (job == null) {
            clientCom.LogMessage("Job doesn't exist");
            return; // the finally block below still kills the appdomain
          }
          job.ExecutionTime = executor.ExecutionTime;

          if (executor.Aborted) {
            SlaveStatusInfo.JobsAborted++;
          } else {
            SlaveStatusInfo.JobsProcessed++;
          }

          if (executor.CurrentException != string.Empty) {
            wcfService.UpdateJobState(jobId, JobState.Failed, executor.CurrentException);
          }

          JobData sJob = executor.GetFinishedJob();
          try {
            clientCom.LogMessage("Sending the finished job with id: " + jobId);
            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          }
          catch (Exception e) {
            clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
          }
        }
        finally {
          // kill the appdomain in every case, otherwise the finished executor would stay registered forever
          KillAppDomain(jobId);
          AwakeHeartbeat();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Wakes up the heartbeat thread if heartbeats are currently running
    /// (the heartbeat manager is null while the slave is sleeping).
    /// </summary>
    private void AwakeHeartbeat() {
      HeartbeatManager manager = heartbeatManager; // read once; Sleep() may reset the field
      if (manager != null) {
        manager.AwakeHeartBeatThread();
      }
    }

    private static Mutex startInAppDomainMutex = new Mutex();
    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// Called from the worker task created for a CalculateJob message, i.e. not on the core thread.
    /// </summary>
    private void StartJobInAppDomain(Job job, JobData jobData) {
      clientCom.LogMessage("Received new job with id " + job.Id);
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());

      startInAppDomainMutex.WaitOne(); // mutex is used instead of lock to be able to release it before executor.Start() is called (which may take long time)
      bool released = false;
      try {
        if (executors.ContainsKey(job.Id)) {
          clientCom.LogMessage("Job with key " + job.Id + " already exists. Job will be ignored.");
          return; // the finally block releases the mutex
        }

        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());
        bool pluginsPrepared = false;
        string configFileName = string.Empty;

        try {
          PluginCache.Instance.PreparePlugins(job, out configFileName);
          clientCom.LogMessage("Plugins fetched for job " + job.Id);
          pluginsPrepared = true;
        }
        catch (Exception exception) {
          clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));
          wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
          SlaveStatusInfo.JobsAborted++;
        }

        if (pluginsPrepared) {
          AppDomain appDomain = null;
          bool registered = false; // true once appDomain is in appDomains and thus visible to KillAppDomain
          try {
            //TODO: switch back to unprivileged sandbox
            appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitPrivilegedSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
            appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            clientCom.LogMessage("Creating AppDomain");
            Executor executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            clientCom.LogMessage("Created AppDomain");
            executor.Core = this;
            executor.JobId = job.Id;
            executor.CoresNeeded = job.CoresNeeded;
            executor.MemoryNeeded = job.MemoryNeeded;
            clientCom.LogMessage("Starting Executor for job " + job.Id);

            Semaphore startFinished;
            lock (executors) {
              if (executors.ContainsKey(job.Id)) {
                throw new JobAlreadyExistsException(job.Id);
              }
              // appdomain, start semaphore and executor are registered together. This thread does not
              // take the executors lock again before releasing the semaphore, so KillAppDomain (which
              // waits on the semaphore while holding the lock) cannot deadlock with it.
              appDomains.Add(job.Id, appDomain);
              registered = true;
              startFinished = new Semaphore(0, 1);
              semaphores[job.Id] = startFinished;
              executors.Add(job.Id, executor);
            }

            try {
              startInAppDomainMutex.ReleaseMutex();
              released = true;
              executor.Start(jobData.Data);
            }
            finally {
              // Start has returned (normally or by exception): the appdomain may now be unloaded
              startFinished.Release();
            }
          }
          catch (JobAlreadyExistsException e) {
            clientCom.LogMessage(string.Format("Job {0} has already been started. Job will be ignored", e.JobId));
            UnloadUnregisteredAppDomain(appDomain);
          }
          catch (Exception exception) {
            clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id);
            clientCom.LogMessage("Error thrown is: " + exception.ToString());

            if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) {
              wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
            } else {
              wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
            }
            SlaveStatusInfo.JobsAborted++;

            // an appdomain that never got registered is invisible to KillAppDomain
            if (!registered) {
              UnloadUnregisteredAppDomain(appDomain);
            }
            KillAppDomain(job.Id);
          }
        }
      }
      finally {
        if (!released) {
          startInAppDomainMutex.ReleaseMutex();
        }
      }
      AwakeHeartbeat();
    }

    /// <summary>
    /// Unloads an appdomain that was created for a job but never added to appDomains
    /// (so KillAppDomain cannot find it). Does nothing for null; failures are only logged.
    /// </summary>
    private void UnloadUnregisteredAppDomain(AppDomain appDomain) {
      if (appDomain == null) return;
      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
        AppDomain.Unload(appDomain);
      }
      catch (Exception ex) {
        clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
      }
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      clientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Logs an unhandled exception of a job's appdomain and requests that appdomain to be killed.
    /// The job id is taken from the appdomain's friendly name, since StartJobInAppDomain passes
    /// job.Id.ToString() as the first argument when creating the sandbox.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
      // NOTE(review): assumes sender is the failing AppDomain and that SandboxManager uses its
      // first argument as the friendly name - confirm against SandboxManager.
      AppDomain appDomain = sender as AppDomain;
      Guid jobId;
      if (appDomain != null && Guid.TryParse(appDomain.FriendlyName, out jobId)) {
        KillAppDomain(jobId);
      } else {
        clientCom.LogMessage("Could not determine the job of the failed AppDomain");
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <param name="action">callback to run on the core thread</param>
    /// <param name="parameter">argument passed to <paramref name="action"/></param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kill a appdomain with a specific id.
    /// When called from a thread other than the core thread, the request is enqueued to the
    /// message queue and the method returns immediately.
    /// </summary>
    /// <param name="jobId">the GUID of the job</param>
    public void KillAppDomain(Guid jobId) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, jobId);
        return;
      }

      clientCom.LogMessage("Shutting down Appdomain for Job " + jobId);
      lock (executors) {
        try {
          Executor executor;
          if (executors.TryGetValue(jobId, out executor)) {
            executor.Dispose();
            executors.Remove(jobId);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(jobId, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

            // Wait until Executor.Start has returned; only then may the appdomain be unloaded.
            // The semaphore is released exactly once, so it is waited on once (not per unload attempt).
            Semaphore startFinished;
            if (semaphores.TryGetValue(jobId, out startFinished)) {
              startFinished.WaitOne();
              startFinished.Dispose();
              semaphores.Remove(jobId);
            }

            int repeat = 5;
            while (repeat > 0) {
              try {
                AppDomain.Unload(appDomain);
                repeat = 0;
              }
              catch (CannotUnloadAppDomainException) {
                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
                repeat--;
                if (repeat == 0) {
                  clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
                  throw; // caught and logged by the handler below; the appdomain stays in appDomains
                }
              }
            }
            appDomains.Remove(jobId);
          }

          PluginCache.Instance.DeletePluginsForJob(jobId);
        }
        catch (Exception ex) {
          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    }

    public override object InitializeLifetimeService() {
      return null; // avoid destruction of proxy object after 5 minutes
    }

    /// <summary>
    /// Sum of the cores requested by all currently registered executors.
    /// </summary>
    public int GetCoresNeeded() {
      lock (executors) {
        return executors.Sum(x => x.Value.CoresNeeded);
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.