Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6168

Last change on this file since 6168 was 6168, checked in by cneumuel, 13 years ago

#1233

  • removed Job-dto objects from slave core (since it stores outdated objects)
  • added command textbox to HiveJobView
  • improved the way the control buttons behave in HiveJobView
  • improved job control (pause and stop are also possible when the job is not currently calculating)
  • improved gantt chart view (last state log entry is also displayed)
  • unified code for downloading jobs between experiment manager and hive engine
File size: 22.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.IO;
26using System.Linq;
27using System.ServiceModel;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
31using HeuristicLab.Common;
32using HeuristicLab.Core;
33
34
35namespace HeuristicLab.Clients.Hive.SlaveCore {
36  /// <summary>
37  /// The core component of the Hive Slave.
38  /// Handles commands sent from the Hive Server.
39  /// </summary>
40  public class Core : MarshalByRefObject {
41    public EventLog ServiceEventLog { get; set; }
42
43    public static bool abortRequested { get; set; }
44    private Semaphore waitShutdownSem = new Semaphore(0, 1);
45    public static ILog Log { get; set; }
46
47    private Dictionary<Guid, Executor> executors = new Dictionary<Guid, Executor>();
48    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
49
50    private WcfService wcfService;
51    private HeartbeatManager heartbeatManager;
52    private int coreThreadId;
53
54    private ISlaveCommunication clientCom;
55    private ServiceHost slaveComm;
56
57    public Dictionary<Guid, Executor> Executors {
58      get { return executors; }
59    }
60
61    public Core() { }
62
63    /// <summary>
64    /// Main Method for the client
65    /// </summary>
66    public void Start() {
67      coreThreadId = Thread.CurrentThread.ManagedThreadId;
68      abortRequested = false;
69
70      try {
71        ConfigManager manager = ConfigManager.Instance;
72        manager.Core = this;
73
74        //start the client communication service (pipe between slave and slave gui)
75        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
76        slaveComm.Open();
77        clientCom = SlaveClientCom.Instance.ClientCom;
78
79        // delete all left over job directories
80        PluginCache.Instance.CleanPluginTemp();
81        clientCom.LogMessage("Hive Slave started");
82
83        wcfService = WcfService.Instance;
84        RegisterServiceEvents();
85
86        StartHeartbeats(); // Start heartbeats thread       
87        DispatchMessageQueue(); // dispatch messages until abortRequested
88      }
89      catch (Exception ex) {
90        if (ServiceEventLog != null) {
91          try {
92            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
93          }
94          catch (Exception) { }
95        } else {
96          //try to log with clientCom. if this works the user sees at least a message,
97          //else an exception will be thrown anyways.
98          clientCom.LogMessage("Uncaught exception: " + ex.ToString() +
99            Environment.NewLine + "Core is going to shutdown.");
100        }
101        ShutdownCore();
102      }
103      finally {
104        DeRegisterServiceEvents();
105        waitShutdownSem.Release();
106      }
107    }
108
109    private void StartHeartbeats() {
110      //Initialize the heartbeat     
111      if (heartbeatManager == null) {
112        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
113        heartbeatManager.StartHeartbeat();
114      }
115    }
116
117    private void DispatchMessageQueue() {
118      MessageQueue queue = MessageQueue.GetInstance();
119      while (!abortRequested) {
120        MessageContainer container = queue.GetMessage();
121        DetermineAction(container);
122        clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
123      }
124    }
125
126    private void RegisterServiceEvents() {
127      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
128      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
129    }
130
131    private void DeRegisterServiceEvents() {
132      WcfService.Instance.Connected -= WcfService_Connected;
133      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
134    }
135
136    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
137      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
138    }
139
140    void WcfService_Connected(object sender, EventArgs e) {
141      clientCom.LogMessage("Connected successfully to Hive server");
142    }
143
144    /// <summary>
145    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
146    /// </summary>
147    /// <param name="container">The container, containing the message</param>
148    private void DetermineAction(MessageContainer container) {
149      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);
150
151      if (container is ExecutorMessageContainer<Guid>) {
152        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
153        c.execute();
154      } else if (container is MessageContainer) {
155        switch (container.Message) {
156          case MessageContainer.MessageType.CalculateJob:
157            Task.Factory.StartNew((jobIdObj) => {
158              Guid jobId = (Guid)jobIdObj;
159              Job job = wcfService.GetJob(jobId);
160              if (job == null) throw new JobNotFoundException(jobId);
161              JobData jobData = wcfService.GetJobData(job.Id);
162              if (jobData == null) throw new JobDataNotFoundException(jobId);
163              SlaveStatusInfo.JobsFetched++;
164              job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
165              if (job == null) throw new JobNotFoundException(jobId);
166              StartJobInAppDomain(job, jobData);
167            }, container.JobId)
168            .ContinueWith((t) => {
169              // handle exception of task
170              clientCom.LogMessage(t.Exception.ToString());
171              wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString());
172              SlaveStatusInfo.JobsAborted++;
173            }, TaskContinuationOptions.OnlyOnFaulted);
174            break;
175          case MessageContainer.MessageType.ShutdownSlave:
176            ShutdownCore();
177            break;
178          case MessageContainer.MessageType.StopAll:
179            DoStopAll();
180            break;
181          case MessageContainer.MessageType.PauseAll:
182            DoPauseAll();
183            break;
184          case MessageContainer.MessageType.AbortAll:
185            DoAbortAll();
186            break;
187          case MessageContainer.MessageType.AbortJob:
188            SlaveStatusInfo.JobsAborted++;
189            KillAppDomain(container.JobId);
190            break;
191          case MessageContainer.MessageType.StopJob:
192            DoStopJob(container.JobId);
193            break;
194          case MessageContainer.MessageType.PauseJob:
195            DoPauseJob(container.JobId);
196            break;
197          case MessageContainer.MessageType.Restart:
198            DoStartSlave();
199            break;
200          case MessageContainer.MessageType.Sleep:
201            Sleep();
202            break;
203          case MessageContainer.MessageType.SayHello:
204            wcfService.Connect(ConfigManager.Instance.GetClientInfo());
205            break;
206        }
207      } else {
208        clientCom.LogMessage("Unknown MessageContainer: " + container);
209      }
210    }
211
212    private void DoPauseJob(Guid jobId) {
213      if (!executors.ContainsKey(jobId)) {
214        clientCom.LogMessage("DoPauseJob: Can't find job with id " + jobId);
215      } else {
216        Job job = wcfService.GetJob(jobId);
217
218        if (job != null && executors.ContainsKey(job.Id)) {
219          executors[job.Id].Pause();
220          JobData sJob = executors[job.Id].GetPausedJob();
221          job.ExecutionTime = executors[job.Id].ExecutionTime;
222
223          try {
224            if (executors[job.Id].CurrentException != string.Empty) {
225              wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
226              SlaveStatusInfo.JobsAborted++;
227            } else {
228              SlaveStatusInfo.JobsProcessed++;
229            }
230            clientCom.LogMessage("Sending the paused job with id: " + job.Id);
231            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
232          }
233          catch (Exception e) {
234            clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
235          }
236          finally {
237            KillAppDomain(job.Id); // kill app-domain in every case         
238          }
239        }
240      }
241    }
242
243    private void DoStopJob(Guid jobId) {
244      if (!executors.ContainsKey(jobId)) {
245        clientCom.LogMessage("DoStopJob: Can't find job with id " + jobId);
246      } else {
247        Job job = wcfService.GetJob(jobId);
248
249        if (job != null) {
250          executors[job.Id].Stop();
251          JobData sJob = executors[job.Id].GetFinishedJob();
252          job.ExecutionTime = executors[job.Id].ExecutionTime;
253
254          try {
255            if (executors[job.Id].CurrentException != string.Empty) {
256              wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
257            }
258            SlaveStatusInfo.JobsAborted++;
259
260            clientCom.LogMessage("Sending the stopped job with id: " + job.Id);
261            wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Aborted);
262          }
263          catch (Exception e) {
264            clientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
265          }
266          finally {
267            KillAppDomain(job.Id); // kill app-domain in every case         
268          }
269        }
270      }
271    }
272
273    /// <summary>
274    /// aborts all running jobs, no results are sent back
275    /// </summary>
276    private void DoAbortAll() {
277      List<Guid> jobIds;
278      lock (executors) {
279        jobIds = new List<Guid>(executors.Keys);
280      }
281      foreach (Guid jobId in jobIds) {
282        KillAppDomain(jobId);
283      }
284      clientCom.LogMessage("Aborted all jobs!");
285    }
286
287    /// <summary>
288    /// wait for jobs to finish, then pause client
289    /// </summary>
290    private void DoPauseAll() {
291      clientCom.LogMessage("Pause all received");
292
293      //copy guids because there will be removed items from 'Jobs'
294      List<Guid> jobIds;
295      lock (executors) {
296        jobIds = new List<Guid>(Executors.Keys);
297      }
298
299      foreach (Guid jobId in jobIds) {
300        DoPauseJob(jobId);
301      }
302    }
303
304    /// <summary>
305    /// pause slave immediately
306    /// </summary>
307    private void DoStopAll() {
308      clientCom.LogMessage("Stop all received");
309
310      //copy guids because there will be removed items from 'Jobs'
311      List<Guid> jobIds;
312      lock (executors) {
313        jobIds = new List<Guid>(executors.Keys);
314      }
315
316      foreach (Guid jobId in jobIds) {
317        DoStopJob(jobId);
318      }
319    }
320
321    /// <summary>
322    /// completly shudown slave
323    /// </summary>
324    public void Shutdown() {
325      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
326      MessageQueue.GetInstance().AddMessage(mc);
327      waitShutdownSem.WaitOne();
328    }
329
330    /// <summary>
331    /// complete shutdown, should be called before the the application is exited
332    /// </summary>
333    private void ShutdownCore() {
334      clientCom.LogMessage("Shutdown Signal received");
335      clientCom.LogMessage("Stopping heartbeat");
336      heartbeatManager.StopHeartBeat();
337      abortRequested = true;
338      clientCom.LogMessage("Logging out");
339
340
341      lock (executors) {
342        clientCom.LogMessage("executors locked");
343        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
344          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
345          appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
346          AppDomain.Unload(kvp.Value);
347        }
348      }
349      WcfService.Instance.Disconnect();
350      clientCom.Shutdown();
351      SlaveClientCom.Close();
352
353      if (slaveComm.State != CommunicationState.Closed)
354        slaveComm.Close();
355    }
356
357    /// <summary>
358    /// reinitializes everything and continues operation,
359    /// can be called after Sleep()
360    /// </summary> 
361    private void DoStartSlave() {
362      clientCom.LogMessage("Restart received");
363      StartHeartbeats();
364      clientCom.LogMessage("Restart done");
365    }
366
367    /// <summary>
368    /// stop slave, except for client gui communication,
369    /// primarily used by gui if core is running as windows service
370    /// </summary>   
371    private void Sleep() {
372      clientCom.LogMessage("Sleep received");
373      heartbeatManager.StopHeartBeat();
374      heartbeatManager = null;
375      DoStopAll();
376      WcfService.Instance.Disconnect();
377      clientCom.LogMessage("Sleep done");
378    }
379
380    /// <summary>
381    /// Pauses a job, which means sending it to the server and killing it locally;
382    /// atm only used when executor is waiting for child jobs
383    /// </summary>
384    public void PauseWaitJob(JobData data) {
385      if (!Executors.ContainsKey(data.JobId)) {
386        clientCom.LogMessage("Can't find job with id " + data.JobId);
387      } else {
388        Job job = wcfService.GetJob(data.JobId);
389        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
390        wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
391      }
392      KillAppDomain(data.JobId);
393    }
394
395    /// <summary>
396    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
397    /// once the connection gets reestablished, the job gets submitted
398    /// </summary>
399    public void SendFinishedJob(Guid jobId) {
400      try {
401        clientCom.LogMessage("Getting the finished job with id: " + jobId);
402        if (!executors.ContainsKey(jobId)) {
403          clientCom.LogMessage("Executor doesn't exist");
404          return;
405        }
406        if (!executors.ContainsKey(jobId)) {
407          clientCom.LogMessage("Job doesn't exist");
408          return;
409        }
410        Job job = wcfService.GetJob(jobId);
411        job.ExecutionTime = executors[jobId].ExecutionTime;
412
413        if (executors[jobId].Aborted) {
414          SlaveStatusInfo.JobsAborted++;
415        } else {
416          SlaveStatusInfo.JobsProcessed++;
417        }
418
419        if (executors[jobId].CurrentException != string.Empty) {
420          wcfService.UpdateJobState(jobId, JobState.Failed, executors[jobId].CurrentException);
421        }
422
423        JobData sJob = executors[jobId].GetFinishedJob();
424        try {
425          clientCom.LogMessage("Sending the finished job with id: " + jobId);
426          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
427        }
428        catch (Exception e) {
429          clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
430        }
431        finally {
432          KillAppDomain(jobId);
433          heartbeatManager.AwakeHeartBeatThread();
434        }
435      }
436      catch (Exception e) {
437        OnExceptionOccured(e);
438      }
439    }
440
441    private static object startInAppDomainLocker = new object();
442
443    /// <summary>
444    /// A new Job from the wcfService has been received and will be started within a AppDomain.
445    /// </summary>   
446    private void StartJobInAppDomain(Job job, JobData jobData) {
447      clientCom.LogMessage("Received new job with id " + job.Id);
448      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
449
450      lock (startInAppDomainLocker) {
451        if (executors.ContainsKey(job.Id)) {
452          clientCom.LogMessage("Job with key " + job.Id + " already exists. Job will be ignored.");
453          return;
454        }
455
456        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, job.Id.ToString());
457        bool pluginsPrepared = false;
458        string configFileName = string.Empty;
459
460        try {
461          PluginCache.Instance.PreparePlugins(job, out configFileName);
462          clientCom.LogMessage("Plugins fetched for job " + job.Id);
463          pluginsPrepared = true;
464        }
465        catch (Exception exception) {
466          clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", job.Id, exception));
467          wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
468          SlaveStatusInfo.JobsAborted++;
469        }
470
471        if (pluginsPrepared) {
472          try {
473            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(job.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
474            appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
475            Executor executor;
476            appDomains.Add(job.Id, appDomain);
477            clientCom.LogMessage("Creating AppDomain");
478            executor = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
479            clientCom.LogMessage("Created AppDomain");
480            executor.Core = this;
481            executor.JobId = job.Id;
482            executor.CoresNeeded = job.CoresNeeded;
483            executor.MemoryNeeded = job.MemoryNeeded;
484            clientCom.LogMessage("Starting Executor for job " + job.Id);
485            executor.Start(jobData.Data);
486            lock (executors) {
487              executors.Add(job.Id, executor);
488            }
489          }
490          catch (Exception exception) {
491            clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + job.Id);
492            clientCom.LogMessage("Error thrown is: " + exception.ToString());
493
494            if (executors.ContainsKey(job.Id) && executors[job.Id].CurrentException != string.Empty) {
495              wcfService.UpdateJobState(job.Id, JobState.Failed, executors[job.Id].CurrentException);
496            } else {
497              wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
498            }
499            SlaveStatusInfo.JobsAborted++;
500
501            KillAppDomain(job.Id);
502          }
503        }
504      }
505      heartbeatManager.AwakeHeartBeatThread();
506    }
507
508    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
509    private void OnExceptionOccured(Exception e) {
510      clientCom.LogMessage("Error: " + e.ToString());
511      var handler = ExceptionOccured;
512      if (handler != null) handler(this, new EventArgs<Exception>(e));
513    }
514
515    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
516      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
517      KillAppDomain(new Guid(e.ExceptionObject.ToString()));
518    }
519
520    /// <summary>
521    /// Enqueues messages from the executor to the message queue.
522    /// This is necessary if the core thread has to execute certain actions, e.g.
523    /// killing of an app domain.
524    /// </summary>   
525    /// <returns>true if the calling method can continue execution, else false</returns>
526    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
527      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
528      container.Callback = action;
529      container.CallbackParameter = parameter;
530      MessageQueue.GetInstance().AddMessage(container);
531    }
532
533    /// <summary>
534    /// Kill a appdomain with a specific id.
535    /// </summary>
536    /// <param name="id">the GUID of the job</param>   
537    public void KillAppDomain(Guid id) {
538      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
539        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
540        return;
541      }
542
543      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
544      lock (executors) {
545        try {
546          if (executors.ContainsKey(id)) {
547            executors[id].Dispose();
548            executors.Remove(id);
549          }
550
551          if (appDomains.ContainsKey(id)) {
552            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
553
554            int repeat = 5;
555            while (repeat > 0) {
556              try {
557                AppDomain.Unload(appDomains[id]);
558                repeat = 0;
559              }
560              catch (CannotUnloadAppDomainException) {
561                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
562                Thread.Sleep(1000);
563                repeat--;
564                if (repeat == 0) {
565                  clientCom.LogMessage("Could not unload AppDomain, shutting down core...");
566                  throw; // rethrow and let app crash
567                }
568              }
569            }
570            appDomains.Remove(id);
571          }
572
573          PluginCache.Instance.DeletePluginsForJob(id);
574          GC.Collect();
575        }
576        catch (Exception ex) {
577          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
578        }
579      }
580      GC.Collect();
581      clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
582    }
583
584    public override object InitializeLifetimeService() {
585      return null; // avoid destruction of proxy object after 5 minutes
586    }
587
588    public int GetCoresNeeded() {
589      lock (executors) {
590        return executors.Sum(x => x.Value.CoresNeeded);
591      }
592    }
593  }
594}
Note: See TracBrowser for help on using the repository browser.