Free cookie consent management tool by TermsFeed Policy Generator

source: branches/DataPreprocessing/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 10854

Last change on this file since 10854 was 9456, checked in by swagner, 12 years ago

Updated copyright year and added some missing license headers (#1889)

File size: 19.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2013 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.IO;
25using System.Reflection;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static HeartbeatManager heartbeatManager;
41    public static HeartbeatManager HeartbeatManager {
42      get { return heartbeatManager; }
43    }
44
45    public EventLog ServiceEventLog { get; set; }
46
47    private Semaphore waitShutdownSem = new Semaphore(0, 1);
48    private bool abortRequested;
49    private ServiceHost slaveComm;
50    private WcfService wcfService;
51    private TaskManager taskManager;
52    private ConfigManager configManager;
53    private PluginManager pluginManager;
54
55    public Core() {
56      var log = new ThreadSafeLog(SlaveCore.Properties.Settings.Default.MaxLogCount);
57      this.pluginManager = new PluginManager(WcfService.Instance, log);
58      this.taskManager = new TaskManager(pluginManager, log);
59      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
60
61      RegisterTaskManagerEvents();
62
63      this.configManager = new ConfigManager(taskManager);
64      ConfigManager.Instance = this.configManager;
65    }
66
67    /// <summary>
68    /// Main method for the client
69    /// </summary>
70    public void Start() {
71      abortRequested = false;
72      EventLogManager.ServiceEventLog = ServiceEventLog;
73
74      try {
75        //start the client communication service (pipe between slave and slave gui)
76        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
77
78        try {
79          slaveComm.Open();
80        }
81        catch (AddressAlreadyInUseException ex) {
82          if (ServiceEventLog != null) {
83            EventLogManager.LogException(ex);
84          }
85        }
86
87        // delete all left over task directories
88        pluginManager.CleanPluginTemp();
89        SlaveClientCom.Instance.LogMessage("Hive Slave started");
90
91        wcfService = WcfService.Instance;
92        RegisterServiceEvents();
93
94        StartHeartbeats(); // Start heartbeats thread       
95        DispatchMessageQueue(); // dispatch messages until abortRequested
96      }
97      catch (Exception ex) {
98        if (ServiceEventLog != null) {
99          EventLogManager.LogException(ex);
100        } else {
101          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
102          //else an exception will be thrown anyways.
103          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
104        }
105        ShutdownCore();
106      }
107      finally {
108        DeregisterServiceEvents();
109        waitShutdownSem.Release();
110      }
111    }
112
113    private void StartHeartbeats() {
114      //Initialize the heartbeat     
115      if (heartbeatManager == null) {
116        heartbeatManager = new HeartbeatManager();
117        heartbeatManager.StartHeartbeat();
118      }
119    }
120
121    private void DispatchMessageQueue() {
122      MessageQueue queue = MessageQueue.GetInstance();
123      while (!abortRequested) {
124        MessageContainer container = queue.GetMessage();
125        DetermineAction(container);
126        if (!abortRequested) {
127          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
128        }
129      }
130    }
131
132    private void RegisterServiceEvents() {
133      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
134      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
135    }
136
137    private void DeregisterServiceEvents() {
138      WcfService.Instance.Connected -= WcfService_Connected;
139      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
140    }
141
142    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
143      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
144    }
145
146    private void WcfService_Connected(object sender, EventArgs e) {
147      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
148    }
149
150    /// <summary>
151    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
152    /// </summary>
153    /// <param name="container">The container, containing the message</param>
154    private void DetermineAction(MessageContainer container) {
155      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
156
157      switch (container.Message) {
158        case MessageContainer.MessageType.CalculateTask:
159          CalculateTaskAsync(container.TaskId);
160          break;
161        case MessageContainer.MessageType.AbortTask:
162          AbortTaskAsync(container.TaskId);
163          break;
164        case MessageContainer.MessageType.StopTask:
165          StopTaskAsync(container.TaskId);
166          break;
167        case MessageContainer.MessageType.PauseTask:
168          PauseTaskAsync(container.TaskId);
169          break;
170        case MessageContainer.MessageType.StopAll:
171          DoStopAll();
172          break;
173        case MessageContainer.MessageType.PauseAll:
174          DoPauseAll();
175          break;
176        case MessageContainer.MessageType.AbortAll:
177          DoAbortAll();
178          break;
179        case MessageContainer.MessageType.ShutdownSlave:
180          ShutdownCore();
181          break;
182        case MessageContainer.MessageType.Restart:
183          DoStartSlave();
184          break;
185        case MessageContainer.MessageType.Sleep:
186          Sleep();
187          break;
188        case MessageContainer.MessageType.SayHello:
189          wcfService.Connect(configManager.GetClientInfo());
190          break;
191        case MessageContainer.MessageType.NewHBInterval:
192          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
193          if (interval != -1) {
194            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
195          }
196          break;
197        case MessageContainer.MessageType.ShutdownComputer:
198          ShutdownComputer();
199          break;
200      }
201    }
202
203    private void CalculateTaskAsync(Guid jobId) {
204      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
205      .ContinueWith((t) => {
206        SlaveStatusInfo.IncrementExceptionOccured();
207        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
208      }, TaskContinuationOptions.OnlyOnFaulted);
209    }
210
211    private void StopTaskAsync(Guid jobId) {
212      TS.Task.Factory.StartNew(HandleStopTask, jobId)
213       .ContinueWith((t) => {
214         SlaveStatusInfo.IncrementExceptionOccured();
215         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
216       }, TaskContinuationOptions.OnlyOnFaulted);
217    }
218
219    private void PauseTaskAsync(Guid jobId) {
220      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
221       .ContinueWith((t) => {
222         SlaveStatusInfo.IncrementExceptionOccured();
223         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
224       }, TaskContinuationOptions.OnlyOnFaulted);
225    }
226
227    private void AbortTaskAsync(Guid jobId) {
228      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
229       .ContinueWith((t) => {
230         SlaveStatusInfo.IncrementExceptionOccured();
231         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
232       }, TaskContinuationOptions.OnlyOnFaulted);
233    }
234
235    private void HandleCalculateTask(object taskIdObj) {
236      Guid taskId = (Guid)taskIdObj;
237      Task task = null;
238      int usedCores = 0;
239      try {
240        task = wcfService.GetTask(taskId);
241        if (task == null) throw new TaskNotFoundException(taskId);
242        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
243        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
244        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
245        TaskData taskData = wcfService.GetTaskData(taskId);
246        if (taskData == null) throw new TaskDataNotFoundException(taskId);
247        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
248        if (task == null) throw new TaskNotFoundException(taskId);
249        taskManager.StartTaskAsync(task, taskData);
250      }
251      catch (TaskNotFoundException) {
252        SlaveStatusInfo.DecrementUsedCores(usedCores);
253        throw;
254      }
255      catch (TaskDataNotFoundException) {
256        SlaveStatusInfo.DecrementUsedCores(usedCores);
257        throw;
258      }
259      catch (TaskAlreadyRunningException) {
260        SlaveStatusInfo.DecrementUsedCores(usedCores);
261        throw;
262      }
263      catch (OutOfCoresException) {
264        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
265        throw;
266      }
267      catch (OutOfMemoryException) {
268        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
269        throw;
270      }
271      catch (Exception e) {
272        SlaveStatusInfo.DecrementUsedCores(usedCores);
273        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
274        throw;
275      }
276    }
277
278    private void HandleStopTask(object taskIdObj) {
279      Guid taskId = (Guid)taskIdObj;
280      try {
281        Task task = wcfService.GetTask(taskId);
282        if (task == null) throw new TaskNotFoundException(taskId);
283        taskManager.StopTaskAsync(taskId);
284      }
285      catch (TaskNotFoundException) {
286        throw;
287      }
288      catch (TaskNotRunningException) {
289        throw;
290      }
291      catch (AppDomainNotCreatedException) {
292        throw;
293      }
294    }
295
296    private void HandlePauseTask(object taskIdObj) {
297      Guid taskId = (Guid)taskIdObj;
298      try {
299        Task task = wcfService.GetTask(taskId);
300        if (task == null) throw new TaskNotFoundException(taskId);
301        taskManager.PauseTaskAsync(taskId);
302      }
303      catch (TaskNotFoundException) {
304        throw;
305      }
306      catch (TaskNotRunningException) {
307        throw;
308      }
309      catch (AppDomainNotCreatedException) {
310        throw;
311      }
312    }
313
314    private void HandleAbortTask(object taskIdObj) {
315      Guid taskId = (Guid)taskIdObj;
316      try {
317        taskManager.AbortTask(taskId);
318      }
319      catch (TaskNotFoundException) {
320        throw;
321      }
322    }
323
324    #region TaskManager Events
325    private void RegisterTaskManagerEvents() {
326      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
327      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
328      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
329      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
330      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
331      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
332    }
333
334    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
335      // successfully started, everything is good
336    }
337
338    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
339      try {
340        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
341        heartbeatManager.AwakeHeartBeatThread();
342        Task task = wcfService.GetTask(e.Value.TaskId);
343        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
344        task.ExecutionTime = e.Value.ExecutionTime;
345        TaskData taskData = e.Value2;
346        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
347      }
348      catch (TaskNotFoundException ex) {
349        SlaveClientCom.Instance.LogMessage(ex.ToString());
350      }
351      catch (Exception ex) {
352        SlaveClientCom.Instance.LogMessage(ex.ToString());
353      }
354    }
355
356    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
357      try {
358        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
359        heartbeatManager.AwakeHeartBeatThread();
360        Task task = wcfService.GetTask(e.Value.TaskId);
361        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
362        task.ExecutionTime = e.Value.ExecutionTime;
363        TaskData taskData = e.Value2;
364        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
365      }
366      catch (TaskNotFoundException ex) {
367        SlaveClientCom.Instance.LogMessage(ex.ToString());
368      }
369      catch (Exception ex) {
370        SlaveClientCom.Instance.LogMessage(ex.ToString());
371      }
372    }
373
374    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
375      try {
376        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
377        heartbeatManager.AwakeHeartBeatThread();
378        SlaveTask slaveTask = e.Value.Item1;
379        TaskData taskData = e.Value.Item2;
380        Exception exception = e.Value.Item3;
381
382        Task task = wcfService.GetTask(slaveTask.TaskId);
383        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
384        task.ExecutionTime = slaveTask.ExecutionTime;
385        if (taskData != null) {
386          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
387        } else {
388          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
389        }
390        SlaveClientCom.Instance.LogMessage(exception.Message);
391      }
392      catch (TaskNotFoundException ex) {
393        SlaveStatusInfo.IncrementExceptionOccured();
394        SlaveClientCom.Instance.LogMessage(ex.ToString());
395      }
396      catch (Exception ex) {
397        SlaveStatusInfo.IncrementExceptionOccured();
398        SlaveClientCom.Instance.LogMessage(ex.ToString());
399      }
400    }
401
402    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
403      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
404      SlaveStatusInfo.IncrementExceptionOccured();
405      heartbeatManager.AwakeHeartBeatThread();
406      SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
407      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
408    }
409
410    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
411      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
412    }
413    #endregion
414
415    #region Log Events
416    private void log_MessageAdded(object sender, EventArgs<string> e) {
417      try {
418        SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
419      }
420      catch { }
421    }
422    #endregion
423
424    /// <summary>
425    /// aborts all running jobs, no results are sent back
426    /// </summary>
427    private void DoAbortAll() {
428      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
429      foreach (Guid taskId in taskManager.TaskIds) {
430        AbortTaskAsync(taskId);
431      }
432    }
433
434    /// <summary>
435    /// wait for jobs to finish, then pause client
436    /// </summary>
437    private void DoPauseAll() {
438      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
439      foreach (Guid taskId in taskManager.TaskIds) {
440        PauseTaskAsync(taskId);
441      }
442    }
443
444    /// <summary>
445    /// pause slave immediately
446    /// </summary>
447    private void DoStopAll() {
448      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
449      foreach (Guid taskId in taskManager.TaskIds) {
450        StopTaskAsync(taskId);
451      }
452    }
453
454    #region Slave Lifecycle Methods
455    /// <summary>
456    /// completly shudown slave
457    /// </summary>
458    public void Shutdown() {
459      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
460      MessageQueue.GetInstance().AddMessage(mc);
461      waitShutdownSem.WaitOne();
462    }
463
464    private void ShutdownComputer() {
465      var t = TS.Task.Factory.StartNew(new Action(Shutdown));
466      t.ContinueWith(c => {
467        try {
468          //we assume that *.exe means an executable in the current directory, otherwise it is a command
469          if (SlaveCore.Properties.Settings.Default.ShutdownCommand.EndsWith(".exe")) {
470            string dirName = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
471            Process.Start(Path.Combine(dirName, SlaveCore.Properties.Settings.Default.ShutdownCommand));
472          } else {
473            Process.Start(SlaveCore.Properties.Settings.Default.ShutdownCommand);
474          }
475        }
476        catch (Exception ex) {
477          if (ServiceEventLog != null) {
478            EventLogManager.LogException(ex);
479          } else
480            throw ex;
481        }
482      });
483    }
484
485    /// <summary>
486    /// complete shutdown, should be called before the the application is exited
487    /// </summary>
488    private void ShutdownCore() {
489      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
490      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
491      heartbeatManager.StopHeartBeat();
492      abortRequested = true;
493
494      DoAbortAll();
495
496      SlaveClientCom.Instance.LogMessage("Logging out");
497      WcfService.Instance.Disconnect();
498      SlaveClientCom.Instance.ClientCom.Shutdown();
499      SlaveClientCom.Close();
500
501      if (slaveComm.State != CommunicationState.Closed)
502        slaveComm.Close();
503    }
504
505    /// <summary>
506    /// reinitializes everything and continues operation,
507    /// can be called after Sleep()
508    /// </summary> 
509    private void DoStartSlave() {
510      SlaveClientCom.Instance.LogMessage("Restart received");
511      configManager.Asleep = false;
512    }
513
514    /// <summary>
515    /// stop slave, except for client gui communication,
516    /// primarily used by gui if core is running as windows service
517    /// </summary>   
518    private void Sleep() {
519      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
520      configManager.Asleep = true;
521      DoPauseAll();
522    }
523    #endregion
524  }
525}
Note: See TracBrowser for help on using the repository browser.