Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6039

Last change on this file since 6039 was 6039, checked in by cneumuel, 13 years ago

#1233

  • small fixes
File size: 20.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.IO;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32
33
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server.
  /// </summary>
  /// <remarks>
  /// <see cref="Start"/> blocks the calling thread (the "core thread") and processes messages from the
  /// <see cref="MessageQueue"/> one at a time until a shutdown is requested. Job downloads run on
  /// thread-pool tasks, and <see cref="Executor"/> instances live in separate AppDomains.
  /// The dictionaries <c>engines</c>, <c>appDomains</c> and <c>jobs</c> are modified from several threads.
  /// Within this class they are only accessed while holding the lock on <c>engines</c>.
  /// </remarks>
  public class Core : MarshalByRefObject {
    public EventLog ServiceEventLog { get; set; }

    public static bool abortRequested { get; set; }
    // Released when Start() returns; Shutdown() waits on it.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // All three dictionaries are guarded by lock (engines).
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    private int coreThreadId;

    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;

    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() { }

    /// <summary>
    /// Main method of the slave. Opens the slave/GUI communication host, registers WCF events,
    /// starts the heartbeats and then processes queued messages on the calling thread until
    /// <see cref="abortRequested"/> is set.
    /// </summary>
    /// <remarks>
    /// If an exception escapes and <see cref="ServiceEventLog"/> is set, the exception is written there.
    /// Otherwise it is rethrown. In every case the service events are deregistered and a waiting
    /// <see cref="Shutdown"/> call is released.
    /// </remarks>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        // start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();

        clientCom = SlaveClientCom.Instance.ClientCom;
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          }
          catch (Exception) {
            // Writing to the event log is best effort; there is no other channel left to report to.
          }
        } else {
          throw; // 'throw ex;' would reset the stack trace
        }
      }
      finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat manager (10 second interval) unless one already exists.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Wakes the heartbeat thread, if heartbeats are currently running.
    /// The field is read once because <see cref="Sleep"/> sets it to null from the core thread,
    /// while this method is also called from executor and download threads.
    /// </summary>
    private void AwakeHeartbeat() {
      HeartbeatManager manager = heartbeatManager;
      if (manager != null) {
        manager.AwakeHeartBeatThread();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until <see cref="abortRequested"/> is set.
    /// After each message, the current status is pushed to the client console.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        try {
          DetermineAction(container);
        }
        catch (Exception ex) {
          // A failure while handling one message (e.g. a server call in SayHello) must not
          // terminate the slave's main loop; report it and continue with the next message.
          OnExceptionOccured(ex);
        }
        clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += WcfService_Connected;
      WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage("Connection to server interrupted with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Analyzes a message from the message queue and starts the corresponding action.
    /// </summary>
    /// <param name="container">The container holding the message.</param>
    private void DetermineAction(MessageContainer container) {
      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      // Callbacks enqueued via EnqueueExecutorMessage<Guid> (e.g. KillAppDomain from another thread).
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          StartCalculateJobTask(container.JobId);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
        default:
          clientCom.LogMessage("Unknown MessageContainer: " + container);
          break;
      }
    }

    /// <summary>
    /// Downloads and starts the given job on a background task, so the core thread keeps dispatching.
    /// Any failure of the task is written to the client log.
    /// </summary>
    private void StartCalculateJobTask(Guid jobId) {
      Task.Factory.StartNew(jobIdObj => FetchAndStartJob((Guid)jobIdObj), jobId)
        .ContinueWith(t => {
          // handle exception of task
          clientCom.LogMessage(t.Exception.ToString());
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Fetches a job and its data from the server, registers the job, marks it as calculating,
    /// and starts it in a new AppDomain. Runs on a thread-pool thread.
    /// </summary>
    /// <exception cref="JobNotFoundException">The server returned no job.</exception>
    /// <exception cref="JobDataNotFoundException">The server returned no job data.</exception>
    private void FetchAndStartJob(Guid jobId) {
      Job job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      lock (engines) {
        if (!jobs.ContainsKey(job.Id)) {
          jobs.Add(job.Id, job);
        }
      }
      JobData jobData = wcfService.GetJobData(job.Id);
      if (jobData == null) throw new JobDataNotFoundException(jobId);
      SlaveStatusInfo.JobsFetched++;
      job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
      if (job == null) throw new JobNotFoundException(jobId);
      StartJobInAppDomain(job, jobData);
    }

    /// <summary>
    /// Returns a copy of the ids of all registered jobs, taken under the shared lock.
    /// Callers iterate the copy because the handlers they call remove entries from <c>jobs</c>.
    /// </summary>
    private List<Guid> GetJobIdsSnapshot() {
      lock (engines) {
        return new List<Guid>(jobs.Keys);
      }
    }

    /// <summary>
    /// Looks up a job and its executor under the shared lock.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="caller">Prefix used in log messages.</param>
    /// <param name="job">The registered job, if any.</param>
    /// <param name="engine">The job's executor, if any.</param>
    /// <returns>
    /// true if the job is registered with a non-null entry and has an executor. Otherwise false.
    /// A missing job or a missing executor is logged. The executor is missing, for example, while the
    /// job's data is still being downloaded.
    /// </returns>
    private bool TryGetRunningJob(Guid jobId, string caller, out Job job, out Executor engine) {
      bool jobKnown;
      lock (engines) {
        jobKnown = jobs.TryGetValue(jobId, out job);
        engines.TryGetValue(jobId, out engine);
      }
      if (!jobKnown) {
        clientCom.LogMessage(caller + ": Can't find job with id " + jobId);
        return false;
      }
      if (job == null) {
        return false;
      }
      if (engine == null) {
        clientCom.LogMessage(caller + ": No executor found for job with id " + jobId);
        return false;
      }
      return true;
    }

    /// <summary>
    /// Pauses a running job, sends its paused state to the server and kills its AppDomain.
    /// The AppDomain is killed even if pausing or sending fails.
    /// </summary>
    private void DoPauseJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetRunningJob(jobId, "DoPauseJob", out job, out engine)) return;

      try {
        engine.Pause();
        JobData sJob = engine.GetPausedJob();
        job.ExecutionTime = engine.ExecutionTime;

        try {
          clientCom.LogMessage("Sending the paused job with id: " + job.Id);
          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        }
        catch (Exception e) {
          clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Stops a running job, sends its current state to the server as finished and kills its AppDomain.
    /// The AppDomain is killed even if stopping or sending fails.
    /// </summary>
    private void DoStopJob(Guid jobId) {
      Job job;
      Executor engine;
      if (!TryGetRunningJob(jobId, "DoStopJob", out job, out engine)) return;

      try {
        engine.Stop();
        JobData sJob = engine.GetFinishedJob();
        job.ExecutionTime = engine.ExecutionTime;

        try {
          clientCom.LogMessage("Sending the stopped job with id: " + job.Id);
          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
        }
        catch (Exception e) {
          clientCom.LogMessage("Transmitting to server failed. Storing the stopped job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Aborts all jobs. No results are sent back.
    /// </summary>
    private void DoAbortAll() {
      foreach (Guid jobId in GetJobIdsSnapshot()) {
        KillAppDomain(jobId);
      }

      clientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// Pauses every registered job and sends its paused state to the server (see <see cref="DoPauseJob"/>).
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");

      foreach (Guid jobId in GetJobIdsSnapshot()) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// Stops every registered job and sends its state to the server as finished (see <see cref="DoStopJob"/>).
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");

      foreach (Guid jobId in GetJobIdsSnapshot()) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// Completely shuts down the slave. Enqueues a shutdown message and blocks until
    /// <see cref="Start"/> has returned.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown, run on the core thread before the application exits.
    /// It stops the heartbeats, ends the dispatch loop, unloads all job AppDomains,
    /// disconnects from the server and closes the slave/GUI communication.
    /// </summary>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown Signal received");
      clientCom.LogMessage("Stopping heartbeat");
      if (heartbeatManager != null) { // null after Sleep()
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;
      clientCom.LogMessage("Logging out");

      lock (engines) {
        clientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= AppDomain_UnhandledException;
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // Keep going: one domain that cannot be unloaded must not prevent the rest of the shutdown.
            clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// Reinitializes everything and continues operation.
    /// Can be called after Sleep(). Restarts the heartbeats if they are not running.
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      StartHeartbeats();
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// Stops the slave except for the client GUI communication. It stops the heartbeats,
    /// stops all jobs and disconnects from the server. This is primarily used by the GUI when
    /// the core runs as a Windows service.
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
        heartbeatManager = null;
      }
      DoStopAll();
      WcfService.Instance.Disconnect();
      clientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job: sends it to the server (state Paused, then Waiting) and kills it locally.
    /// Currently only used when an executor is waiting for child jobs.
    /// The AppDomain is killed even if the server update fails. The exception still propagates.
    /// </summary>
    public void PauseWaitJob(JobData data) {
      try {
        Job job;
        bool jobKnown;
        lock (engines) {
          jobKnown = jobs.TryGetValue(data.JobId, out job);
        }

        if (!jobKnown) {
          clientCom.LogMessage("Can't find job with id " + data.JobId);
        } else {
          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
        }
      }
      finally {
        KillAppDomain(data.JobId);
      }
    }

    /// <summary>
    /// Collects the finished job from its executor and submits it to the server.
    /// It updates the processed/aborted counters and marks the job as failed if the executor
    /// reports an exception. The job's AppDomain is always killed afterwards.
    /// A failed transmission is only logged. Any other exception is reported via <see cref="ExceptionOccured"/>.
    /// </summary>
    public void SendFinishedJob(Guid jobId) {
      try {
        clientCom.LogMessage("Getting the finished job with id: " + jobId);

        Executor engine;
        Job cJob;
        bool engineKnown;
        bool jobKnown;
        lock (engines) {
          engineKnown = engines.TryGetValue(jobId, out engine);
          jobKnown = jobs.TryGetValue(jobId, out cJob);
        }
        if (!engineKnown) {
          clientCom.LogMessage("Engine doesn't exist");
          return;
        }
        if (!jobKnown) {
          clientCom.LogMessage("Job doesn't exist");
          return;
        }

        try {
          cJob.ExecutionTime = engine.ExecutionTime;

          if (engine.Aborted) {
            SlaveStatusInfo.JobsAborted++;
          } else {
            SlaveStatusInfo.JobsProcessed++;
          }

          string currentException = engine.CurrentException;
          if (!string.IsNullOrEmpty(currentException)) {
            wcfService.UpdateJobState(jobId, JobState.Failed, currentException);
          }

          JobData sJob = engine.GetFinishedJob();
          try {
            clientCom.LogMessage("Sending the finished job with id: " + jobId);
            wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          }
          catch (Exception e) {
            clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
          }
        }
        finally {
          // Release the AppDomain even if collecting the results failed.
          KillAppDomain(jobId);
          AwakeHeartbeat();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    // Serializes StartJobInAppDomain calls coming from concurrent download tasks.
    private static readonly object locker = new object();

    /// <summary>
    /// Starts a job received from the server in a new sandbox AppDomain.
    /// It prepares the job's plugins, creates the AppDomain and an <see cref="Executor"/> inside it,
    /// registers both, and starts execution. If plugin preparation fails, the job is unregistered.
    /// If AppDomain creation or start fails, the AppDomain is killed.
    /// </summary>
    /// <exception cref="InvalidOperationException">An executor for this job already exists.</exception>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      clientCom.LogMessage("Received new job with id " + myJob.Id);
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());

      lock (locker) {
        bool alreadyRunning;
        lock (engines) {
          alreadyRunning = engines.ContainsKey(myJob.Id);
        }
        if (alreadyRunning)
          throw new InvalidOperationException("Job with key " + myJob.Id + " already exists");

        string pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
        bool pluginsPrepared = false;
        string configFileName = string.Empty;

        try {
          PluginCache.Instance.PreparePlugins(myJob, out configFileName);
          clientCom.LogMessage("Plugins fetched for job " + myJob.Id);
          pluginsPrepared = true;
        }
        catch (Exception exception) {
          clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
          lock (engines) {
            if (jobs.ContainsKey(myJob.Id)) {
              jobs.Remove(myJob.Id);
            }
          }
        }

        if (pluginsPrepared) {
          try {
            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
            appDomain.UnhandledException += AppDomain_UnhandledException;
            lock (engines) {
              appDomains.Add(myJob.Id, appDomain);
              clientCom.LogMessage("Creating AppDomain");
              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              clientCom.LogMessage("Created AppDomain");
              engine.JobId = myJob.Id;
              engine.Core = this;
              clientCom.LogMessage("Starting Engine for job " + myJob.Id);
              engines.Add(myJob.Id, engine);
              engine.Start(jobData.Data);
            }
          }
          catch (Exception exception) {
            clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
            clientCom.LogMessage("Error thrown is: " + exception.ToString());
            KillAppDomain(myJob.Id);
          }
        }
      }
      AwakeHeartbeat();
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    /// <summary>
    /// Logs the exception to the client and raises <see cref="ExceptionOccured"/>.
    /// </summary>
    private void OnExceptionOccured(Exception e) {
      clientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception in a job's AppDomain. It logs the exception and kills the
    /// AppDomain that raised it.
    /// </summary>
    /// <remarks>
    /// The job is identified by matching the sender's <see cref="AppDomain.Id"/> against the registered
    /// AppDomains. The exception object's text is not a job id, so it cannot be parsed as a
    /// <see cref="Guid"/>. NOTE(review): this assumes <paramref name="sender"/> is the faulting AppDomain,
    /// as documented for <see cref="AppDomain.UnhandledException"/>.
    /// </remarks>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

      AppDomain faultedDomain = sender as AppDomain;
      if (faultedDomain == null) {
        clientCom.LogMessage("Could not determine the AppDomain that raised the exception");
        return;
      }

      int faultedDomainId = faultedDomain.Id;
      Guid jobId = Guid.Empty;
      bool found = false;
      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          if (kvp.Value.Id == faultedDomainId) {
            jobId = kvp.Key;
            found = true;
            break;
          }
        }
      }

      if (found) {
        KillAppDomain(jobId);
      } else {
        clientCom.LogMessage("Could not find a job for the AppDomain that raised the exception");
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary when the core thread has to execute certain actions, e.g.
    /// killing an app domain. <paramref name="action"/> is invoked with <paramref name="parameter"/>
    /// when the core thread dispatches the message.
    /// </summary>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kills the AppDomain of a specific job. It disposes and unregisters the executor, unloads the
    /// AppDomain (up to 5 attempts, 1 s apart), unregisters the job and deletes its plugins.
    /// When called from a thread other than the core thread, the call is enqueued and runs later
    /// on the core thread. Failures are logged, not thrown.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engines.Remove(id);
            try {
              engine.Dispose();
            }
            catch (Exception ex) {
              // A failing Dispose must not prevent the AppDomain from being unloaded.
              clientCom.LogMessage("Exception when disposing the executor: " + ex.ToString());
            }
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= AppDomain_UnhandledException;

            int remainingAttempts = 5;
            while (true) {
              try {
                AppDomain.Unload(appDomain);
                break;
              }
              catch (CannotUnloadAppDomainException) {
                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                remainingAttempts--;
                if (remainingAttempts == 0) {
                  // Reported by the catch below; the AppDomain stays registered in that case.
                  throw;
                }
                Thread.Sleep(1000);
              }
            }
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
    }
  }
}
Note: See TracBrowser for help on using the repository browser.