Free cookie consent management tool by TermsFeed Policy Generator

source: branches/SlaveShutdown/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 8953

Last change on this file since 8953 was 8953, checked in by ascheibe, 11 years ago

#1986 added the ui components for shutting down slaves

File size: 18.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Common;
28using HeuristicLab.Core;
29using TS = System.Threading.Tasks;
30
31
32namespace HeuristicLab.Clients.Hive.SlaveCore {
33  /// <summary>
34  /// The core component of the Hive Slave.
35  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
36  /// </summary>
37  public class Core : MarshalByRefObject {
// Heartbeat manager shared by all Core instances; created by StartHeartbeats().
private static HeartbeatManager heartbeatManager;

/// <summary>
/// Gets the shared heartbeat manager (null until StartHeartbeats() has created it).
/// </summary>
public static HeartbeatManager HeartbeatManager {
  get { return heartbeatManager; }
}

/// <summary>
/// Event log that Start() hands to EventLogManager; may be left unset.
/// </summary>
public EventLog ServiceEventLog { get; set; }

// Released at the end of Start(); Shutdown() blocks on it.
private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);

// Set by ShutdownCore() to end the message dispatch loop.
private bool abortRequested;

// Host for SlaveCommunicationService, opened in Start().
private ServiceHost slaveComm;

// Assigned from WcfService.Instance in Start().
private WcfService wcfService;

// Collaborators created once in the constructor.
private readonly TaskManager taskManager;
private readonly ConfigManager configManager;
private readonly PluginManager pluginManager;
52
/// <summary>
/// Creates the log, plugin manager, task manager and config manager and
/// subscribes to the log and task manager events.
/// </summary>
public Core() {
  ThreadSafeLog log = new ThreadSafeLog(SlaveCore.Properties.Settings.Default.MaxLogCount);
  pluginManager = new PluginManager(WcfService.Instance, log);
  taskManager = new TaskManager(pluginManager, log);
  log.MessageAdded += log_MessageAdded;

  RegisterTaskManagerEvents();

  // the config manager is also published through the static ConfigManager.Instance
  configManager = new ConfigManager(taskManager);
  ConfigManager.Instance = configManager;
}
64
/// <summary>
/// Main method for the client: opens the communication host, cleans the plugin
/// temp directories, starts the heartbeats and then dispatches queued messages
/// until an abort is requested.
/// </summary>
/// <remarks>
/// Any exception is logged and followed by ShutdownCore(). In every case the
/// service events are deregistered and waitShutdownSem is released on exit.
/// </remarks>
public void Start() {
  abortRequested = false;
  EventLogManager.ServiceEventLog = ServiceEventLog;

  try {
    // client communication service (pipe between slave and slave gui)
    slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
    slaveComm.Open();

    // remove task directories left over from earlier runs
    pluginManager.CleanPluginTemp();
    SlaveClientCom.Instance.LogMessage("Hive Slave started");

    wcfService = WcfService.Instance;
    RegisterServiceEvents();

    StartHeartbeats();
    DispatchMessageQueue(); // returns once abortRequested is set
  }
  catch (Exception ex) {
    if (ServiceEventLog == null) {
      // no event log available: try the client channel so the user sees at least
      // a message; if that fails too, an exception is thrown anyway
      SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
    } else {
      EventLogManager.LogException(ex);
    }
    ShutdownCore();
  }
  finally {
    DeregisterServiceEvents();
    waitShutdownSem.Release();
  }
}
102
/// <summary>
/// Creates and starts the shared heartbeat manager unless one already exists.
/// </summary>
private void StartHeartbeats() {
  if (heartbeatManager != null) {
    return;
  }

  heartbeatManager = new HeartbeatManager();
  heartbeatManager.StartHeartbeat();
}
110
/// <summary>
/// Takes messages from the message queue and handles them one by one until
/// abortRequested is set; after each handled message the client console is
/// sent the current status (skipped once an abort was requested).
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue messages = MessageQueue.GetInstance();
  while (!abortRequested) {
    DetermineAction(messages.GetMessage());
    if (abortRequested) {
      break;
    }
    SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
  }
}
121
/// <summary>
/// Subscribes to the Connected and ExceptionOccured events of the WcfService singleton.
/// </summary>
private void RegisterServiceEvents() {
  WcfService.Instance.Connected += WcfService_Connected;
  WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
}
126
/// <summary>
/// Removes the handlers added by RegisterServiceEvents().
/// </summary>
private void DeregisterServiceEvents() {
  WcfService.Instance.Connected -= new EventHandler(WcfService_Connected);
  WcfService.Instance.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
}
131
/// <summary>
/// Logs the message of an exception reported by the WcfService.
/// </summary>
private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  var clientLog = SlaveClientCom.Instance;
  clientLog.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
}
135
/// <summary>
/// Logs that the WcfService reported a successful connection.
/// </summary>
private void WcfService_Connected(object sender, EventArgs e) {
  const string connectedMessage = "Connected successfully to Hive server";
  SlaveClientCom.Instance.LogMessage(connectedMessage);
}
139
/// <summary>
/// Logs a message taken from the MessageQueue and starts the action that
/// corresponds to its message type.
/// </summary>
/// <param name="container">The container holding the message type and the task id it refers to</param>
private void DetermineAction(MessageContainer container) {
  MessageContainer.MessageType messageType = container.Message;
  Guid taskId = container.TaskId;
  SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", messageType.ToString(), taskId));

  switch (messageType) {
    // commands for a single task
    case MessageContainer.MessageType.CalculateTask:
      CalculateTaskAsync(taskId);
      break;
    case MessageContainer.MessageType.AbortTask:
      AbortTaskAsync(taskId);
      break;
    case MessageContainer.MessageType.StopTask:
      StopTaskAsync(taskId);
      break;
    case MessageContainer.MessageType.PauseTask:
      PauseTaskAsync(taskId);
      break;

    // commands for all tasks
    case MessageContainer.MessageType.StopAll:
      DoStopAll();
      break;
    case MessageContainer.MessageType.PauseAll:
      DoPauseAll();
      break;
    case MessageContainer.MessageType.AbortAll:
      DoAbortAll();
      break;

    // slave lifecycle
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.Restart:
      DoStartSlave();
      break;
    case MessageContainer.MessageType.Sleep:
      Sleep();
      break;
    case MessageContainer.MessageType.ShutdownComputer:
      ShutdownComputer();
      break;

    // server communication
    case MessageContainer.MessageType.SayHello:
      wcfService.Connect(configManager.GetClientInfo());
      break;
    case MessageContainer.MessageType.NewHBInterval: {
        int seconds = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
        // -1 leaves the current interval untouched
        if (seconds != -1) {
          HeartbeatManager.Interval = TimeSpan.FromSeconds(seconds);
        }
        break;
      }
  }
}
192
/// <summary>
/// Runs HandleCalculateTask for the given task id on a TPL task.
/// </summary>
private void CalculateTaskAsync(Guid jobId) {
  RunHandlerAsync(HandleCalculateTask, jobId);
}

