Free cookie consent management tool by TermsFeed Policy Generator

source: branches/2521_ProblemRefactoring/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 17656

Last change on this file since 17656 was 17586, checked in by mkommend, 4 years ago

#2521: Merged trunk changes into problem refactoring branch.

File size: 19.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.IO;
25using System.Reflection;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static readonly object locker = new object();
41
42    private static HeartbeatManager heartbeatManager;
43    public static HeartbeatManager HeartbeatManager {
44      get { return heartbeatManager; }
45    }
46
47    public EventLog ServiceEventLog { get; set; }
48
49    private Semaphore waitShutdownSem = new Semaphore(0, 1);
50    private bool abortRequested;
51    private ServiceHost slaveComm;
52    private WcfService wcfService;
53    private TaskManager taskManager;
54    private ConfigManager configManager;
55    private PluginManager pluginManager;
56
57    public Core() {
58      var log = new ThreadSafeLog(SlaveCore.Properties.Settings.Default.MaxLogCount);
59      this.pluginManager = new PluginManager(WcfService.Instance, log);
60      this.taskManager = new TaskManager(pluginManager, log);
61      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
62
63      RegisterTaskManagerEvents();
64
65      this.configManager = new ConfigManager(taskManager);
66      ConfigManager.Instance = this.configManager;
67    }
68
69    /// <summary>
70    /// Main method for the client
71    /// </summary>
72    public void Start() {
73      abortRequested = false;
74      EventLogManager.ServiceEventLog = ServiceEventLog;
75
76      try {
77        //start the client communication service (pipe between slave and slave gui)
78        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
79
80        try {
81          slaveComm.Open();
82        }
83        catch (AddressAlreadyInUseException ex) {
84          if (ServiceEventLog != null) {
85            EventLogManager.LogException(ex);
86          }
87        }
88
89        // delete all left over task directories
90        pluginManager.CleanPluginTemp();
91        SlaveClientCom.Instance.LogMessage("Hive Slave started");
92
93        wcfService = WcfService.Instance;
94        RegisterServiceEvents();
95
96        StartHeartbeats(); // Start heartbeats thread       
97        DispatchMessageQueue(); // dispatch messages until abortRequested
98      }
99      catch (Exception ex) {
100        if (ServiceEventLog != null) {
101          EventLogManager.LogException(ex);
102        } else {
103          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
104          //else an exception will be thrown anyways.
105          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
106        }
107        ShutdownCore();
108      }
109      finally {
110        DeregisterServiceEvents();
111        waitShutdownSem.Release();
112      }
113    }
114
115    private void StartHeartbeats() {
116      //Initialize the heartbeat     
117      if (heartbeatManager == null) {
118        heartbeatManager = new HeartbeatManager();
119        heartbeatManager.StartHeartbeat();
120      }
121    }
122
123    private void DispatchMessageQueue() {
124      MessageQueue queue = MessageQueue.GetInstance();
125      while (!abortRequested) {
126        MessageContainer container = queue.GetMessage();
127        DetermineAction(container);
128        if (!abortRequested) {
129          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
130        }
131      }
132    }
133
134    private void RegisterServiceEvents() {
135      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
136      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
137    }
138
139    private void DeregisterServiceEvents() {
140      WcfService.Instance.Connected -= WcfService_Connected;
141      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
142    }
143
144    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
145      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
146    }
147
148    private void WcfService_Connected(object sender, EventArgs e) {
149      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
150    }
151
152    /// <summary>
153    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
154    /// </summary>
155    /// <param name="container">The container, containing the message</param>
156    private void DetermineAction(MessageContainer container) {
157      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
158
159      switch (container.Message) {
160        case MessageContainer.MessageType.CalculateTask:
161          CalculateTaskAsync(container.TaskId);
162          break;
163        case MessageContainer.MessageType.AbortTask:
164          AbortTaskAsync(container.TaskId);
165          break;
166        case MessageContainer.MessageType.StopTask:
167          StopTaskAsync(container.TaskId);
168          break;
169        case MessageContainer.MessageType.PauseTask:
170          PauseTaskAsync(container.TaskId);
171          break;
172        case MessageContainer.MessageType.StopAll:
173          DoStopAll();
174          break;
175        case MessageContainer.MessageType.PauseAll:
176          DoPauseAll();
177          break;
178        case MessageContainer.MessageType.AbortAll:
179          DoAbortAll();
180          break;
181        case MessageContainer.MessageType.ShutdownSlave:
182          ShutdownCore();
183          break;
184        case MessageContainer.MessageType.Restart:
185          DoStartSlave();
186          break;
187        case MessageContainer.MessageType.Sleep:
188          Sleep();
189          break;
190        case MessageContainer.MessageType.SayHello:
191          wcfService.Connect(configManager.GetClientInfo());
192          break;
193        case MessageContainer.MessageType.NewHBInterval:
194          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
195          if (interval != -1) {
196            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
197          }
198          break;
199        case MessageContainer.MessageType.ShutdownComputer:
200          ShutdownComputer();
201          break;
202      }
203    }
204
205    private void CalculateTaskAsync(Guid jobId) {
206      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
207      .ContinueWith((t) => {
208        SlaveStatusInfo.IncrementTasksFailed();
209        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
210      }, TaskContinuationOptions.OnlyOnFaulted);
211    }
212
213    private void StopTaskAsync(Guid jobId) {
214      TS.Task.Factory.StartNew(HandleStopTask, jobId)
215       .ContinueWith((t) => {
216         SlaveStatusInfo.IncrementTasksFailed();
217         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
218       }, TaskContinuationOptions.OnlyOnFaulted);
219    }
220
221    private void PauseTaskAsync(Guid jobId) {
222      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
223       .ContinueWith((t) => {
224         SlaveStatusInfo.IncrementTasksFailed();
225         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
226       }, TaskContinuationOptions.OnlyOnFaulted);
227    }
228
229    private void AbortTaskAsync(Guid jobId) {
230      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
231       .ContinueWith((t) => {
232         SlaveStatusInfo.IncrementTasksFailed();
233         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
234       }, TaskContinuationOptions.OnlyOnFaulted);
235    }
236
237    private void HandleCalculateTask(object taskIdObj) {
238      Guid taskId = (Guid)taskIdObj;
239      Task task = null;
240      int usedCores = 0;
241      try {
242        task = wcfService.GetTask(taskId);
243        if (task == null) throw new TaskNotFoundException(taskId);
244        lock (locker) {
245          // the amount of used cores/memory could exceed the amount of available cores/memory
246          // if HandleCalculateTask is called simultaneously from within two different tasks
247          if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
248          if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
249          SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
250        }
251        TaskData taskData = wcfService.GetTaskData(taskId);
252        if (taskData == null) throw new TaskDataNotFoundException(taskId);
253        task = wcfService.UpdateTaskState(taskId, TaskState.Calculating, null);
254        if (task == null) throw new TaskNotFoundException(taskId);
255        taskManager.StartTaskAsync(task, taskData);
256      }
257      catch (TaskNotFoundException) {
258        SlaveStatusInfo.DecrementUsedCores(usedCores);
259        throw;
260      }
261      catch (TaskDataNotFoundException) {
262        SlaveStatusInfo.DecrementUsedCores(usedCores);
263        throw;
264      }
265      catch (TaskAlreadyRunningException) {
266        SlaveStatusInfo.DecrementUsedCores(usedCores);
267        throw;
268      }
269      catch (OutOfCoresException) {
270        wcfService.UpdateTaskState(taskId, TaskState.Waiting, "No more cores available");
271        throw;
272      }
273      catch (OutOfMemoryException) {
274        wcfService.UpdateTaskState(taskId, TaskState.Waiting, "No more memory available");
275        throw;
276      }
277      catch (Exception e) {
278        SlaveStatusInfo.DecrementUsedCores(usedCores);
279        wcfService.UpdateTaskState(taskId, TaskState.Failed, e.ToString());
280        throw;
281      }
282    }
283
284    private void HandleStopTask(object taskIdObj) {
285      Guid taskId = (Guid)taskIdObj;
286      try {
287        Task task = wcfService.GetTask(taskId);
288        if (task == null) throw new TaskNotFoundException(taskId);
289        taskManager.StopTaskAsync(taskId);
290      }
291      catch (TaskNotFoundException) {
292        throw;
293      }
294      catch (TaskNotRunningException) {
295        throw;
296      }
297      catch (AppDomainNotCreatedException) {
298        throw;
299      }
300    }
301
302    private void HandlePauseTask(object taskIdObj) {
303      Guid taskId = (Guid)taskIdObj;
304      try {
305        Task task = wcfService.GetTask(taskId);
306        if (task == null) throw new TaskNotFoundException(taskId);
307        taskManager.PauseTaskAsync(taskId);
308      }
309      catch (TaskNotFoundException) {
310        throw;
311      }
312      catch (TaskNotRunningException) {
313        throw;
314      }
315      catch (AppDomainNotCreatedException) {
316        throw;
317      }
318    }
319
320    private void HandleAbortTask(object taskIdObj) {
321      Guid taskId = (Guid)taskIdObj;
322      try {
323        taskManager.AbortTask(taskId);
324      }
325      catch (TaskNotFoundException) {
326        throw;
327      }
328    }
329
330    #region TaskManager Events
331    private void RegisterTaskManagerEvents() {
332      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
333      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
334      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
335      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
336      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
337    }
338
339    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
340      // successfully started, everything is good
341    }
342
343    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
344      try {
345        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
346        heartbeatManager.AwakeHeartBeatThread();
347        Task task = wcfService.GetTask(e.Value.TaskId);
348        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
349        task.ExecutionTime = e.Value.ExecutionTime;
350        TaskData taskData = e.Value2;
351        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
352      }
353      catch (TaskNotFoundException ex) {
354        SlaveClientCom.Instance.LogMessage(ex.ToString());
355      }
356      catch (Exception ex) {
357        SlaveClientCom.Instance.LogMessage(ex.ToString());
358      }
359    }
360
361    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
362      try {
363        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
364        heartbeatManager.AwakeHeartBeatThread();
365        Task task = wcfService.GetTask(e.Value.TaskId);
366        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
367        task.ExecutionTime = e.Value.ExecutionTime;
368        TaskData taskData = e.Value2;
369        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
370      }
371      catch (TaskNotFoundException ex) {
372        SlaveClientCom.Instance.LogMessage(ex.ToString());
373      }
374      catch (Exception ex) {
375        SlaveClientCom.Instance.LogMessage(ex.ToString());
376      }
377    }
378
379    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
380      try {
381        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
382        heartbeatManager.AwakeHeartBeatThread();
383        SlaveTask slaveTask = e.Value.Item1;
384        TaskData taskData = e.Value.Item2;
385        Exception exception = e.Value.Item3;
386
387        Task task = wcfService.GetTask(slaveTask.TaskId);
388        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
389        task.ExecutionTime = slaveTask.ExecutionTime;
390        if (taskData != null) {
391          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
392        } else {
393          wcfService.UpdateTaskState(task.Id, TaskState.Failed, exception.ToString());
394        }
395        SlaveClientCom.Instance.LogMessage(exception.Message);
396      }
397      catch (TaskNotFoundException ex) {
398        SlaveStatusInfo.IncrementTasksFailed();
399        SlaveClientCom.Instance.LogMessage(ex.ToString());
400      }
401      catch (Exception ex) {
402        SlaveStatusInfo.IncrementTasksFailed();
403        SlaveClientCom.Instance.LogMessage(ex.ToString());
404      }
405    }
406
407    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
408      var slaveTask = e.Value;
409      var task = wcfService.GetTask(slaveTask.TaskId);
410      wcfService.UpdateTaskState(task.Id, TaskState.Aborted, null);
411      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
412    }
413    #endregion
414
415    #region Log Events
416    private void log_MessageAdded(object sender, EventArgs<string> e) {
417      try {
418        SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
419      }
420      catch { }
421    }
422    #endregion
423
424    /// <summary>
425    /// aborts all running jobs, no results are sent back
426    /// </summary>
427    private void DoAbortAll() {
428      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
429      foreach (Guid taskId in taskManager.TaskIds) {
430        AbortTaskAsync(taskId);
431      }
432    }
433
434    /// <summary>
435    /// wait for jobs to finish, then pause client
436    /// </summary>
437    private void DoPauseAll() {
438      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
439      foreach (Guid taskId in taskManager.TaskIds) {
440        PauseTaskAsync(taskId);
441      }
442    }
443
444    /// <summary>
445    /// pause slave immediately
446    /// </summary>
447    private void DoStopAll() {
448      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
449      foreach (Guid taskId in taskManager.TaskIds) {
450        StopTaskAsync(taskId);
451      }
452    }
453
454    #region Slave Lifecycle Methods
455    /// <summary>
456    /// completly shudown slave
457    /// </summary>
458    public void Shutdown() {
459      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
460      MessageQueue.GetInstance().AddMessage(mc);
461      waitShutdownSem.WaitOne();
462    }
463
464    private void ShutdownComputer() {
465      var t = TS.Task.Factory.StartNew(new Action(Shutdown));
466      t.ContinueWith(c => {
467        try {
468          //we assume that *.exe means an executable in the current directory, otherwise it is a command
469          if (SlaveCore.Properties.Settings.Default.ShutdownCommand.EndsWith(".exe")) {
470            string dirName = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
471            Process.Start(Path.Combine(dirName, SlaveCore.Properties.Settings.Default.ShutdownCommand));
472          } else {
473            Process.Start(SlaveCore.Properties.Settings.Default.ShutdownCommand);
474          }
475        }
476        catch (Exception ex) {
477          if (ServiceEventLog != null) {
478            EventLogManager.LogException(ex);
479          } else
480            throw ex;
481        }
482      });
483    }
484
485    /// <summary>
486    /// complete shutdown, should be called before the the application is exited
487    /// </summary>
488    private void ShutdownCore() {
489      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
490      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
491      heartbeatManager.StopHeartBeat();
492      SlaveClientCom.Instance.LogMessage("Stopping checkpointing");
493      taskManager.StopCheckpointing();
494
495      abortRequested = true;
496
497      DoAbortAll();
498
499      SlaveClientCom.Instance.LogMessage("Logging out");
500      WcfService.Instance.Disconnect();
501      SlaveClientCom.Instance.ClientCom.Shutdown();
502      SlaveClientCom.Close();
503
504      if (slaveComm.State != CommunicationState.Closed)
505        slaveComm.Close();
506    }
507
508    /// <summary>
509    /// reinitializes everything and continues operation,
510    /// can be called after Sleep()
511    /// </summary> 
512    private void DoStartSlave() {
513      SlaveClientCom.Instance.LogMessage("Restart received");
514      configManager.Asleep = false;
515    }
516
517    /// <summary>
518    /// stop slave, except for client gui communication,
519    /// primarily used by gui if core is running as windows service
520    /// </summary>   
521    private void Sleep() {
522      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
523      configManager.Asleep = true;
524      DoPauseAll();
525    }
526    #endregion
527  }
528}
Note: See TracBrowser for help on using the repository browser.