Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 9286

Last change on this file since 9286 was 8964, checked in by ascheibe, 12 years ago

#1986 fixed calling the shutdown command from a windows service

File size: 19.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.IO;
25using System.Reflection;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static HeartbeatManager heartbeatManager;
41    public static HeartbeatManager HeartbeatManager {
42      get { return heartbeatManager; }
43    }
44
45    public EventLog ServiceEventLog { get; set; }
46
47    private Semaphore waitShutdownSem = new Semaphore(0, 1);
48    private bool abortRequested;
49    private ServiceHost slaveComm;
50    private WcfService wcfService;
51    private TaskManager taskManager;
52    private ConfigManager configManager;
53    private PluginManager pluginManager;
54
55    public Core() {
56      var log = new ThreadSafeLog(SlaveCore.Properties.Settings.Default.MaxLogCount);
57      this.pluginManager = new PluginManager(WcfService.Instance, log);
58      this.taskManager = new TaskManager(pluginManager, log);
59      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
60
61      RegisterTaskManagerEvents();
62
63      this.configManager = new ConfigManager(taskManager);
64      ConfigManager.Instance = this.configManager;
65    }
66
67    /// <summary>
68    /// Main method for the client
69    /// </summary>
70    public void Start() {
71      abortRequested = false;
72      EventLogManager.ServiceEventLog = ServiceEventLog;
73
74      try {
75        //start the client communication service (pipe between slave and slave gui)
76        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
77        slaveComm.Open();
78
79        // delete all left over task directories
80        pluginManager.CleanPluginTemp();
81        SlaveClientCom.Instance.LogMessage("Hive Slave started");
82
83        wcfService = WcfService.Instance;
84        RegisterServiceEvents();
85
86        StartHeartbeats(); // Start heartbeats thread       
87        DispatchMessageQueue(); // dispatch messages until abortRequested
88      }
89      catch (Exception ex) {
90        if (ServiceEventLog != null) {
91          EventLogManager.LogException(ex);
92        } else {
93          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
94          //else an exception will be thrown anyways.
95          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
96        }
97        ShutdownCore();
98      }
99      finally {
100        DeregisterServiceEvents();
101        waitShutdownSem.Release();
102      }
103    }
104
105    private void StartHeartbeats() {
106      //Initialize the heartbeat     
107      if (heartbeatManager == null) {
108        heartbeatManager = new HeartbeatManager();
109        heartbeatManager.StartHeartbeat();
110      }
111    }
112
113    private void DispatchMessageQueue() {
114      MessageQueue queue = MessageQueue.GetInstance();
115      while (!abortRequested) {
116        MessageContainer container = queue.GetMessage();
117        DetermineAction(container);
118        if (!abortRequested) {
119          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
120        }
121      }
122    }
123
124    private void RegisterServiceEvents() {
125      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
126      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
127    }
128
129    private void DeregisterServiceEvents() {
130      WcfService.Instance.Connected -= WcfService_Connected;
131      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
132    }
133
134    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
135      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
136    }
137
138    private void WcfService_Connected(object sender, EventArgs e) {
139      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
140    }
141
142    /// <summary>
143    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
144    /// </summary>
145    /// <param name="container">The container, containing the message</param>
146    private void DetermineAction(MessageContainer container) {
147      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
148
149      switch (container.Message) {
150        case MessageContainer.MessageType.CalculateTask:
151          CalculateTaskAsync(container.TaskId);
152          break;
153        case MessageContainer.MessageType.AbortTask:
154          AbortTaskAsync(container.TaskId);
155          break;
156        case MessageContainer.MessageType.StopTask:
157          StopTaskAsync(container.TaskId);
158          break;
159        case MessageContainer.MessageType.PauseTask:
160          PauseTaskAsync(container.TaskId);
161          break;
162        case MessageContainer.MessageType.StopAll:
163          DoStopAll();
164          break;
165        case MessageContainer.MessageType.PauseAll:
166          DoPauseAll();
167          break;
168        case MessageContainer.MessageType.AbortAll:
169          DoAbortAll();
170          break;
171        case MessageContainer.MessageType.ShutdownSlave:
172          ShutdownCore();
173          break;
174        case MessageContainer.MessageType.Restart:
175          DoStartSlave();
176          break;
177        case MessageContainer.MessageType.Sleep:
178          Sleep();
179          break;
180        case MessageContainer.MessageType.SayHello:
181          wcfService.Connect(configManager.GetClientInfo());
182          break;
183        case MessageContainer.MessageType.NewHBInterval:
184          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
185          if (interval != -1) {
186            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
187          }
188          break;
189        case MessageContainer.MessageType.ShutdownComputer:
190          ShutdownComputer();
191          break;
192      }
193    }
194
195    private void CalculateTaskAsync(Guid jobId) {
196      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
197      .ContinueWith((t) => {
198        SlaveStatusInfo.IncrementExceptionOccured();
199        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
200      }, TaskContinuationOptions.OnlyOnFaulted);
201    }
202
203    private void StopTaskAsync(Guid jobId) {
204      TS.Task.Factory.StartNew(HandleStopTask, jobId)
205       .ContinueWith((t) => {
206         SlaveStatusInfo.IncrementExceptionOccured();
207         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
208       }, TaskContinuationOptions.OnlyOnFaulted);
209    }
210
211    private void PauseTaskAsync(Guid jobId) {
212      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
213       .ContinueWith((t) => {
214         SlaveStatusInfo.IncrementExceptionOccured();
215         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
216       }, TaskContinuationOptions.OnlyOnFaulted);
217    }
218
219    private void AbortTaskAsync(Guid jobId) {
220      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
221       .ContinueWith((t) => {
222         SlaveStatusInfo.IncrementExceptionOccured();
223         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
224       }, TaskContinuationOptions.OnlyOnFaulted);
225    }
226
227    private void HandleCalculateTask(object taskIdObj) {
228      Guid taskId = (Guid)taskIdObj;
229      Task task = null;
230      int usedCores = 0;
231      try {
232        task = wcfService.GetTask(taskId);
233        if (task == null) throw new TaskNotFoundException(taskId);
234        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
235        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
236        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
237        TaskData taskData = wcfService.GetTaskData(taskId);
238        if (taskData == null) throw new TaskDataNotFoundException(taskId);
239        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
240        if (task == null) throw new TaskNotFoundException(taskId);
241        taskManager.StartTaskAsync(task, taskData);
242      }
243      catch (TaskNotFoundException) {
244        SlaveStatusInfo.DecrementUsedCores(usedCores);
245        throw;
246      }
247      catch (TaskDataNotFoundException) {
248        SlaveStatusInfo.DecrementUsedCores(usedCores);
249        throw;
250      }
251      catch (TaskAlreadyRunningException) {
252        SlaveStatusInfo.DecrementUsedCores(usedCores);
253        throw;
254      }
255      catch (OutOfCoresException) {
256        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
257        throw;
258      }
259      catch (OutOfMemoryException) {
260        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
261        throw;
262      }
263      catch (Exception e) {
264        SlaveStatusInfo.DecrementUsedCores(usedCores);
265        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
266        throw;
267      }
268    }
269
270    private void HandleStopTask(object taskIdObj) {
271      Guid taskId = (Guid)taskIdObj;
272      try {
273        Task task = wcfService.GetTask(taskId);
274        if (task == null) throw new TaskNotFoundException(taskId);
275        taskManager.StopTaskAsync(taskId);
276      }
277      catch (TaskNotFoundException) {
278        throw;
279      }
280      catch (TaskNotRunningException) {
281        throw;
282      }
283      catch (AppDomainNotCreatedException) {
284        throw;
285      }
286    }
287
288    private void HandlePauseTask(object taskIdObj) {
289      Guid taskId = (Guid)taskIdObj;
290      try {
291        Task task = wcfService.GetTask(taskId);
292        if (task == null) throw new TaskNotFoundException(taskId);
293        taskManager.PauseTaskAsync(taskId);
294      }
295      catch (TaskNotFoundException) {
296        throw;
297      }
298      catch (TaskNotRunningException) {
299        throw;
300      }
301      catch (AppDomainNotCreatedException) {
302        throw;
303      }
304    }
305
306    private void HandleAbortTask(object taskIdObj) {
307      Guid taskId = (Guid)taskIdObj;
308      try {
309        taskManager.AbortTask(taskId);
310      }
311      catch (TaskNotFoundException) {
312        throw;
313      }
314    }
315
316    #region TaskManager Events
317    private void RegisterTaskManagerEvents() {
318      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
319      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
320      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
321      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
322      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
323      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
324    }
325
326    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
327      // successfully started, everything is good
328    }
329
330    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
331      try {
332        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
333        heartbeatManager.AwakeHeartBeatThread();
334        Task task = wcfService.GetTask(e.Value.TaskId);
335        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
336        task.ExecutionTime = e.Value.ExecutionTime;
337        TaskData taskData = e.Value2;
338        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
339      }
340      catch (TaskNotFoundException ex) {
341        SlaveClientCom.Instance.LogMessage(ex.ToString());
342      }
343      catch (Exception ex) {
344        SlaveClientCom.Instance.LogMessage(ex.ToString());
345      }
346    }
347
348    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
349      try {
350        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
351        heartbeatManager.AwakeHeartBeatThread();
352        Task task = wcfService.GetTask(e.Value.TaskId);
353        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
354        task.ExecutionTime = e.Value.ExecutionTime;
355        TaskData taskData = e.Value2;
356        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
357      }
358      catch (TaskNotFoundException ex) {
359        SlaveClientCom.Instance.LogMessage(ex.ToString());
360      }
361      catch (Exception ex) {
362        SlaveClientCom.Instance.LogMessage(ex.ToString());
363      }
364    }
365
366    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
367      try {
368        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
369        heartbeatManager.AwakeHeartBeatThread();
370        SlaveTask slaveTask = e.Value.Item1;
371        TaskData taskData = e.Value.Item2;
372        Exception exception = e.Value.Item3;
373
374        Task task = wcfService.GetTask(slaveTask.TaskId);
375        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
376        task.ExecutionTime = slaveTask.ExecutionTime;
377        if (taskData != null) {
378          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
379        } else {
380          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
381        }
382        SlaveClientCom.Instance.LogMessage(exception.Message);
383      }
384      catch (TaskNotFoundException ex) {
385        SlaveStatusInfo.IncrementExceptionOccured();
386        SlaveClientCom.Instance.LogMessage(ex.ToString());
387      }
388      catch (Exception ex) {
389        SlaveStatusInfo.IncrementExceptionOccured();
390        SlaveClientCom.Instance.LogMessage(ex.ToString());
391      }
392    }
393
394    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
395      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
396      SlaveStatusInfo.IncrementExceptionOccured();
397      heartbeatManager.AwakeHeartBeatThread();
398      SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
399      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
400    }
401
402    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
403      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
404    }
405    #endregion
406
407    #region Log Events
408    private void log_MessageAdded(object sender, EventArgs<string> e) {
409      try {
410        SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
411      }
412      catch { }
413    }
414    #endregion
415
416    /// <summary>
417    /// aborts all running jobs, no results are sent back
418    /// </summary>
419    private void DoAbortAll() {
420      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
421      foreach (Guid taskId in taskManager.TaskIds) {
422        AbortTaskAsync(taskId);
423      }
424    }
425
426    /// <summary>
427    /// wait for jobs to finish, then pause client
428    /// </summary>
429    private void DoPauseAll() {
430      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
431      foreach (Guid taskId in taskManager.TaskIds) {
432        PauseTaskAsync(taskId);
433      }
434    }
435
436    /// <summary>
437    /// pause slave immediately
438    /// </summary>
439    private void DoStopAll() {
440      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
441      foreach (Guid taskId in taskManager.TaskIds) {
442        StopTaskAsync(taskId);
443      }
444    }
445
446    #region Slave Lifecycle Methods
447    /// <summary>
448    /// completly shudown slave
449    /// </summary>
450    public void Shutdown() {
451      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
452      MessageQueue.GetInstance().AddMessage(mc);
453      waitShutdownSem.WaitOne();
454    }
455
456    private void ShutdownComputer() {
457      var t = TS.Task.Factory.StartNew(new Action(Shutdown));
458      t.ContinueWith(c => {
459        try {
460          //we assume that *.exe means an executable in the current directory, otherwise it is a command
461          if (SlaveCore.Properties.Settings.Default.ShutdownCommand.EndsWith(".exe")) {
462            string dirName = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
463            Process.Start(Path.Combine(dirName, SlaveCore.Properties.Settings.Default.ShutdownCommand));
464          } else {
465            Process.Start(SlaveCore.Properties.Settings.Default.ShutdownCommand);
466          }
467        }
468        catch (Exception ex) {
469          if (ServiceEventLog != null) {
470            EventLogManager.LogException(ex);
471          } else
472            throw ex;
473        }
474      });
475    }
476
477    /// <summary>
478    /// complete shutdown, should be called before the the application is exited
479    /// </summary>
480    private void ShutdownCore() {
481      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
482      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
483      heartbeatManager.StopHeartBeat();
484      abortRequested = true;
485
486      DoAbortAll();
487
488      SlaveClientCom.Instance.LogMessage("Logging out");
489      WcfService.Instance.Disconnect();
490      SlaveClientCom.Instance.ClientCom.Shutdown();
491      SlaveClientCom.Close();
492
493      if (slaveComm.State != CommunicationState.Closed)
494        slaveComm.Close();
495    }
496
497    /// <summary>
498    /// reinitializes everything and continues operation,
499    /// can be called after Sleep()
500    /// </summary> 
501    private void DoStartSlave() {
502      SlaveClientCom.Instance.LogMessage("Restart received");
503      configManager.Asleep = false;
504    }
505
506    /// <summary>
507    /// stop slave, except for client gui communication,
508    /// primarily used by gui if core is running as windows service
509    /// </summary>   
510    private void Sleep() {
511      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
512      configManager.Asleep = true;
513      DoPauseAll();
514    }
515    #endregion
516  }
517}
Note: See TracBrowser for help on using the repository browser.