Free cookie consent management tool by TermsFeed Policy Generator

source: branches/SlaveShutdown/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 8949

Last change on this file since 8949 was 8949, checked in by ascheibe, 11 years ago

#1986 added slave command for shutting down a computer

File size: 18.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using TS = System.Threading.Tasks;
31
32
33namespace HeuristicLab.Clients.Hive.SlaveCore {
34  /// <summary>
35  /// The core component of the Hive Slave.
36  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
37  /// </summary>
38  public class Core : MarshalByRefObject {
39    private static HeartbeatManager heartbeatManager;
40    public static HeartbeatManager HeartbeatManager {
41      get { return heartbeatManager; }
42    }
43
44    public EventLog ServiceEventLog { get; set; }
45
46    private Semaphore waitShutdownSem = new Semaphore(0, 1);
47    private bool abortRequested;
48    private ServiceHost slaveComm;
49    private WcfService wcfService;
50    private TaskManager taskManager;
51    private ConfigManager configManager;
52    private PluginManager pluginManager;
53
54    public Core() {
55      var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
56      this.pluginManager = new PluginManager(WcfService.Instance, log);
57      this.taskManager = new TaskManager(pluginManager, log);
58      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
59
60      RegisterTaskManagerEvents();
61
62      this.configManager = new ConfigManager(taskManager);
63      ConfigManager.Instance = this.configManager;
64    }
65
66    /// <summary>
67    /// Main method for the client
68    /// </summary>
69    public void Start() {
70      abortRequested = false;
71      EventLogManager.ServiceEventLog = ServiceEventLog;
72
73      try {
74        //start the client communication service (pipe between slave and slave gui)
75        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
76        slaveComm.Open();
77
78        // delete all left over task directories
79        pluginManager.CleanPluginTemp();
80        SlaveClientCom.Instance.LogMessage("Hive Slave started");
81
82        wcfService = WcfService.Instance;
83        RegisterServiceEvents();
84
85        StartHeartbeats(); // Start heartbeats thread       
86        DispatchMessageQueue(); // dispatch messages until abortRequested
87      }
88      catch (Exception ex) {
89        if (ServiceEventLog != null) {
90          EventLogManager.LogException(ex);
91        } else {
92          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
93          //else an exception will be thrown anyways.
94          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
95        }
96        ShutdownCore();
97      }
98      finally {
99        DeregisterServiceEvents();
100        waitShutdownSem.Release();
101      }
102    }
103
104    private void StartHeartbeats() {
105      //Initialize the heartbeat     
106      if (heartbeatManager == null) {
107        heartbeatManager = new HeartbeatManager();
108        heartbeatManager.StartHeartbeat();
109      }
110    }
111
112    private void DispatchMessageQueue() {
113      MessageQueue queue = MessageQueue.GetInstance();
114      while (!abortRequested) {
115        MessageContainer container = queue.GetMessage();
116        DetermineAction(container);
117        if (!abortRequested) {
118          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
119        }
120      }
121    }
122
123    private void RegisterServiceEvents() {
124      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
125      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
126    }
127
128    private void DeregisterServiceEvents() {
129      WcfService.Instance.Connected -= WcfService_Connected;
130      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
131    }
132
133    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
134      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
135    }
136
137    private void WcfService_Connected(object sender, EventArgs e) {
138      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
139    }
140
141    /// <summary>
142    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
143    /// </summary>
144    /// <param name="container">The container, containing the message</param>
145    private void DetermineAction(MessageContainer container) {
146      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
147
148      switch (container.Message) {
149        case MessageContainer.MessageType.CalculateTask:
150          CalculateTaskAsync(container.TaskId);
151          break;
152        case MessageContainer.MessageType.AbortTask:
153          AbortTaskAsync(container.TaskId);
154          break;
155        case MessageContainer.MessageType.StopTask:
156          StopTaskAsync(container.TaskId);
157          break;
158        case MessageContainer.MessageType.PauseTask:
159          PauseTaskAsync(container.TaskId);
160          break;
161        case MessageContainer.MessageType.StopAll:
162          DoStopAll();
163          break;
164        case MessageContainer.MessageType.PauseAll:
165          DoPauseAll();
166          break;
167        case MessageContainer.MessageType.AbortAll:
168          DoAbortAll();
169          break;
170        case MessageContainer.MessageType.ShutdownSlave:
171          ShutdownCore();
172          break;
173        case MessageContainer.MessageType.Restart:
174          DoStartSlave();
175          break;
176        case MessageContainer.MessageType.Sleep:
177          Sleep();
178          break;
179        case MessageContainer.MessageType.SayHello:
180          wcfService.Connect(configManager.GetClientInfo());
181          break;
182        case MessageContainer.MessageType.NewHBInterval:
183          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
184          if (interval != -1) {
185            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
186          }
187          break;
188        case MessageContainer.MessageType.ShutdownComputer:
189          ShutdownComputer();
190          break;
191      }
192    }
193
194    private void CalculateTaskAsync(Guid jobId) {
195      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
196      .ContinueWith((t) => {
197        SlaveStatusInfo.IncrementExceptionOccured();
198        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
199      }, TaskContinuationOptions.OnlyOnFaulted);
200    }
201
202    private void StopTaskAsync(Guid jobId) {
203      TS.Task.Factory.StartNew(HandleStopTask, jobId)
204       .ContinueWith((t) => {
205         SlaveStatusInfo.IncrementExceptionOccured();
206         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
207       }, TaskContinuationOptions.OnlyOnFaulted);
208    }
209
210    private void PauseTaskAsync(Guid jobId) {
211      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
212       .ContinueWith((t) => {
213         SlaveStatusInfo.IncrementExceptionOccured();
214         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
215       }, TaskContinuationOptions.OnlyOnFaulted);
216    }
217
218    private void AbortTaskAsync(Guid jobId) {
219      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
220       .ContinueWith((t) => {
221         SlaveStatusInfo.IncrementExceptionOccured();
222         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
223       }, TaskContinuationOptions.OnlyOnFaulted);
224    }
225
226    private void HandleCalculateTask(object taskIdObj) {
227      Guid taskId = (Guid)taskIdObj;
228      Task task = null;
229      int usedCores = 0;
230      try {
231        task = wcfService.GetTask(taskId);
232        if (task == null) throw new TaskNotFoundException(taskId);
233        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
234        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
235        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
236        TaskData taskData = wcfService.GetTaskData(taskId);
237        if (taskData == null) throw new TaskDataNotFoundException(taskId);
238        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
239        if (task == null) throw new TaskNotFoundException(taskId);
240        taskManager.StartTaskAsync(task, taskData);
241      }
242      catch (TaskNotFoundException) {
243        SlaveStatusInfo.DecrementUsedCores(usedCores);
244        throw;
245      }
246      catch (TaskDataNotFoundException) {
247        SlaveStatusInfo.DecrementUsedCores(usedCores);
248        throw;
249      }
250      catch (TaskAlreadyRunningException) {
251        SlaveStatusInfo.DecrementUsedCores(usedCores);
252        throw;
253      }
254      catch (OutOfCoresException) {
255        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
256        throw;
257      }
258      catch (OutOfMemoryException) {
259        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
260        throw;
261      }
262      catch (Exception e) {
263        SlaveStatusInfo.DecrementUsedCores(usedCores);
264        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
265        throw;
266      }
267    }
268
269    private void HandleStopTask(object taskIdObj) {
270      Guid taskId = (Guid)taskIdObj;
271      try {
272        Task task = wcfService.GetTask(taskId);
273        if (task == null) throw new TaskNotFoundException(taskId);
274        taskManager.StopTaskAsync(taskId);
275      }
276      catch (TaskNotFoundException) {
277        throw;
278      }
279      catch (TaskNotRunningException) {
280        throw;
281      }
282      catch (AppDomainNotCreatedException) {
283        throw;
284      }
285    }
286
287    private void HandlePauseTask(object taskIdObj) {
288      Guid taskId = (Guid)taskIdObj;
289      try {
290        Task task = wcfService.GetTask(taskId);
291        if (task == null) throw new TaskNotFoundException(taskId);
292        taskManager.PauseTaskAsync(taskId);
293      }
294      catch (TaskNotFoundException) {
295        throw;
296      }
297      catch (TaskNotRunningException) {
298        throw;
299      }
300      catch (AppDomainNotCreatedException) {
301        throw;
302      }
303    }
304
305    private void HandleAbortTask(object taskIdObj) {
306      Guid taskId = (Guid)taskIdObj;
307      try {
308        taskManager.AbortTask(taskId);
309      }
310      catch (TaskNotFoundException) {
311        throw;
312      }
313    }
314
315    #region TaskManager Events
316    private void RegisterTaskManagerEvents() {
317      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
318      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
319      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
320      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
321      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
322      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
323    }
324
325    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
326      // successfully started, everything is good
327    }
328
329    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
330      try {
331        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
332        heartbeatManager.AwakeHeartBeatThread();
333        Task task = wcfService.GetTask(e.Value.TaskId);
334        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
335        task.ExecutionTime = e.Value.ExecutionTime;
336        TaskData taskData = e.Value2;
337        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
338      }
339      catch (TaskNotFoundException ex) {
340        SlaveClientCom.Instance.LogMessage(ex.ToString());
341      }
342      catch (Exception ex) {
343        SlaveClientCom.Instance.LogMessage(ex.ToString());
344      }
345    }
346
347    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
348      try {
349        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
350        heartbeatManager.AwakeHeartBeatThread();
351        Task task = wcfService.GetTask(e.Value.TaskId);
352        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
353        task.ExecutionTime = e.Value.ExecutionTime;
354        TaskData taskData = e.Value2;
355        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
356      }
357      catch (TaskNotFoundException ex) {
358        SlaveClientCom.Instance.LogMessage(ex.ToString());
359      }
360      catch (Exception ex) {
361        SlaveClientCom.Instance.LogMessage(ex.ToString());
362      }
363    }
364
365    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
366      try {
367        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
368        heartbeatManager.AwakeHeartBeatThread();
369        SlaveTask slaveTask = e.Value.Item1;
370        TaskData taskData = e.Value.Item2;
371        Exception exception = e.Value.Item3;
372
373        Task task = wcfService.GetTask(slaveTask.TaskId);
374        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
375        task.ExecutionTime = slaveTask.ExecutionTime;
376        if (taskData != null) {
377          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
378        } else {
379          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
380        }
381        SlaveClientCom.Instance.LogMessage(exception.Message);
382      }
383      catch (TaskNotFoundException ex) {
384        SlaveStatusInfo.IncrementExceptionOccured();
385        SlaveClientCom.Instance.LogMessage(ex.ToString());
386      }
387      catch (Exception ex) {
388        SlaveStatusInfo.IncrementExceptionOccured();
389        SlaveClientCom.Instance.LogMessage(ex.ToString());
390      }
391    }
392
393    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
394      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
395      SlaveStatusInfo.IncrementExceptionOccured();
396      heartbeatManager.AwakeHeartBeatThread();
397      SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
398      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
399    }
400
401    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
402      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
403    }
404    #endregion
405
406    #region Log Events
407    private void log_MessageAdded(object sender, EventArgs<string> e) {
408      try {
409        SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
410      }
411      catch { }
412    }
413    #endregion
414
415    /// <summary>
416    /// aborts all running jobs, no results are sent back
417    /// </summary>
418    private void DoAbortAll() {
419      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
420      foreach (Guid taskId in taskManager.TaskIds) {
421        AbortTaskAsync(taskId);
422      }
423    }
424
425    /// <summary>
426    /// wait for jobs to finish, then pause client
427    /// </summary>
428    private void DoPauseAll() {
429      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
430      foreach (Guid taskId in taskManager.TaskIds) {
431        PauseTaskAsync(taskId);
432      }
433    }
434
435    /// <summary>
436    /// pause slave immediately
437    /// </summary>
438    private void DoStopAll() {
439      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
440      foreach (Guid taskId in taskManager.TaskIds) {
441        StopTaskAsync(taskId);
442      }
443    }
444
445    #region Slave Lifecycle Methods
446    /// <summary>
447    /// completly shudown slave
448    /// </summary>
449    public void Shutdown() {
450      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
451      MessageQueue.GetInstance().AddMessage(mc);
452      waitShutdownSem.WaitOne();
453    }
454
455    private void ShutdownComputer() {
456      Shutdown();
457
458      try {
459        Process.Start(Settings.Default.ShutdownCommand);
460      }
461      catch (Exception ex) {
462        if (ServiceEventLog != null) {
463          EventLogManager.LogException(ex);
464        } else
465          throw ex;
466      }
467    }
468
469    /// <summary>
470    /// complete shutdown, should be called before the the application is exited
471    /// </summary>
472    private void ShutdownCore() {
473      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
474      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
475      heartbeatManager.StopHeartBeat();
476      abortRequested = true;
477
478      DoAbortAll();
479
480      SlaveClientCom.Instance.LogMessage("Logging out");
481      WcfService.Instance.Disconnect();
482      SlaveClientCom.Instance.ClientCom.Shutdown();
483      SlaveClientCom.Close();
484
485      if (slaveComm.State != CommunicationState.Closed)
486        slaveComm.Close();
487    }
488
489    /// <summary>
490    /// reinitializes everything and continues operation,
491    /// can be called after Sleep()
492    /// </summary> 
493    private void DoStartSlave() {
494      SlaveClientCom.Instance.LogMessage("Restart received");
495      configManager.Asleep = false;
496    }
497
498    /// <summary>
499    /// stop slave, except for client gui communication,
500    /// primarily used by gui if core is running as windows service
501    /// </summary>   
502    private void Sleep() {
503      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
504      configManager.Asleep = true;
505      DoPauseAll();
506    }
507    #endregion
508  }
509}
Note: See TracBrowser for help on using the repository browser.