source: trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 6983

Last change on this file since 6983 was 6983, checked in by ascheibe, 11 years ago

#1672

  • added the Hive Services and Slave projects
  • added missing svn ignores
File size: 18.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static HeartbeatManager heartbeatManager;
41    public static HeartbeatManager HeartbeatManager {
42      get { return heartbeatManager; }
43    }
44
45    public EventLog ServiceEventLog { get; set; }
46
47    private Semaphore waitShutdownSem = new Semaphore(0, 1);
48    private bool abortRequested;
49    private ISlaveCommunication clientCom;
50    private ServiceHost slaveComm;
51    private WcfService wcfService;
52    private TaskManager taskManager;
53    private ConfigManager configManager;
54    private PluginManager pluginManager;
55
56    public Core() {
57      var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
58      this.pluginManager = new PluginManager(WcfService.Instance, log);
59      this.taskManager = new TaskManager(pluginManager, log);
60      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
61
62      RegisterTaskManagerEvents();
63
64      this.configManager = new ConfigManager(taskManager);
65      ConfigManager.Instance = this.configManager;
66    }
67
68    /// <summary>
69    /// Main method for the client
70    /// </summary>
71    public void Start() {
72      abortRequested = false;
73      EventLogManager.ServiceEventLog = ServiceEventLog;
74
75      try {
76        //start the client communication service (pipe between slave and slave gui)
77        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
78        slaveComm.Open();
79        clientCom = SlaveClientCom.Instance.ClientCom;
80
81        // delete all left over task directories
82        pluginManager.CleanPluginTemp();
83        clientCom.LogMessage("Hive Slave started");
84
85        wcfService = WcfService.Instance;
86        RegisterServiceEvents();
87
88        StartHeartbeats(); // Start heartbeats thread       
89        DispatchMessageQueue(); // dispatch messages until abortRequested
90      }
91      catch (Exception ex) {
92        if (ServiceEventLog != null) {
93          EventLogManager.LogException(ex);
94        } else {
95          //try to log with clientCom. if this works the user sees at least a message,
96          //else an exception will be thrown anyways.
97          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
98        }
99        ShutdownCore();
100      }
101      finally {
102        DeregisterServiceEvents();
103        waitShutdownSem.Release();
104      }
105    }
106
107    private void StartHeartbeats() {
108      //Initialize the heartbeat     
109      if (heartbeatManager == null) {
110        heartbeatManager = new HeartbeatManager();
111        heartbeatManager.StartHeartbeat();
112      }
113    }
114
115    private void DispatchMessageQueue() {
116      MessageQueue queue = MessageQueue.GetInstance();
117      while (!abortRequested) {
118        MessageContainer container = queue.GetMessage();
119        DetermineAction(container);
120        if (!abortRequested) {
121          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
122        }
123      }
124    }
125
126    private void RegisterServiceEvents() {
127      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
128      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
129    }
130
131    private void DeregisterServiceEvents() {
132      WcfService.Instance.Connected -= WcfService_Connected;
133      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
134    }
135
136    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
137      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
138    }
139
140    private void WcfService_Connected(object sender, EventArgs e) {
141      clientCom.LogMessage("Connected successfully to Hive server");
142    }
143
144    /// <summary>
145    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
146    /// </summary>
147    /// <param name="container">The container, containing the message</param>
148    private void DetermineAction(MessageContainer container) {
149      clientCom.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
150
151      if (container is ExecutorMessageContainer<Guid>) {
152        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
153        c.execute();
154      } else if (container is MessageContainer) {
155        switch (container.Message) {
156          case MessageContainer.MessageType.CalculateTask:
157            CalculateTaskAsync(container.TaskId);
158            break;
159          case MessageContainer.MessageType.AbortTask:
160            AbortTaskAsync(container.TaskId);
161            break;
162          case MessageContainer.MessageType.StopTask:
163            StopTaskAsync(container.TaskId);
164            break;
165          case MessageContainer.MessageType.PauseTask:
166            PauseTaskAsync(container.TaskId);
167            break;
168          case MessageContainer.MessageType.StopAll:
169            DoStopAll();
170            break;
171          case MessageContainer.MessageType.PauseAll:
172            DoPauseAll();
173            break;
174          case MessageContainer.MessageType.AbortAll:
175            DoAbortAll();
176            break;
177          case MessageContainer.MessageType.ShutdownSlave:
178            ShutdownCore();
179            break;
180          case MessageContainer.MessageType.Restart:
181            DoStartSlave();
182            break;
183          case MessageContainer.MessageType.Sleep:
184            Sleep();
185            break;
186          case MessageContainer.MessageType.SayHello:
187            wcfService.Connect(configManager.GetClientInfo());
188            break;
189          case MessageContainer.MessageType.NewHBInterval:
190            int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
191            if (interval != -1) {
192              HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
193            }
194            break;
195        }
196      } else {
197        clientCom.LogMessage("Unknown MessageContainer: " + container);
198      }
199    }
200
201    private void CalculateTaskAsync(Guid jobId) {
202      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
203      .ContinueWith((t) => {
204        SlaveStatusInfo.IncrementExceptionOccured();
205        clientCom.LogMessage(t.Exception.ToString());
206      }, TaskContinuationOptions.OnlyOnFaulted);
207    }
208
209    private void StopTaskAsync(Guid jobId) {
210      TS.Task.Factory.StartNew(HandleStopTask, jobId)
211       .ContinueWith((t) => {
212         SlaveStatusInfo.IncrementExceptionOccured();
213         clientCom.LogMessage(t.Exception.ToString());
214       }, TaskContinuationOptions.OnlyOnFaulted);
215    }
216
217    private void PauseTaskAsync(Guid jobId) {
218      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
219       .ContinueWith((t) => {
220         SlaveStatusInfo.IncrementExceptionOccured();
221         clientCom.LogMessage(t.Exception.ToString());
222       }, TaskContinuationOptions.OnlyOnFaulted);
223    }
224
225    private void AbortTaskAsync(Guid jobId) {
226      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
227       .ContinueWith((t) => {
228         SlaveStatusInfo.IncrementExceptionOccured();
229         clientCom.LogMessage(t.Exception.ToString());
230       }, TaskContinuationOptions.OnlyOnFaulted);
231    }
232
233    private void HandleCalculateTask(object taskIdObj) {
234      Guid taskId = (Guid)taskIdObj;
235      Task task = null;
236      int usedCores = 0;
237      try {
238        task = wcfService.GetTask(taskId);
239        if (task == null) throw new TaskNotFoundException(taskId);
240        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
241        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
242        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
243        TaskData taskData = wcfService.GetTaskData(taskId);
244        if (taskData == null) throw new TaskDataNotFoundException(taskId);
245        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
246        if (task == null) throw new TaskNotFoundException(taskId);
247        taskManager.StartTaskAsync(task, taskData);
248      }
249      catch (TaskNotFoundException) {
250        SlaveStatusInfo.DecrementUsedCores(usedCores);
251        throw;
252      }
253      catch (TaskDataNotFoundException) {
254        SlaveStatusInfo.DecrementUsedCores(usedCores);
255        throw;
256      }
257      catch (TaskAlreadyRunningException) {
258        SlaveStatusInfo.DecrementUsedCores(usedCores);
259        throw;
260      }
261      catch (OutOfCoresException) {
262        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
263        throw;
264      }
265      catch (OutOfMemoryException) {
266        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
267        throw;
268      }
269      catch (Exception e) {
270        SlaveStatusInfo.DecrementUsedCores(usedCores);
271        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
272        throw;
273      }
274    }
275
276    private void HandleStopTask(object taskIdObj) {
277      Guid taskId = (Guid)taskIdObj;
278      try {
279        Task task = wcfService.GetTask(taskId);
280        if (task == null) throw new TaskNotFoundException(taskId);
281        taskManager.StopTaskAsync(taskId);
282      }
283      catch (TaskNotFoundException) {
284        throw;
285      }
286      catch (TaskNotRunningException) {
287        throw;
288      }
289      catch (AppDomainNotCreatedException) {
290        throw;
291      }
292    }
293
294    private void HandlePauseTask(object taskIdObj) {
295      Guid taskId = (Guid)taskIdObj;
296      try {
297        Task task = wcfService.GetTask(taskId);
298        if (task == null) throw new TaskNotFoundException(taskId);
299        taskManager.PauseTaskAsync(taskId);
300      }
301      catch (TaskNotFoundException) {
302        throw;
303      }
304      catch (TaskNotRunningException) {
305        throw;
306      }
307      catch (AppDomainNotCreatedException) {
308        throw;
309      }
310    }
311
312    private void HandleAbortTask(object taskIdObj) {
313      Guid taskId = (Guid)taskIdObj;
314      try {
315        taskManager.AbortTask(taskId);
316      }
317      catch (TaskNotFoundException) {
318        throw;
319      }
320    }
321
322    #region TaskManager Events
323    private void RegisterTaskManagerEvents() {
324      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
325      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
326      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
327      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
328      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
329      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
330    }
331
332    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
333      // successfully started, everything is good
334    }
335
336    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
337      try {
338        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
339        heartbeatManager.AwakeHeartBeatThread();
340        Task task = wcfService.GetTask(e.Value.TaskId);
341        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
342        task.ExecutionTime = e.Value.ExecutionTime;
343        TaskData taskData = e.Value.GetTaskData();
344        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
345      }
346      catch (TaskNotFoundException ex) {
347        clientCom.LogMessage(ex.ToString());
348      }
349      catch (Exception ex) {
350        clientCom.LogMessage(ex.ToString());
351      }
352    }
353
354    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
355      try {
356        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
357        heartbeatManager.AwakeHeartBeatThread();
358        Task task = wcfService.GetTask(e.Value.TaskId);
359        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
360        task.ExecutionTime = e.Value.ExecutionTime;
361        TaskData taskData = e.Value.GetTaskData();
362        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
363      }
364      catch (TaskNotFoundException ex) {
365        clientCom.LogMessage(ex.ToString());
366      }
367      catch (Exception ex) {
368        clientCom.LogMessage(ex.ToString());
369      }
370    }
371
372    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
373      try {
374        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
375        heartbeatManager.AwakeHeartBeatThread();
376        SlaveTask slaveTask = e.Value.Item1;
377        TaskData taskData = e.Value.Item2;
378        Exception exception = e.Value.Item3;
379
380        Task task = wcfService.GetTask(slaveTask.TaskId);
381        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
382        task.ExecutionTime = slaveTask.ExecutionTime;
383        if (taskData != null) {
384          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
385        } else {
386          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
387        }
388        clientCom.LogMessage(exception.Message);
389      }
390      catch (TaskNotFoundException ex) {
391        SlaveStatusInfo.IncrementExceptionOccured();
392        clientCom.LogMessage(ex.ToString());
393      }
394      catch (Exception ex) {
395        SlaveStatusInfo.IncrementExceptionOccured();
396        clientCom.LogMessage(ex.ToString());
397      }
398    }
399
400    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
401      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
402      SlaveStatusInfo.IncrementExceptionOccured();
403      heartbeatManager.AwakeHeartBeatThread();
404      clientCom.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
405      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
406    }
407
408    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
409      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
410    }
411    #endregion
412
413    #region Log Events
414    private void log_MessageAdded(object sender, EventArgs<string> e) {
415      clientCom.LogMessage(e.Value.Split('\t')[1]);
416    }
417    #endregion
418
419    /// <summary>
420    /// aborts all running jobs, no results are sent back
421    /// </summary>
422    private void DoAbortAll() {
423      clientCom.LogMessage("Aborting all jobs.");
424      foreach (Guid taskId in taskManager.TaskIds) {
425        AbortTaskAsync(taskId);
426      }
427    }
428
429    /// <summary>
430    /// wait for jobs to finish, then pause client
431    /// </summary>
432    private void DoPauseAll() {
433      clientCom.LogMessage("Pausing all jobs.");
434      foreach (Guid taskId in taskManager.TaskIds) {
435        PauseTaskAsync(taskId);
436      }
437    }
438
439    /// <summary>
440    /// pause slave immediately
441    /// </summary>
442    private void DoStopAll() {
443      clientCom.LogMessage("Stopping all jobs.");
444      foreach (Guid taskId in taskManager.TaskIds) {
445        StopTaskAsync(taskId);
446      }
447    }
448
449    #region Slave Lifecycle Methods
450    /// <summary>
451    /// completly shudown slave
452    /// </summary>
453    public void Shutdown() {
454      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
455      MessageQueue.GetInstance().AddMessage(mc);
456      waitShutdownSem.WaitOne();
457    }
458
459    /// <summary>
460    /// complete shutdown, should be called before the the application is exited
461    /// </summary>
462    private void ShutdownCore() {
463      clientCom.LogMessage("Shutdown signal received");
464      clientCom.LogMessage("Stopping heartbeat");
465      heartbeatManager.StopHeartBeat();
466      abortRequested = true;
467
468      DoAbortAll();
469
470      clientCom.LogMessage("Logging out");
471      WcfService.Instance.Disconnect();
472      clientCom.Shutdown();
473      SlaveClientCom.Close();
474
475      if (slaveComm.State != CommunicationState.Closed)
476        slaveComm.Close();
477    }
478
479    /// <summary>
480    /// reinitializes everything and continues operation,
481    /// can be called after Sleep()
482    /// </summary> 
483    private void DoStartSlave() {
484      clientCom.LogMessage("Restart received");
485      configManager.Asleep = false;
486    }
487
488    /// <summary>
489    /// stop slave, except for client gui communication,
490    /// primarily used by gui if core is running as windows service
491    /// </summary>   
492    private void Sleep() {
493      clientCom.LogMessage("Sleep received - not accepting any new jobs");
494      configManager.Asleep = true;
495      DoPauseAll();
496    }
497    #endregion
498  }
499}
Note: See TracBrowser for help on using the repository browser.