Free cookie consent management tool by TermsFeed Policy Generator

source: branches/OaaS/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 10689

Last change on this file since 10689 was 9363, checked in by spimming, 12 years ago

#1888:

  • Merged revisions from trunk
File size: 20.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.IO;
25using System.Reflection;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Clients.Hive.SlaveCore.Properties;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using TS = System.Threading.Tasks;
33
34
35namespace HeuristicLab.Clients.Hive.SlaveCore {
36  /// <summary>
37  /// The core component of the Hive Slave.
38  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
39  /// </summary>
40  public class Core : MarshalByRefObject {
41    private static HeartbeatManager heartbeatManager;
42    public static HeartbeatManager HeartbeatManager {
43      get { return heartbeatManager; }
44    }
45
46    public EventLog ServiceEventLog { get; set; }
47
48    private Semaphore waitShutdownSem = new Semaphore(0, 1);
49    private bool abortRequested;
50    private ServiceHost slaveComm;
51    private WcfService wcfService;
52    private TaskManager taskManager;
53    private ConfigManager configManager;
54    private PluginManager pluginManager;
55    private bool startClientComService;
56
57    public Core()
58      : this(true) {
59    }
60
61    public Core(bool startClientComService) {
62      var log = new ThreadSafeLog(SlaveCore.Properties.Settings.Default.MaxLogCount);
63      this.pluginManager = new PluginManager(WcfService.Instance, log);
64      this.taskManager = new TaskManager(pluginManager, log);
65      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
66
67      RegisterTaskManagerEvents();
68
69      this.configManager = new ConfigManager(taskManager);
70      ConfigManager.Instance = this.configManager;
71
72      this.startClientComService = startClientComService;
73    }
74
75    /// <summary>
76    /// Main method for the client
77    /// </summary>
78    public void Start() {
79      abortRequested = false;
80      EventLogManager.ServiceEventLog = ServiceEventLog;
81
82      try {
83        if (startClientComService) {
84          //start the client communication service (pipe between slave and slave gui)
85          slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
86          slaveComm.Open();
87        }
88
89        // delete all left over task directories
90        pluginManager.CleanPluginTemp();
91        SlaveClientCom.Instance.LogMessage("Hive Slave started");
92
93        wcfService = WcfService.Instance;
94        RegisterServiceEvents();
95
96        StartHeartbeats(); // Start heartbeats thread       
97        DispatchMessageQueue(); // dispatch messages until abortRequested
98      }
99      catch (Exception ex) {
100        if (ServiceEventLog != null) {
101          EventLogManager.LogException(ex);
102        } else {
103          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
104          //else an exception will be thrown anyways.
105          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
106        }
107        ShutdownCore();
108      }
109      finally {
110        DeregisterServiceEvents();
111        waitShutdownSem.Release();
112      }
113    }
114
115    private void StartHeartbeats() {
116      //Initialize the heartbeat     
117      if (heartbeatManager == null) {
118        heartbeatManager = new HeartbeatManager();
119        heartbeatManager.StartHeartbeat();
120      }
121    }
122
123    private void DispatchMessageQueue() {
124      MessageQueue queue = MessageQueue.GetInstance();
125      while (!abortRequested) {
126        MessageContainer container = queue.GetMessage();
127        DetermineAction(container);
128        if (!abortRequested) {
129          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
130        }
131      }
132    }
133
134    private void RegisterServiceEvents() {
135      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
136      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
137    }
138
139    private void DeregisterServiceEvents() {
140      WcfService.Instance.Connected -= WcfService_Connected;
141      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
142    }
143
144    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
145      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
146    }
147
148    private void WcfService_Connected(object sender, EventArgs e) {
149      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
150    }
151
152    /// <summary>
153    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
154    /// </summary>
155    /// <param name="container">The container, containing the message</param>
156    private void DetermineAction(MessageContainer container) {
157      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
158
159      switch (container.Message) {
160        case MessageContainer.MessageType.CalculateTask:
161          CalculateTaskAsync(container.TaskId);
162          break;
163        case MessageContainer.MessageType.AbortTask:
164          AbortTaskAsync(container.TaskId);
165          break;
166        case MessageContainer.MessageType.StopTask:
167          StopTaskAsync(container.TaskId);
168          break;
169        case MessageContainer.MessageType.PauseTask:
170          PauseTaskAsync(container.TaskId);
171          break;
172        case MessageContainer.MessageType.StopAll:
173          DoStopAll();
174          break;
175        case MessageContainer.MessageType.PauseAll:
176          DoPauseAll();
177          break;
178        case MessageContainer.MessageType.AbortAll:
179          DoAbortAll();
180          break;
181        case MessageContainer.MessageType.ShutdownSlave:
182          ShutdownCore();
183          break;
184        case MessageContainer.MessageType.Restart:
185          DoStartSlave();
186          break;
187        case MessageContainer.MessageType.Sleep:
188          Sleep();
189          break;
190        case MessageContainer.MessageType.SayHello:
191          wcfService.Connect(configManager.GetClientInfo());
192          break;
193        case MessageContainer.MessageType.NewHBInterval:
194          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
195          if (interval != -1) {
196            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
197          }
198          break;
199        case MessageContainer.MessageType.ShutdownComputer:
200          ShutdownComputer();
201          break;
202      }
203    }
204
205    private void CalculateTaskAsync(Guid jobId) {
206      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
207      .ContinueWith((t) => {
208        SlaveStatusInfo.IncrementExceptionOccured();
209        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
210      }, TaskContinuationOptions.OnlyOnFaulted);
211    }
212
213    private void StopTaskAsync(Guid jobId) {
214      TS.Task.Factory.StartNew(HandleStopTask, jobId)
215       .ContinueWith((t) => {
216         SlaveStatusInfo.IncrementExceptionOccured();
217         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
218       }, TaskContinuationOptions.OnlyOnFaulted);
219    }
220
221    private void PauseTaskAsync(Guid jobId) {
222      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
223       .ContinueWith((t) => {
224         SlaveStatusInfo.IncrementExceptionOccured();
225         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
226       }, TaskContinuationOptions.OnlyOnFaulted);
227    }
228
229    private void AbortTaskAsync(Guid jobId) {
230      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
231       .ContinueWith((t) => {
232         SlaveStatusInfo.IncrementExceptionOccured();
233         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
234       }, TaskContinuationOptions.OnlyOnFaulted);
235    }
236
237    private void HandleCalculateTask(object taskIdObj) {
238      Guid taskId = (Guid)taskIdObj;
239      Task task = null;
240      int usedCores = 0;
241      try {
242        task = wcfService.GetTask(taskId);
243        if (task == null) throw new TaskNotFoundException(taskId);
244        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
245        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
246        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
247        TaskData taskData = wcfService.GetTaskData(taskId);
248        if (taskData == null) throw new TaskDataNotFoundException(taskId);
249        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
250        if (task == null) throw new TaskNotFoundException(taskId);
251        taskManager.StartTaskAsync(task, taskData);
252      }
253      catch (TaskNotFoundException) {
254        SlaveStatusInfo.DecrementUsedCores(usedCores);
255        throw;
256      }
257      catch (TaskDataNotFoundException) {
258        SlaveStatusInfo.DecrementUsedCores(usedCores);
259        throw;
260      }
261      catch (TaskAlreadyRunningException) {
262        SlaveStatusInfo.DecrementUsedCores(usedCores);
263        throw;
264      }
265      catch (OutOfCoresException) {
266        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
267        throw;
268      }
269      catch (OutOfMemoryException) {
270        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
271        throw;
272      }
273      catch (Exception e) {
274        SlaveStatusInfo.DecrementUsedCores(usedCores);
275        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
276        throw;
277      }
278    }
279
280    private void HandleStopTask(object taskIdObj) {
281      Guid taskId = (Guid)taskIdObj;
282      try {
283        Task task = wcfService.GetTask(taskId);
284        if (task == null) throw new TaskNotFoundException(taskId);
285        taskManager.StopTaskAsync(taskId);
286      }
287      catch (TaskNotFoundException) {
288        throw;
289      }
290      catch (TaskNotRunningException) {
291        throw;
292      }
293      catch (AppDomainNotCreatedException) {
294        throw;
295      }
296    }
297
298    private void HandlePauseTask(object taskIdObj) {
299      Guid taskId = (Guid)taskIdObj;
300      try {
301        Task task = wcfService.GetTask(taskId);
302        if (task == null) throw new TaskNotFoundException(taskId);
303        taskManager.PauseTaskAsync(taskId);
304      }
305      catch (TaskNotFoundException) {
306        throw;
307      }
308      catch (TaskNotRunningException) {
309        throw;
310      }
311      catch (AppDomainNotCreatedException) {
312        throw;
313      }
314    }
315
316    private void HandleAbortTask(object taskIdObj) {
317      Guid taskId = (Guid)taskIdObj;
318      try {
319        taskManager.AbortTask(taskId);
320      }
321      catch (TaskNotFoundException) {
322        throw;
323      }
324    }
325
326    #region TaskManager Events
327    private void RegisterTaskManagerEvents() {
328      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
329      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
330      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
331      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
332      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
333      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
334    }
335
336    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
337      // successfully started, everything is good
338    }
339
340    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
341      try {
342        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
343        heartbeatManager.AwakeHeartBeatThread();
344        Task task = wcfService.GetTask(e.Value.TaskId);
345        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
346        task.ExecutionTime = e.Value.ExecutionTime;
347        TaskData taskData = e.Value2;
348        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
349      }
350      catch (TaskNotFoundException ex) {
351        SlaveClientCom.Instance.LogMessage(ex.ToString());
352      }
353      catch (Exception ex) {
354        SlaveClientCom.Instance.LogMessage(ex.ToString());
355      }
356    }
357
358    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
359      try {
360        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
361        heartbeatManager.AwakeHeartBeatThread();
362        Task task = wcfService.GetTask(e.Value.TaskId);
363        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
364        task.ExecutionTime = e.Value.ExecutionTime;
365        TaskData taskData = e.Value2;
366        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
367      }
368      catch (TaskNotFoundException ex) {
369        SlaveClientCom.Instance.LogMessage(ex.ToString());
370      }
371      catch (Exception ex) {
372        SlaveClientCom.Instance.LogMessage(ex.ToString());
373      }
374    }
375
376    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
377      try {
378        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
379        heartbeatManager.AwakeHeartBeatThread();
380        SlaveTask slaveTask = e.Value.Item1;
381        TaskData taskData = e.Value.Item2;
382        Exception exception = e.Value.Item3;
383
384        Task task = wcfService.GetTask(slaveTask.TaskId);
385        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
386        task.ExecutionTime = slaveTask.ExecutionTime;
387        if (taskData != null) {
388          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
389        } else {
390          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
391        }
392        SlaveClientCom.Instance.LogMessage(exception.Message);
393      }
394      catch (TaskNotFoundException ex) {
395        SlaveStatusInfo.IncrementExceptionOccured();
396        SlaveClientCom.Instance.LogMessage(ex.ToString());
397      }
398      catch (Exception ex) {
399        SlaveStatusInfo.IncrementExceptionOccured();
400        SlaveClientCom.Instance.LogMessage(ex.ToString());
401      }
402    }
403
404    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
405      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
406      SlaveStatusInfo.IncrementExceptionOccured();
407      heartbeatManager.AwakeHeartBeatThread();
408      SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
409      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
410    }
411
412    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
413      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
414    }
415    #endregion
416
417    #region Log Events
418    private void log_MessageAdded(object sender, EventArgs<string> e) {
419      try {
420        SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
421      }
422      catch { }
423    }
424    #endregion
425
426    /// <summary>
427    /// aborts all running jobs, no results are sent back
428    /// </summary>
429    private void DoAbortAll() {
430      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
431      foreach (Guid taskId in taskManager.TaskIds) {
432        AbortTaskAsync(taskId);
433      }
434    }
435
436    /// <summary>
437    /// wait for jobs to finish, then pause client
438    /// </summary>
439    private void DoPauseAll() {
440      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
441      foreach (Guid taskId in taskManager.TaskIds) {
442        PauseTaskAsync(taskId);
443      }
444    }
445
446    /// <summary>
447    /// pause slave immediately
448    /// </summary>
449    private void DoStopAll() {
450      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
451      foreach (Guid taskId in taskManager.TaskIds) {
452        StopTaskAsync(taskId);
453      }
454    }
455
456    #region Slave Lifecycle Methods
457    /// <summary>
458    /// completly shudown slave
459    /// </summary>
460    public void Shutdown() {
461      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
462      MessageQueue.GetInstance().AddMessage(mc);
463      waitShutdownSem.WaitOne();
464    }
465
466    private void ShutdownComputer() {
467      var t = TS.Task.Factory.StartNew(new Action(Shutdown));
468      t.ContinueWith(c => {
469        try {
470          //we assume that *.exe means an executable in the current directory, otherwise it is a command
471          if (SlaveCore.Properties.Settings.Default.ShutdownCommand.EndsWith(".exe")) {
472            string dirName = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
473            Process.Start(Path.Combine(dirName, SlaveCore.Properties.Settings.Default.ShutdownCommand));
474          } else {
475            Process.Start(SlaveCore.Properties.Settings.Default.ShutdownCommand);
476          }
477        }
478        catch (Exception ex) {
479          if (ServiceEventLog != null) {
480            EventLogManager.LogException(ex);
481          } else
482            throw ex;
483        }
484      });
485    }
486
487    /// <summary>
488    /// complete shutdown, should be called before the the application is exited
489    /// </summary>
490    private void ShutdownCore() {
491      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
492      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
493      heartbeatManager.StopHeartBeat();
494      abortRequested = true;
495
496      DoAbortAll();
497
498      SlaveClientCom.Instance.LogMessage("Logging out");
499      WcfService.Instance.Disconnect();
500      SlaveClientCom.Instance.ClientCom.Shutdown();
501      SlaveClientCom.Close();
502
503      if (startClientComService) {
504        if (slaveComm.State != CommunicationState.Closed)
505          slaveComm.Close();
506      }
507    }
508
509    /// <summary>
510    /// reinitializes everything and continues operation,
511    /// can be called after Sleep()
512    /// </summary> 
513    private void DoStartSlave() {
514      SlaveClientCom.Instance.LogMessage("Restart received");
515      configManager.Asleep = false;
516    }
517
518    /// <summary>
519    /// stop slave, except for client gui communication,
520    /// primarily used by gui if core is running as windows service
521    /// </summary>   
522    private void Sleep() {
523      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
524      configManager.Asleep = true;
525      DoPauseAll();
526    }
527    #endregion
528
529    public void SetNewHiveServer(string remote, string identity) {
530      SlaveClientCom.Instance.LogMessage("Set new hive server address and identity");
531      HiveServiceLocator.Instance.RemoteAddress = remote;
532      HiveServiceLocator.Instance.IdentityCertificate = identity;
533    }
534
535    public void ChangeHiveServerAndRestart(string remote, string identity) {
536      SlaveClientCom.Instance.LogMessage("Change hive server");
537      ShutdownCore();
538      SetNewHiveServer(remote, identity);
539      Start();
540    }
541  }
542}
Note: See TracBrowser for help on using the repository browser.