Free cookie consent management tool by TermsFeed Policy Generator

source: branches/WebJobManager/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 16811

Last change on this file since 16811 was 12920, checked in by jkarder, 9 years ago

#2468: implemented checkpointing

File size: 18.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.IO;
25using System.Reflection;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static HeartbeatManager heartbeatManager;
41    public static HeartbeatManager HeartbeatManager {
42      get { return heartbeatManager; }
43    }
44
45    public EventLog ServiceEventLog { get; set; }
46
47    private Semaphore waitShutdownSem = new Semaphore(0, 1);
48    private bool abortRequested;
49    private ServiceHost slaveComm;
50    private WcfService wcfService;
51    private TaskManager taskManager;
52    private ConfigManager configManager;
53    private PluginManager pluginManager;
54
55    public Core() {
56      var log = new ThreadSafeLog(SlaveCore.Properties.Settings.Default.MaxLogCount);
57      this.pluginManager = new PluginManager(WcfService.Instance, log);
58      this.taskManager = new TaskManager(pluginManager, log);
59      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
60
61      RegisterTaskManagerEvents();
62
63      this.configManager = new ConfigManager(taskManager);
64      ConfigManager.Instance = this.configManager;
65    }
66
67    /// <summary>
68    /// Main method for the client
69    /// </summary>
70    public void Start() {
71      abortRequested = false;
72      EventLogManager.ServiceEventLog = ServiceEventLog;
73
74      try {
75        //start the client communication service (pipe between slave and slave gui)
76        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
77
78        try {
79          slaveComm.Open();
80        }
81        catch (AddressAlreadyInUseException ex) {
82          if (ServiceEventLog != null) {
83            EventLogManager.LogException(ex);
84          }
85        }
86
87        // delete all left over task directories
88        pluginManager.CleanPluginTemp();
89        SlaveClientCom.Instance.LogMessage("Hive Slave started");
90
91        wcfService = WcfService.Instance;
92        RegisterServiceEvents();
93
94        StartHeartbeats(); // Start heartbeats thread       
95        DispatchMessageQueue(); // dispatch messages until abortRequested
96      }
97      catch (Exception ex) {
98        if (ServiceEventLog != null) {
99          EventLogManager.LogException(ex);
100        } else {
101          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
102          //else an exception will be thrown anyways.
103          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
104        }
105        ShutdownCore();
106      }
107      finally {
108        DeregisterServiceEvents();
109        waitShutdownSem.Release();
110      }
111    }
112
113    private void StartHeartbeats() {
114      //Initialize the heartbeat     
115      if (heartbeatManager == null) {
116        heartbeatManager = new HeartbeatManager();
117        heartbeatManager.StartHeartbeat();
118      }
119    }
120
121    private void DispatchMessageQueue() {
122      MessageQueue queue = MessageQueue.GetInstance();
123      while (!abortRequested) {
124        MessageContainer container = queue.GetMessage();
125        DetermineAction(container);
126        if (!abortRequested) {
127          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
128        }
129      }
130    }
131
132    private void RegisterServiceEvents() {
133      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
134      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
135    }
136
137    private void DeregisterServiceEvents() {
138      WcfService.Instance.Connected -= WcfService_Connected;
139      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
140    }
141
142    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
143      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
144    }
145
146    private void WcfService_Connected(object sender, EventArgs e) {
147      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
148    }
149
150    /// <summary>
151    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
152    /// </summary>
153    /// <param name="container">The container, containing the message</param>
154    private void DetermineAction(MessageContainer container) {
155      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
156
157      switch (container.Message) {
158        case MessageContainer.MessageType.CalculateTask:
159          CalculateTaskAsync(container.TaskId);
160          break;
161        case MessageContainer.MessageType.AbortTask:
162          AbortTaskAsync(container.TaskId);
163          break;
164        case MessageContainer.MessageType.StopTask:
165          StopTaskAsync(container.TaskId);
166          break;
167        case MessageContainer.MessageType.PauseTask:
168          PauseTaskAsync(container.TaskId);
169          break;
170        case MessageContainer.MessageType.StopAll:
171          DoStopAll();
172          break;
173        case MessageContainer.MessageType.PauseAll:
174          DoPauseAll();
175          break;
176        case MessageContainer.MessageType.AbortAll:
177          DoAbortAll();
178          break;
179        case MessageContainer.MessageType.ShutdownSlave:
180          ShutdownCore();
181          break;
182        case MessageContainer.MessageType.Restart:
183          DoStartSlave();
184          break;
185        case MessageContainer.MessageType.Sleep:
186          Sleep();
187          break;
188        case MessageContainer.MessageType.SayHello:
189          wcfService.Connect(configManager.GetClientInfo());
190          break;
191        case MessageContainer.MessageType.NewHBInterval:
192          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
193          if (interval != -1) {
194            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
195          }
196          break;
197        case MessageContainer.MessageType.ShutdownComputer:
198          ShutdownComputer();
199          break;
200      }
201    }
202
203    private void CalculateTaskAsync(Guid jobId) {
204      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
205      .ContinueWith((t) => {
206        SlaveStatusInfo.IncrementTasksFailed();
207        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
208      }, TaskContinuationOptions.OnlyOnFaulted);
209    }
210
211    private void StopTaskAsync(Guid jobId) {
212      TS.Task.Factory.StartNew(HandleStopTask, jobId)
213       .ContinueWith((t) => {
214         SlaveStatusInfo.IncrementTasksFailed();
215         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
216       }, TaskContinuationOptions.OnlyOnFaulted);
217    }
218
219    private void PauseTaskAsync(Guid jobId) {
220      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
221       .ContinueWith((t) => {
222         SlaveStatusInfo.IncrementTasksFailed();
223         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
224       }, TaskContinuationOptions.OnlyOnFaulted);
225    }
226
227    private void AbortTaskAsync(Guid jobId) {
228      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
229       .ContinueWith((t) => {
230         SlaveStatusInfo.IncrementTasksFailed();
231         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
232       }, TaskContinuationOptions.OnlyOnFaulted);
233    }
234
235    private void HandleCalculateTask(object taskIdObj) {
236      Guid taskId = (Guid)taskIdObj;
237      Task task = null;
238      int usedCores = 0;
239      try {
240        task = wcfService.GetTask(taskId);
241        if (task == null) throw new TaskNotFoundException(taskId);
242        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
243        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
244        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
245        TaskData taskData = wcfService.GetTaskData(taskId);
246        if (taskData == null) throw new TaskDataNotFoundException(taskId);
247        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
248        if (task == null) throw new TaskNotFoundException(taskId);
249        taskManager.StartTaskAsync(task, taskData);
250      }
251      catch (TaskNotFoundException) {
252        SlaveStatusInfo.DecrementUsedCores(usedCores);
253        throw;
254      }
255      catch (TaskDataNotFoundException) {
256        SlaveStatusInfo.DecrementUsedCores(usedCores);
257        throw;
258      }
259      catch (TaskAlreadyRunningException) {
260        SlaveStatusInfo.DecrementUsedCores(usedCores);
261        throw;
262      }
263      catch (OutOfCoresException) {
264        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
265        throw;
266      }
267      catch (OutOfMemoryException) {
268        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
269        throw;
270      }
271      catch (Exception e) {
272        SlaveStatusInfo.DecrementUsedCores(usedCores);
273        wcfService.UpdateJobState(taskId, TaskState.Failed, e.ToString());
274        throw;
275      }
276    }
277
278    private void HandleStopTask(object taskIdObj) {
279      Guid taskId = (Guid)taskIdObj;
280      try {
281        Task task = wcfService.GetTask(taskId);
282        if (task == null) throw new TaskNotFoundException(taskId);
283        taskManager.StopTaskAsync(taskId);
284      }
285      catch (TaskNotFoundException) {
286        throw;
287      }
288      catch (TaskNotRunningException) {
289        throw;
290      }
291      catch (AppDomainNotCreatedException) {
292        throw;
293      }
294    }
295
296    private void HandlePauseTask(object taskIdObj) {
297      Guid taskId = (Guid)taskIdObj;
298      try {
299        Task task = wcfService.GetTask(taskId);
300        if (task == null) throw new TaskNotFoundException(taskId);
301        taskManager.PauseTaskAsync(taskId);
302      }
303      catch (TaskNotFoundException) {
304        throw;
305      }
306      catch (TaskNotRunningException) {
307        throw;
308      }
309      catch (AppDomainNotCreatedException) {
310        throw;
311      }
312    }
313
314    private void HandleAbortTask(object taskIdObj) {
315      Guid taskId = (Guid)taskIdObj;
316      try {
317        taskManager.AbortTask(taskId);
318      }
319      catch (TaskNotFoundException) {
320        throw;
321      }
322    }
323
324    #region TaskManager Events
325    private void RegisterTaskManagerEvents() {
326      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
327      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
328      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
329      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
330      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
331    }
332
333    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
334      // successfully started, everything is good
335    }
336
337    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
338      try {
339        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
340        heartbeatManager.AwakeHeartBeatThread();
341        Task task = wcfService.GetTask(e.Value.TaskId);
342        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
343        task.ExecutionTime = e.Value.ExecutionTime;
344        TaskData taskData = e.Value2;
345        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
346      }
347      catch (TaskNotFoundException ex) {
348        SlaveClientCom.Instance.LogMessage(ex.ToString());
349      }
350      catch (Exception ex) {
351        SlaveClientCom.Instance.LogMessage(ex.ToString());
352      }
353    }
354
355    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
356      try {
357        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
358        heartbeatManager.AwakeHeartBeatThread();
359        Task task = wcfService.GetTask(e.Value.TaskId);
360        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
361        task.ExecutionTime = e.Value.ExecutionTime;
362        TaskData taskData = e.Value2;
363        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
364      }
365      catch (TaskNotFoundException ex) {
366        SlaveClientCom.Instance.LogMessage(ex.ToString());
367      }
368      catch (Exception ex) {
369        SlaveClientCom.Instance.LogMessage(ex.ToString());
370      }
371    }
372
373    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
374      try {
375        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
376        heartbeatManager.AwakeHeartBeatThread();
377        SlaveTask slaveTask = e.Value.Item1;
378        TaskData taskData = e.Value.Item2;
379        Exception exception = e.Value.Item3;
380
381        Task task = wcfService.GetTask(slaveTask.TaskId);
382        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
383        task.ExecutionTime = slaveTask.ExecutionTime;
384        if (taskData != null) {
385          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
386        } else {
387          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
388        }
389        SlaveClientCom.Instance.LogMessage(exception.Message);
390      }
391      catch (TaskNotFoundException ex) {
392        SlaveStatusInfo.IncrementTasksFailed();
393        SlaveClientCom.Instance.LogMessage(ex.ToString());
394      }
395      catch (Exception ex) {
396        SlaveStatusInfo.IncrementTasksFailed();
397        SlaveClientCom.Instance.LogMessage(ex.ToString());
398      }
399    }
400
401    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
402      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
403    }
404    #endregion
405
406    #region Log Events
407    private void log_MessageAdded(object sender, EventArgs<string> e) {
408      try {
409        SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
410      }
411      catch { }
412    }
413    #endregion
414
415    /// <summary>
416    /// aborts all running jobs, no results are sent back
417    /// </summary>
418    private void DoAbortAll() {
419      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
420      foreach (Guid taskId in taskManager.TaskIds) {
421        AbortTaskAsync(taskId);
422      }
423    }
424
425    /// <summary>
426    /// wait for jobs to finish, then pause client
427    /// </summary>
428    private void DoPauseAll() {
429      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
430      foreach (Guid taskId in taskManager.TaskIds) {
431        PauseTaskAsync(taskId);
432      }
433    }
434
435    /// <summary>
436    /// pause slave immediately
437    /// </summary>
438    private void DoStopAll() {
439      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
440      foreach (Guid taskId in taskManager.TaskIds) {
441        StopTaskAsync(taskId);
442      }
443    }
444
445    #region Slave Lifecycle Methods
446    /// <summary>
447    /// completly shudown slave
448    /// </summary>
449    public void Shutdown() {
450      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
451      MessageQueue.GetInstance().AddMessage(mc);
452      waitShutdownSem.WaitOne();
453    }
454
455    private void ShutdownComputer() {
456      var t = TS.Task.Factory.StartNew(new Action(Shutdown));
457      t.ContinueWith(c => {
458        try {
459          //we assume that *.exe means an executable in the current directory, otherwise it is a command
460          if (SlaveCore.Properties.Settings.Default.ShutdownCommand.EndsWith(".exe")) {
461            string dirName = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
462            Process.Start(Path.Combine(dirName, SlaveCore.Properties.Settings.Default.ShutdownCommand));
463          } else {
464            Process.Start(SlaveCore.Properties.Settings.Default.ShutdownCommand);
465          }
466        }
467        catch (Exception ex) {
468          if (ServiceEventLog != null) {
469            EventLogManager.LogException(ex);
470          } else
471            throw ex;
472        }
473      });
474    }
475
476    /// <summary>
477    /// complete shutdown, should be called before the the application is exited
478    /// </summary>
479    private void ShutdownCore() {
480      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
481      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
482      heartbeatManager.StopHeartBeat();
483      SlaveClientCom.Instance.LogMessage("Stopping checkpointing");
484      taskManager.StopCheckpointing();
485
486      abortRequested = true;
487
488      DoAbortAll();
489
490      SlaveClientCom.Instance.LogMessage("Logging out");
491      WcfService.Instance.Disconnect();
492      SlaveClientCom.Instance.ClientCom.Shutdown();
493      SlaveClientCom.Close();
494
495      if (slaveComm.State != CommunicationState.Closed)
496        slaveComm.Close();
497    }
498
499    /// <summary>
500    /// reinitializes everything and continues operation,
501    /// can be called after Sleep()
502    /// </summary> 
503    private void DoStartSlave() {
504      SlaveClientCom.Instance.LogMessage("Restart received");
505      configManager.Asleep = false;
506    }
507
508    /// <summary>
509    /// stop slave, except for client gui communication,
510    /// primarily used by gui if core is running as windows service
511    /// </summary>   
512    private void Sleep() {
513      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
514      configManager.Asleep = true;
515      DoPauseAll();
516    }
517    #endregion
518  }
519}
Note: See TracBrowser for help on using the repository browser.