/// <summary>
/// Runs HandleStopTask for the given task id on a TPL task.
/// </summary>
private void StopTaskAsync(Guid jobId) {
  RunHandlerAsync(HandleStopTask, jobId);
}

/// <summary>
/// Runs HandlePauseTask for the given task id on a TPL task.
/// </summary>
private void PauseTaskAsync(Guid jobId) {
  RunHandlerAsync(HandlePauseTask, jobId);
}

/// <summary>
/// Runs HandleAbortTask for the given task id on a TPL task.
/// </summary>
private void AbortTaskAsync(Guid jobId) {
  RunHandlerAsync(HandleAbortTask, jobId);
}

/// <summary>
/// Starts <paramref name="handler"/> on a TPL task, passing the task id as its state
/// object. If the handler faults, the exception counter is incremented and the
/// exception is logged to the client.
/// </summary>
/// <param name="handler">One of the Handle*Task methods</param>
/// <param name="taskId">Id of the task the handler should work on</param>
private void RunHandlerAsync(Action<object> handler, Guid taskId) {
  TS.Task.Factory.StartNew(handler, taskId)
    .ContinueWith((t) => {
      // the continuation only runs for a faulted task, so t.Exception is set here
      SlaveStatusInfo.IncrementExceptionOccured();
      SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
    }, TaskContinuationOptions.OnlyOnFaulted);
}
224
/// <summary>
/// Fetches a task and its data from the server, reserves the cores it needs,
/// sets it to Calculating and hands it to the task manager.
/// </summary>
/// <param name="taskIdObj">The boxed Guid of the task to calculate</param>
/// <remarks>
/// Every failure is rethrown to the caller. Cores reserved up to the point of
/// failure are released on every failure path. When cores or memory are
/// insufficient, or an unexpected error occurs, the task is set back to Waiting
/// on the server.
/// </remarks>
private void HandleCalculateTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;
  int usedCores = 0; // cores reserved so far; stays 0 until IncrementUsedCores ran
  try {
    Task task = wcfService.GetTask(taskId);
    if (task == null) throw new TaskNotFoundException(taskId);
    if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
    if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();

    SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded);
    usedCores = task.CoresNeeded;

    TaskData taskData = wcfService.GetTaskData(taskId);
    if (taskData == null) throw new TaskDataNotFoundException(taskId);
    task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
    if (task == null) throw new TaskNotFoundException(taskId);
    taskManager.StartTaskAsync(task, taskData);
  }
  catch (TaskNotFoundException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (TaskDataNotFoundException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (TaskAlreadyRunningException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (OutOfCoresException) {
    // release any reservation, like every other failure path (0 if nothing was reserved)
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
    throw;
  }
  catch (OutOfMemoryException) {
    // this exception can also surface after the cores were reserved (e.g. while
    // loading the task data), so the reservation has to be released here as well
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
    throw;
  }
  catch (Exception e) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
    throw;
  }
}
267
/// <summary>
/// Checks that the server knows the task and asks the task manager to stop it.
/// </summary>
/// <param name="taskIdObj">The boxed Guid of the task to stop</param>
/// <exception cref="TaskNotFoundException">The server returned no task for the id.</exception>
private void HandleStopTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;

  // Nothing is handled here: TaskNotFoundException, TaskNotRunningException and
  // AppDomainNotCreatedException all propagate unchanged to the caller.
  if (wcfService.GetTask(taskId) == null) {
    throw new TaskNotFoundException(taskId);
  }
  taskManager.StopTaskAsync(taskId);
}
285
/// <summary>
/// Checks that the server knows the task and asks the task manager to pause it.
/// </summary>
/// <param name="taskIdObj">The boxed Guid of the task to pause</param>
/// <exception cref="TaskNotFoundException">The server returned no task for the id.</exception>
private void HandlePauseTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;

  // Nothing is handled here: TaskNotFoundException, TaskNotRunningException and
  // AppDomainNotCreatedException all propagate unchanged to the caller.
  if (wcfService.GetTask(taskId) == null) {
    throw new TaskNotFoundException(taskId);
  }
  taskManager.PauseTaskAsync(taskId);
}
303
/// <summary>
/// Asks the task manager to abort the task; the server is not consulted.
/// </summary>
/// <param name="taskIdObj">The boxed Guid of the task to abort</param>
private void HandleAbortTask(object taskIdObj) {
  // a TaskNotFoundException (like any other exception) propagates to the caller
  taskManager.AbortTask((Guid)taskIdObj);
}
313
314    #region TaskManager Events
/// <summary>
/// Subscribes this core to all task life-cycle events of the task manager.
/// </summary>
private void RegisterTaskManagerEvents() {
  taskManager.TaskStarted += taskManager_TaskStarted;
  taskManager.TaskPaused += taskManager_TaskPaused;
  taskManager.TaskStopped += taskManager_TaskStopped;
  taskManager.TaskFailed += taskManager_TaskFailed;
  taskManager.ExceptionOccured += taskManager_ExceptionOccured;
  taskManager.TaskAborted += taskManager_TaskAborted;
}
323
/// <summary>
/// Handler for TaskManager.TaskStarted; intentionally does nothing.
/// </summary>
private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
  // the task started successfully - no bookkeeping required
}
327
/// <summary>
/// Releases the cores of a paused task, wakes the heartbeat thread and uploads
/// the task data to the server with state Paused. Failures are only logged.
/// </summary>
private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
  try {
    var pausedTask = e.Value;
    SlaveStatusInfo.DecrementUsedCores(pausedTask.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Task serverTask = wcfService.GetTask(pausedTask.TaskId);
    if (serverTask == null) {
      throw new TaskNotFoundException(pausedTask.TaskId);
    }
    serverTask.ExecutionTime = pausedTask.ExecutionTime;
    wcfService.UpdateTaskData(serverTask, e.Value2, configManager.GetClientInfo().Id, TaskState.Paused);
  }
  catch (Exception ex) {
    // includes the TaskNotFoundException thrown above
    SlaveClientCom.Instance.LogMessage(ex.ToString());
  }
}
345
/// <summary>
/// Releases the cores of a stopped task, wakes the heartbeat thread and uploads
/// the task data to the server with state Finished. Failures are only logged.
/// </summary>
private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
  try {
    var stoppedTask = e.Value;
    SlaveStatusInfo.DecrementUsedCores(stoppedTask.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Task serverTask = wcfService.GetTask(stoppedTask.TaskId);
    if (serverTask == null) {
      throw new TaskNotFoundException(stoppedTask.TaskId);
    }
    serverTask.ExecutionTime = stoppedTask.ExecutionTime;
    wcfService.UpdateTaskData(serverTask, e.Value2, configManager.GetClientInfo().Id, TaskState.Finished);
  }
  catch (Exception ex) {
    // includes the TaskNotFoundException thrown above
    SlaveClientCom.Instance.LogMessage(ex.ToString());
  }
}
363
/// <summary>
/// Releases the cores of a failed task, wakes the heartbeat thread and reports
/// the failure to the server: with the task data if there is any, otherwise as
/// a plain state update. Errors while reporting are counted and logged.
/// </summary>
private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
  try {
    SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    SlaveTask failedTask = e.Value.Item1;
    TaskData partialData = e.Value.Item2;
    Exception cause = e.Value.Item3;

    Task serverTask = wcfService.GetTask(failedTask.TaskId);
    if (serverTask == null) {
      throw new TaskNotFoundException(failedTask.TaskId);
    }
    serverTask.ExecutionTime = failedTask.ExecutionTime;

    if (partialData == null) {
      wcfService.UpdateJobState(serverTask.Id, TaskState.Failed, cause.ToString());
    } else {
      wcfService.UpdateTaskData(serverTask, partialData, configManager.GetClientInfo().Id, TaskState.Failed, cause.ToString());
    }
    SlaveClientCom.Instance.LogMessage(cause.Message);
  }
  catch (Exception ex) {
    // includes the TaskNotFoundException thrown above
    SlaveStatusInfo.IncrementExceptionOccured();
    SlaveClientCom.Instance.LogMessage(ex.ToString());
  }
}
391
/// <summary>
/// Releases the cores of a task for which the task manager reported an
/// exception, counts the exception, wakes the heartbeat thread, logs the
/// exception and sets the task back to Waiting on the server.
/// </summary>
/// <remarks>
/// Like the other task manager handlers, a failure while notifying the client
/// or the server is logged instead of being thrown back into the event source.
/// </remarks>
private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
  SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
  SlaveStatusInfo.IncrementExceptionOccured();
  try {
    heartbeatManager.AwakeHeartBeatThread();
    SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
    wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
  }
  catch (Exception ex) {
    SlaveClientCom.Instance.LogMessage(ex.ToString());
  }
}
399
/// <summary>
/// Releases the cores that were reserved for an aborted task.
/// </summary>
private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
  int releasedCores = e.Value.CoresNeeded;
  SlaveStatusInfo.DecrementUsedCores(releasedCores);
}
403    #endregion
404
405    #region Log Events
/// <summary>
/// Forwards a new log entry to the client. Of a tab-separated entry only the
/// second field is forwarded; an entry without a tab is forwarded as a whole.
/// Empty entries are ignored.
/// </summary>
private void log_MessageAdded(object sender, EventArgs<string> e) {
  try {
    string entry = e.Value;
    if (string.IsNullOrEmpty(entry)) {
      return;
    }

    string[] fields = entry.Split('\t');
    // NOTE(review): presumably the first field is a timestamp prefix - confirm against ThreadSafeLog
    string text = fields.Length > 1 ? fields[1] : entry;
    SlaveClientCom.Instance.LogMessage(text);
  }
  catch {
    // deliberately best effort: forwarding a log entry must never fail the code that logged it
  }
}
412    #endregion
413
/// <summary>
/// Requests an asynchronous abort for every task id reported by the task manager.
/// </summary>
private void DoAbortAll() {
  SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds)
    AbortTaskAsync(runningTaskId);
}
423
/// <summary>
/// Requests an asynchronous pause for every task id reported by the task manager.
/// </summary>
private void DoPauseAll() {
  SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds)
    PauseTaskAsync(runningTaskId);
}
433
/// <summary>
/// Requests an asynchronous stop for every task id reported by the task manager.
/// </summary>
private void DoStopAll() {
  SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds)
    StopTaskAsync(runningTaskId);
}
443
444    #region Slave Lifecycle Methods
/// <summary>
/// Completely shuts down the slave: queues a ShutdownSlave message and blocks
/// until Start() has released the shutdown semaphore.
/// </summary>
public void Shutdown() {
  MessageQueue.GetInstance().AddMessage(new MessageContainer(MessageContainer.MessageType.ShutdownSlave));
  waitShutdownSem.WaitOne();
}
453
/// <summary>
/// Shuts down the slave on a background task and afterwards starts the
/// configured shutdown command.
/// </summary>
/// <remarks>
/// If starting the command fails, the exception is written to the event log
/// when one is set; otherwise it is rethrown inside the continuation.
/// </remarks>
private void ShutdownComputer() {
  var shutdownTask = TS.Task.Factory.StartNew(new Action(Shutdown));
  shutdownTask.ContinueWith(c => {
    try {
      Process.Start(SlaveCore.Properties.Settings.Default.ShutdownCommand);
    }
    catch (Exception ex) {
      if (ServiceEventLog == null) {
        throw; // rethrow without resetting the stack trace
      }
      EventLogManager.LogException(ex);
    }
  });
}
468
/// <summary>
/// Complete shutdown, should be called before the application is exited:
/// stops the heartbeat, ends the dispatch loop, aborts all tasks, disconnects
/// from the server and closes the client communication.
/// </summary>
/// <remarks>
/// Start() also calls this after a failed start-up, when the heartbeat manager
/// or the communication host may not exist yet; both cases are tolerated.
/// </remarks>
private void ShutdownCore() {
  SlaveClientCom.Instance.LogMessage("Shutdown signal received");
  SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
  if (heartbeatManager != null) {
    heartbeatManager.StopHeartBeat();
  }
  abortRequested = true;

  DoAbortAll();

  SlaveClientCom.Instance.LogMessage("Logging out");
  WcfService.Instance.Disconnect();
  SlaveClientCom.Instance.ClientCom.Shutdown();
  SlaveClientCom.Close();

  if (slaveComm != null) {
    if (slaveComm.State == CommunicationState.Faulted) {
      // Close() throws on a faulted communication object; Abort() is the way out
      slaveComm.Abort();
    } else if (slaveComm.State != CommunicationState.Closed) {
      slaveComm.Close();
    }
  }
}
488
/// <summary>
/// Handles the Restart message: logs it and clears the Asleep flag that Sleep() set.
/// </summary>
private void DoStartSlave() {
  const string restartMessage = "Restart received";
  SlaveClientCom.Instance.LogMessage(restartMessage);
  configManager.Asleep = false;
}
497
/// <summary>
/// Handles the Sleep message: logs it, sets the Asleep flag and pauses all tasks.
/// Primarily used by the gui if the core is running as a windows service.
/// </summary>
private void Sleep() {
  const string sleepMessage = "Sleep received - not accepting any new jobs";
  SlaveClientCom.Instance.LogMessage(sleepMessage);
  configManager.Asleep = true;
  DoPauseAll();
}
507    #endregion
508  }
509}
Note: See TracBrowser for help on using the repository browser.