Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive.Azure/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 7461

Last change on this file since 7461 was 7270, checked in by spimming, 13 years ago

#1680:

  • merged changes from trunk into branch
File size: 18.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
  /// </summary>
  public class Core : MarshalByRefObject {
    private static HeartbeatManager heartbeatManager;

    /// <summary>
    /// The heartbeat manager created by <see cref="Start"/>; null as long as no heartbeat has been started.
    /// </summary>
    public static HeartbeatManager HeartbeatManager {
      get { return heartbeatManager; }
    }

    /// <summary>
    /// Event log that is handed to the EventLogManager in <see cref="Start"/>.
    /// When it is null, an uncaught exception in <see cref="Start"/> is reported via SlaveClientCom instead.
    /// </summary>
    public EventLog ServiceEventLog { get; set; }

    // Released at the end of Start(); Shutdown() waits on it.
    private Semaphore waitShutdownSem = new Semaphore(0, 1);
    // Set by ShutdownCore(); ends the dispatch loop in DispatchMessageQueue().
    private bool abortRequested;
    // Host of the slave <-> slave gui communication service; only created if startClientComService is true.
    private ServiceHost slaveComm;
    private WcfService wcfService;
    private TaskManager taskManager;
    private ConfigManager configManager;
    private PluginManager pluginManager;
    private bool startClientComService;

    /// <summary>
    /// Creates a core that also hosts the client communication service.
    /// </summary>
    public Core()
      : this(true) {
    }

    /// <summary>
    /// Creates the core and wires up the plugin, task and config managers.
    /// </summary>
    /// <param name="startClientComService">If true, <see cref="Start"/> opens the client communication service host.</param>
    public Core(bool startClientComService) {
      var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
      this.pluginManager = new PluginManager(WcfService.Instance, log);
      this.taskManager = new TaskManager(pluginManager, log);
      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);

      RegisterTaskManagerEvents();

      this.configManager = new ConfigManager(taskManager);
      ConfigManager.Instance = this.configManager;

      this.startClientComService = startClientComService;
    }

    /// <summary>
    /// Main method for the client: starts the services and the heartbeat and then
    /// dispatches messages until a shutdown has been requested.
    /// Any exception is logged and leads to a shutdown of the core; the shutdown
    /// semaphore is always released when this method is left.
    /// </summary>
    public void Start() {
      abortRequested = false;
      EventLogManager.ServiceEventLog = ServiceEventLog;

      try {
        if (startClientComService) {
          //start the client communication service (pipe between slave and slave gui)
          slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
          slaveComm.Open();
        }

        // delete all left over task directories
        pluginManager.CleanPluginTemp();
        SlaveClientCom.Instance.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          EventLogManager.LogException(ex);
        } else {
          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
          //else an exception will be thrown anyways.
          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
        }
        ShutdownCore();
      }
      finally {
        DeregisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat manager if none exists yet.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager();
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and executes them until abortRequested is set.
    /// After every message (unless a shutdown was requested) the current status is pushed to the client console.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        // NOTE(review): GetMessage presumably blocks until a message is available - confirm against MessageQueue
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        if (!abortRequested) {
          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
        }
      }
    }

    /// <summary>
    /// Subscribes to the connection events of the WcfService singleton.
    /// </summary>
    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    /// <summary>
    /// Removes the subscriptions made in <see cref="RegisterServiceEvents"/>.
    /// </summary>
    private void DeregisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
    }

    private void WcfService_Connected(object sender, EventArgs e) {
      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateTask:
          CalculateTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.AbortTask:
          AbortTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.StopTask:
          StopTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.PauseTask:
          PauseTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(configManager.GetClientInfo());
          break;
        case MessageContainer.MessageType.NewHBInterval:
          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
          // -1 is treated as "no new interval"; the current one is kept
          if (interval != -1) {
            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
          }
          break;
      }
    }

    /// <summary>
    /// Runs <paramref name="handler"/> for the given task id on a TPL task.
    /// If the handler throws, the exception counter is incremented and the exception is logged.
    /// </summary>
    /// <param name="handler">The handler to execute; it receives the boxed task id as state.</param>
    /// <param name="taskId">Id of the task the handler works on.</param>
    private static void RunHandlerAsync(Action<object> handler, Guid taskId) {
      TS.Task.Factory.StartNew(handler, taskId)
        .ContinueWith((t) => {
          SlaveStatusInfo.IncrementExceptionOccured();
          SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>Starts <see cref="HandleCalculateTask"/> asynchronously for the given task id.</summary>
    private void CalculateTaskAsync(Guid jobId) {
      RunHandlerAsync(HandleCalculateTask, jobId);
    }

    /// <summary>Starts <see cref="HandleStopTask"/> asynchronously for the given task id.</summary>
    private void StopTaskAsync(Guid jobId) {
      RunHandlerAsync(HandleStopTask, jobId);
    }

    /// <summary>Starts <see cref="HandlePauseTask"/> asynchronously for the given task id.</summary>
    private void PauseTaskAsync(Guid jobId) {
      RunHandlerAsync(HandlePauseTask, jobId);
    }

    /// <summary>Starts <see cref="HandleAbortTask"/> asynchronously for the given task id.</summary>
    private void AbortTaskAsync(Guid jobId) {
      RunHandlerAsync(HandleAbortTask, jobId);
    }

    /// <summary>
    /// Fetches the task and its data from the server, reserves the needed cores,
    /// sets the task to Calculating and hands it over to the task manager.
    /// On every failure the reserved cores are released again and the exception is rethrown;
    /// for missing resources and unknown errors the task is additionally set back to Waiting.
    /// </summary>
    /// <param name="taskIdObj">The boxed Guid of the task to calculate.</param>
    private void HandleCalculateTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      Task task = null;
      int usedCores = 0; // cores reserved by this call so far; released again on failure
      try {
        task = wcfService.GetTask(taskId);
        if (task == null) throw new TaskNotFoundException(taskId);
        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
        TaskData taskData = wcfService.GetTaskData(taskId);
        if (taskData == null) throw new TaskDataNotFoundException(taskId);
        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
        if (task == null) throw new TaskNotFoundException(taskId);
        taskManager.StartTaskAsync(task, taskData);
      }
      catch (TaskNotFoundException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (TaskDataNotFoundException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (TaskAlreadyRunningException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (OutOfCoresException) {
        // usedCores is 0 when this was thrown by the check above; releasing is kept for symmetry
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
        throw;
      }
      catch (OutOfMemoryException) {
        // this also catches a real out-of-memory condition raised after the cores have been reserved,
        // so the reservation has to be released here as well
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
        throw;
      }
      catch (Exception e) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
        throw;
      }
    }

    /// <summary>
    /// Checks that the task exists on the server and asks the task manager to stop it.
    /// Exceptions are not handled here; they propagate to the caller.
    /// </summary>
    /// <param name="taskIdObj">The boxed Guid of the task to stop.</param>
    /// <exception cref="TaskNotFoundException">The server does not know the task.</exception>
    private void HandleStopTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      Task task = wcfService.GetTask(taskId);
      if (task == null) throw new TaskNotFoundException(taskId);
      taskManager.StopTaskAsync(taskId);
    }

    /// <summary>
    /// Checks that the task exists on the server and asks the task manager to pause it.
    /// Exceptions are not handled here; they propagate to the caller.
    /// </summary>
    /// <param name="taskIdObj">The boxed Guid of the task to pause.</param>
    /// <exception cref="TaskNotFoundException">The server does not know the task.</exception>
    private void HandlePauseTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      Task task = wcfService.GetTask(taskId);
      if (task == null) throw new TaskNotFoundException(taskId);
      taskManager.PauseTaskAsync(taskId);
    }

    /// <summary>
    /// Asks the task manager to abort the task. Exceptions propagate to the caller.
    /// </summary>
    /// <param name="taskIdObj">The boxed Guid of the task to abort.</param>
    private void HandleAbortTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      taskManager.AbortTask(taskId);
    }

    #region TaskManager Events
    private void RegisterTaskManagerEvents() {
      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
    }

    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
      // successfully started, everything is good
    }

    /// <summary>
    /// Releases the cores of a task that left the slave, wakes up the heartbeat thread and
    /// uploads the task data with the given final state. Failures are only logged.
    /// </summary>
    /// <param name="e">Event data: the slave task (Value) and its task data (Value2).</param>
    /// <param name="state">The state that is reported to the server together with the data.</param>
    private void ReleaseCoresAndUploadTaskData(EventArgs<SlaveTask, TaskData> e, TaskState state) {
      try {
        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();
        Task task = wcfService.GetTask(e.Value.TaskId);
        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
        task.ExecutionTime = e.Value.ExecutionTime;
        wcfService.UpdateTaskData(task, e.Value2, configManager.GetClientInfo().Id, state);
      }
      catch (Exception ex) {
        // covers the TaskNotFoundException from above as well; an event handler must not throw
        SlaveClientCom.Instance.LogMessage(ex.ToString());
      }
    }

    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
      ReleaseCoresAndUploadTaskData(e, TaskState.Paused);
    }

    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
      ReleaseCoresAndUploadTaskData(e, TaskState.Finished);
    }

    /// <summary>
    /// Releases the cores of a failed task and reports the failure to the server, together with
    /// the task data if there is any. Failures while reporting are counted and logged.
    /// </summary>
    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
      try {
        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();
        SlaveTask slaveTask = e.Value.Item1;
        TaskData taskData = e.Value.Item2;
        Exception exception = e.Value.Item3;

        Task task = wcfService.GetTask(slaveTask.TaskId);
        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
        task.ExecutionTime = slaveTask.ExecutionTime;
        if (taskData != null) {
          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
        } else {
          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
        }
        SlaveClientCom.Instance.LogMessage(exception.Message);
      }
      catch (Exception ex) {
        // covers the TaskNotFoundException from above as well
        SlaveStatusInfo.IncrementExceptionOccured();
        SlaveClientCom.Instance.LogMessage(ex.ToString());
      }
    }

    /// <summary>
    /// Releases the cores of the task, counts and logs the exception and sets the task back to Waiting
    /// on the server. Like the other task manager handlers, failures while doing so are only logged
    /// instead of being thrown back into the code that raised the event.
    /// </summary>
    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
      try {
        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
        SlaveStatusInfo.IncrementExceptionOccured();
        heartbeatManager.AwakeHeartBeatThread();
        SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
        wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
      }
      catch (Exception ex) {
        SlaveClientCom.Instance.LogMessage(ex.ToString());
      }
    }

    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    }
    #endregion

    #region Log Events
    /// <summary>
    /// Forwards a new log entry to the client console. Of a tab separated entry only the second
    /// field is forwarded; an entry without a tab is forwarded as a whole.
    /// </summary>
    private void log_MessageAdded(object sender, EventArgs<string> e) {
      try {
        string[] fields = e.Value.Split('\t');
        SlaveClientCom.Instance.LogMessage(fields.Length > 1 ? fields[1] : e.Value);
      }
      catch {
        // deliberately ignored: forwarding a log entry is best effort and must never fail the logging code
      }
    }
    #endregion

    /// <summary>
    /// Requests an asynchronous abort for every task id the task manager currently reports.
    /// </summary>
    private void DoAbortAll() {
      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
      foreach (Guid taskId in taskManager.TaskIds) {
        AbortTaskAsync(taskId);
      }
    }

    /// <summary>
    /// Requests an asynchronous pause for every task id the task manager currently reports.
    /// </summary>
    private void DoPauseAll() {
      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
      foreach (Guid taskId in taskManager.TaskIds) {
        PauseTaskAsync(taskId);
      }
    }

    /// <summary>
    /// Requests an asynchronous stop for every task id the task manager currently reports.
    /// </summary>
    private void DoStopAll() {
      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
      foreach (Guid taskId in taskManager.TaskIds) {
        StopTaskAsync(taskId);
      }
    }

    #region Slave Lifecycle Methods
    /// <summary>
    /// Completely shuts down the slave: enqueues a ShutdownSlave message and
    /// waits until <see cref="Start"/> has released the shutdown semaphore.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown, should be called before the application is exited.
    /// Also runs after a failed start, so everything that is created in <see cref="Start"/>
    /// (heartbeat manager, communication host) may not exist yet and is checked for null.
    /// </summary>
    private void ShutdownCore() {
      // set first, so the dispatch loop ends even if one of the following steps throws
      abortRequested = true;

      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }

      DoAbortAll();

      SlaveClientCom.Instance.LogMessage("Logging out");
      WcfService.Instance.Disconnect();
      SlaveClientCom.Instance.ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (startClientComService && slaveComm != null && slaveComm.State != CommunicationState.Closed) {
        slaveComm.Close();
      }
    }

    /// <summary>
    /// Handles the Restart message: clears the Asleep flag of the config manager.
    /// Can be called after Sleep().
    /// </summary>
    private void DoStartSlave() {
      SlaveClientCom.Instance.LogMessage("Restart received");
      configManager.Asleep = false;
    }

    /// <summary>
    /// Handles the Sleep message: sets the Asleep flag of the config manager and pauses all tasks.
    /// The client gui communication stays available;
    /// primarily used by gui if core is running as windows service.
    /// </summary>
    private void Sleep() {
      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
      configManager.Asleep = true;
      DoPauseAll();
    }
    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.