Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive.Azure/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 6989

Last change on this file since 6989 was 6989, checked in by spimming, 12 years ago

#1680:

  • start client communication service optionally
  • make slave client communication interchangeable ('PipeCom', 'TraceCom')
File size: 18.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
// Shared by all Core instances; created on first use by StartHeartbeats().
private static HeartbeatManager heartbeatManager;

/// <summary>
/// Gets the shared heartbeat manager. The value is null until StartHeartbeats() has created it.
/// </summary>
public static HeartbeatManager HeartbeatManager {
  get { return heartbeatManager; }
}

/// <summary>
/// Event log that Start() hands to EventLogManager. When it is null, Start() reports
/// uncaught exceptions through the client communication channel instead.
/// </summary>
public EventLog ServiceEventLog { get; set; }

// Released in the finally block of Start(); Shutdown() waits on it.
private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
// Loop condition of DispatchMessageQueue(); reset by Start(), set by ShutdownCore().
private bool abortRequested;
// Channel used for all log/status output; assigned in Start(), so it is null before that.
private ISlaveCommunication clientCom;
// Host of SlaveCommunicationService; only created when startClientComService is true.
private ServiceHost slaveComm;
// Server proxy; assigned from WcfService.Instance in Start().
private WcfService wcfService;
// The three managers below and the flag are assigned once, in the constructor.
private readonly TaskManager taskManager;
private readonly ConfigManager configManager;
private readonly PluginManager pluginManager;
private readonly bool startClientComService;
56
/// <summary>
/// Creates a core with the client communication service enabled.
/// </summary>
public Core()
  : this(true) {
}

/// <summary>
/// Creates a core, builds its plugin, task and config managers and subscribes to their events.
/// </summary>
/// <param name="startClientComService">
/// Stored for later use: Start() opens and ShutdownCore() closes the
/// SlaveCommunicationService host only when this is true.
/// </param>
public Core(bool startClientComService) {
  ThreadSafeLog slaveLog = new ThreadSafeLog(Settings.Default.MaxLogCount);
  pluginManager = new PluginManager(WcfService.Instance, slaveLog);
  taskManager = new TaskManager(pluginManager, slaveLog);
  slaveLog.MessageAdded += log_MessageAdded;

  RegisterTaskManagerEvents();

  // The instance is also published through the static ConfigManager.Instance.
  configManager = new ConfigManager(taskManager);
  ConfigManager.Instance = configManager;

  this.startClientComService = startClientComService;
}
74
/// <summary>
/// Main method for the client: optionally opens the client communication service,
/// cleans up plugin temp data, starts the heartbeats and then dispatches queued
/// messages until an abort is requested.
/// </summary>
/// <remarks>
/// Any exception raised during start-up or dispatching is logged (to the service event
/// log when one is set, otherwise through the client communication channel) and followed
/// by ShutdownCore(). If neither log target is available the original exception is
/// rethrown. In all cases the service events are deregistered and the shutdown semaphore
/// is released so that a waiting Shutdown() call can return.
/// </remarks>
public void Start() {
  abortRequested = false;
  EventLogManager.ServiceEventLog = ServiceEventLog;

  try {
    if (startClientComService) {
      // start the client communication service (pipe between slave and slave gui)
      slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
      slaveComm.Open();
    }
    clientCom = SlaveClientCom.Instance.ClientCom;

    // delete all left over task directories
    pluginManager.CleanPluginTemp();
    clientCom.LogMessage("Hive Slave started");

    wcfService = WcfService.Instance;
    RegisterServiceEvents();

    StartHeartbeats();
    DispatchMessageQueue(); // returns once abortRequested has been set
  }
  catch (Exception ex) {
    if (ServiceEventLog != null) {
      EventLogManager.LogException(ex);
    } else if (clientCom != null) {
      // no event log: at least show the failure to the user
      clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
    } else {
      // Start-up failed before clientCom was assigned and there is no event log.
      // Rethrow the real failure instead of hiding it behind a NullReferenceException.
      throw;
    }
    ShutdownCore();
  }
  finally {
    DeregisterServiceEvents();
    waitShutdownSem.Release();
  }
}
115
/// <summary>
/// Creates the shared heartbeat manager and starts its heartbeat, unless one already exists.
/// </summary>
private void StartHeartbeats() {
  // NOTE(review): the static field is never reset to null in this class, so a second
  // Start() reuses the existing manager without calling StartHeartbeat() again - confirm intended.
  if (heartbeatManager != null) {
    return;
  }

  heartbeatManager = new HeartbeatManager();
  heartbeatManager.StartHeartbeat();
}
123
/// <summary>
/// Takes messages from the message queue and hands each one to DetermineAction()
/// until abortRequested is set. After every handled message the current status is
/// pushed to the client, except for the message that requested the abort.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue messages = MessageQueue.GetInstance();
  while (!abortRequested) {
    MessageContainer next = messages.GetMessage();
    DetermineAction(next);
    if (abortRequested) {
      break;
    }
    clientCom.StatusChanged(configManager.GetStatusForClientConsole());
  }
}
134
/// <summary>
/// Subscribes this core to the Connected and ExceptionOccured events of the WcfService singleton.
/// </summary>
private void RegisterServiceEvents() {
  WcfService.Instance.Connected += WcfService_Connected;
  WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
}
139
/// <summary>
/// Removes the subscriptions made by RegisterServiceEvents() from the WcfService singleton.
/// </summary>
private void DeregisterServiceEvents() {
  WcfService.Instance.Connected -= new EventHandler(WcfService_Connected);
  WcfService.Instance.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
}
144
/// <summary>
/// Logs the message of an exception reported by the WcfService.
/// </summary>
private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  Exception reported = e.Value;
  string logLine = "Connection to server interruped with exception: " + reported.Message;
  clientCom.LogMessage(logLine);
}
148
/// <summary>
/// Logs that the WcfService reported a successful connection.
/// </summary>
private void WcfService_Connected(object sender, EventArgs e) {
  const string connectedNotice = "Connected successfully to Hive server";
  clientCom.LogMessage(connectedNotice);
}
152
/// <summary>
/// Logs a message taken from the MessageQueue and starts the corresponding action.
/// </summary>
/// <remarks>
/// An ExecutorMessageContainer is executed directly. Every other container is dispatched
/// on its message type; message types without a case are logged as unknown.
/// </remarks>
/// <param name="container">The container, containing the message</param>
private void DetermineAction(MessageContainer container) {
  clientCom.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));

  // One type test instead of "is" followed by a cast.
  ExecutorMessageContainer<Guid> executorContainer = container as ExecutorMessageContainer<Guid>;
  if (executorContainer != null) {
    executorContainer.execute();
    return;
  }

  // The parameter is statically a MessageContainer, so no further type check is needed;
  // unrecognized messages are reported by the default case.
  switch (container.Message) {
    case MessageContainer.MessageType.CalculateTask:
      CalculateTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.AbortTask:
      AbortTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.StopTask:
      StopTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.PauseTask:
      PauseTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.StopAll:
      DoStopAll();
      break;
    case MessageContainer.MessageType.PauseAll:
      DoPauseAll();
      break;
    case MessageContainer.MessageType.AbortAll:
      DoAbortAll();
      break;
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.Restart:
      DoStartSlave();
      break;
    case MessageContainer.MessageType.Sleep:
      Sleep();
      break;
    case MessageContainer.MessageType.SayHello:
      wcfService.Connect(configManager.GetClientInfo());
      break;
    case MessageContainer.MessageType.NewHBInterval:
      int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
      // -1 is treated as "no new interval"; the current one is kept
      if (interval != -1) {
        HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
      }
      break;
    default:
      clientCom.LogMessage("Unknown MessageContainer: " + container);
      break;
  }
}
209
/// <summary>
/// Runs HandleCalculateTask for the given id on a new TPL task. If that task faults,
/// the exception counter is incremented and the exception is logged.
/// </summary>
/// <param name="jobId">Id passed to HandleCalculateTask as its state object.</param>
private void CalculateTaskAsync(Guid jobId) {
  TS.Task calculation = TS.Task.Factory.StartNew(HandleCalculateTask, jobId);
  calculation.ContinueWith(faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  }, TaskContinuationOptions.OnlyOnFaulted);
}
217
/// <summary>
/// Runs HandleStopTask for the given id on a new TPL task. If that task faults,
/// the exception counter is incremented and the exception is logged.
/// </summary>
/// <param name="jobId">Id passed to HandleStopTask as its state object.</param>
private void StopTaskAsync(Guid jobId) {
  TS.Task stopping = TS.Task.Factory.StartNew(HandleStopTask, jobId);
  stopping.ContinueWith(faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  }, TaskContinuationOptions.OnlyOnFaulted);
}
225
/// <summary>
/// Runs HandlePauseTask for the given id on a new TPL task. If that task faults,
/// the exception counter is incremented and the exception is logged.
/// </summary>
/// <param name="jobId">Id passed to HandlePauseTask as its state object.</param>
private void PauseTaskAsync(Guid jobId) {
  TS.Task pausing = TS.Task.Factory.StartNew(HandlePauseTask, jobId);
  pausing.ContinueWith(faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  }, TaskContinuationOptions.OnlyOnFaulted);
}
233
/// <summary>
/// Runs HandleAbortTask for the given id on a new TPL task. If that task faults,
/// the exception counter is incremented and the exception is logged.
/// </summary>
/// <param name="jobId">Id passed to HandleAbortTask as its state object.</param>
private void AbortTaskAsync(Guid jobId) {
  TS.Task aborting = TS.Task.Factory.StartNew(HandleAbortTask, jobId);
  aborting.ContinueWith(faulted => {
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(faulted.Exception.ToString());
  }, TaskContinuationOptions.OnlyOnFaulted);
}
241
/// <summary>
/// Fetches a task and its data from the server, reserves the cores it needs, marks it as
/// calculating and hands it to the task manager.
/// </summary>
/// <remarks>
/// On any failure the cores reserved so far are released and the exception is rethrown
/// (CalculateTaskAsync logs it). For resource shortages and unexpected errors the task is
/// additionally set back to Waiting on the server.
/// </remarks>
/// <param name="taskIdObj">The task id as a boxed Guid (TPL state object).</param>
private void HandleCalculateTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;
  // Cores reserved by this call so far; stays 0 until IncrementUsedCores has run,
  // so releasing it in a handler is safe at any point.
  int usedCores = 0;
  try {
    Task task = wcfService.GetTask(taskId);
    if (task == null) throw new TaskNotFoundException(taskId);
    if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
    if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
    SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded);
    usedCores = task.CoresNeeded;
    TaskData taskData = wcfService.GetTaskData(taskId);
    if (taskData == null) throw new TaskDataNotFoundException(taskId);
    task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
    if (task == null) throw new TaskNotFoundException(taskId);
    taskManager.StartTaskAsync(task, taskData);
  }
  catch (TaskNotFoundException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (TaskDataNotFoundException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (TaskAlreadyRunningException) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    throw;
  }
  catch (OutOfCoresException) {
    // Release as well: if this is raised after the reservation, the cores must not leak.
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
    throw;
  }
  catch (OutOfMemoryException) {
    // Same as above: this may also be raised after the cores were reserved.
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
    throw;
  }
  catch (Exception e) {
    SlaveStatusInfo.DecrementUsedCores(usedCores);
    wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
    throw;
  }
}
284
/// <summary>
/// Verifies that the server knows the task and asks the task manager to stop it.
/// </summary>
/// <remarks>
/// Exceptions are not handled here; they propagate to the caller (StopTaskAsync logs
/// them). The former catch clauses only rethrew and have been removed.
/// </remarks>
/// <param name="taskIdObj">The task id as a boxed Guid (TPL state object).</param>
/// <exception cref="TaskNotFoundException">The server returned no task for the id.</exception>
private void HandleStopTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;
  if (wcfService.GetTask(taskId) == null) {
    throw new TaskNotFoundException(taskId);
  }
  taskManager.StopTaskAsync(taskId);
}
302
/// <summary>
/// Verifies that the server knows the task and asks the task manager to pause it.
/// </summary>
/// <remarks>
/// Exceptions are not handled here; they propagate to the caller (PauseTaskAsync logs
/// them). The former catch clauses only rethrew and have been removed.
/// </remarks>
/// <param name="taskIdObj">The task id as a boxed Guid (TPL state object).</param>
/// <exception cref="TaskNotFoundException">The server returned no task for the id.</exception>
private void HandlePauseTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;
  if (wcfService.GetTask(taskId) == null) {
    throw new TaskNotFoundException(taskId);
  }
  taskManager.PauseTaskAsync(taskId);
}
320
/// <summary>
/// Asks the task manager to abort the task. Unlike stop and pause, the server is not queried first.
/// </summary>
/// <remarks>
/// Exceptions are not handled here; they propagate to the caller (AbortTaskAsync logs
/// them). The former catch clause only rethrew and has been removed.
/// </remarks>
/// <param name="taskIdObj">The task id as a boxed Guid (TPL state object).</param>
private void HandleAbortTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;
  taskManager.AbortTask(taskId);
}
330
331    #region TaskManager Events
/// <summary>
/// Subscribes this core's handlers to all task manager events.
/// </summary>
private void RegisterTaskManagerEvents() {
  taskManager.TaskStarted += taskManager_TaskStarted;
  taskManager.TaskPaused += taskManager_TaskPaused;
  taskManager.TaskStopped += taskManager_TaskStopped;
  taskManager.TaskFailed += taskManager_TaskFailed;
  taskManager.ExceptionOccured += taskManager_ExceptionOccured;
  taskManager.TaskAborted += taskManager_TaskAborted;
}
340
/// <summary>
/// Handler for the task manager's TaskStarted event; intentionally empty.
/// </summary>
private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
  // nothing to do here: the task started successfully
}
344
/// <summary>
/// Handler for the task manager's TaskPaused event: releases the task's cores, wakes the
/// heartbeat thread and uploads the task data to the server with state Paused.
/// Any exception (including a task unknown to the server) is logged and not propagated.
/// </summary>
private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
  try {
    SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Task serverTask = wcfService.GetTask(e.Value.TaskId);
    if (serverTask == null) {
      throw new TaskNotFoundException(e.Value.TaskId);
    }

    serverTask.ExecutionTime = e.Value.ExecutionTime;
    TaskData pausedData = e.Value.GetTaskData();
    wcfService.UpdateTaskData(serverTask, pausedData, configManager.GetClientInfo().Id, TaskState.Paused);
  }
  catch (Exception failure) {
    // also covers the TaskNotFoundException thrown above
    clientCom.LogMessage(failure.ToString());
  }
}
362
/// <summary>
/// Handler for the task manager's TaskStopped event: releases the task's cores, wakes the
/// heartbeat thread and uploads the task data to the server with state Finished.
/// Any exception (including a task unknown to the server) is logged and not propagated.
/// </summary>
private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
  try {
    SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Task serverTask = wcfService.GetTask(e.Value.TaskId);
    if (serverTask == null) {
      throw new TaskNotFoundException(e.Value.TaskId);
    }

    serverTask.ExecutionTime = e.Value.ExecutionTime;
    TaskData finishedData = e.Value.GetTaskData();
    wcfService.UpdateTaskData(serverTask, finishedData, configManager.GetClientInfo().Id, TaskState.Finished);
  }
  catch (Exception failure) {
    // also covers the TaskNotFoundException thrown above
    clientCom.LogMessage(failure.ToString());
  }
}
380
/// <summary>
/// Handler for the task manager's TaskFailed event: releases the task's cores, wakes the
/// heartbeat thread and reports the failure to the server. When task data is available it
/// is uploaded along with the Failed state; otherwise only the state is updated.
/// Exceptions raised while reporting are counted and logged, not propagated.
/// </summary>
private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
  try {
    SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    SlaveTask failedTask = e.Value.Item1;
    TaskData failedData = e.Value.Item2;
    Exception cause = e.Value.Item3;

    Task serverTask = wcfService.GetTask(failedTask.TaskId);
    if (serverTask == null) {
      throw new TaskNotFoundException(failedTask.TaskId);
    }
    serverTask.ExecutionTime = failedTask.ExecutionTime;

    if (failedData == null) {
      wcfService.UpdateJobState(serverTask.Id, TaskState.Failed, cause.ToString());
    } else {
      wcfService.UpdateTaskData(serverTask, failedData, configManager.GetClientInfo().Id, TaskState.Failed, cause.ToString());
    }
    clientCom.LogMessage(cause.Message);
  }
  catch (Exception failure) {
    // also covers the TaskNotFoundException thrown above
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(failure.ToString());
  }
}
408
/// <summary>
/// Handler for the task manager's ExceptionOccured event: releases the task's cores,
/// counts and logs the exception, wakes the heartbeat thread and sets the task back to
/// Waiting on the server.
/// </summary>
/// <remarks>
/// A failure of the server call is logged instead of propagated, in line with the other
/// task manager event handlers of this class.
/// </remarks>
private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
  SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
  SlaveStatusInfo.IncrementExceptionOccured();
  heartbeatManager.AwakeHeartBeatThread();

  string details = e.Value2.ToString();
  clientCom.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, details));
  try {
    wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, details);
  }
  catch (Exception ex) {
    // the server may be unreachable; do not let that escape the event handler
    clientCom.LogMessage(ex.ToString());
  }
}
416
/// <summary>
/// Handler for the task manager's TaskAborted event: releases the cores of the aborted task.
/// </summary>
private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
  int releasedCores = e.Value.CoresNeeded;
  SlaveStatusInfo.DecrementUsedCores(releasedCores);
}
420    #endregion
421
422    #region Log Events
/// <summary>
/// Forwards a message added to the slave log to the client. Only the second
/// tab-separated field of the entry is forwarded; an entry without a tab is
/// forwarded unchanged.
/// </summary>
private void log_MessageAdded(object sender, EventArgs<string> e) {
  // This handler is subscribed in the constructor, but clientCom is only assigned in
  // Start(); entries logged in between cannot be forwarded.
  if (clientCom == null) {
    return;
  }

  string[] fields = e.Value.Split('\t');
  // presumably the first field is a timestamp or prefix - TODO confirm against ThreadSafeLog
  clientCom.LogMessage(fields.Length > 1 ? fields[1] : e.Value);
}
426    #endregion
427
/// <summary>
/// Logs the request and starts an asynchronous abort for every task id reported by the task manager.
/// </summary>
private void DoAbortAll() {
  clientCom.LogMessage("Aborting all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds) {
    AbortTaskAsync(runningTaskId);
  }
}
437
/// <summary>
/// Logs the request and starts an asynchronous pause for every task id reported by the task manager.
/// </summary>
private void DoPauseAll() {
  clientCom.LogMessage("Pausing all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds) {
    PauseTaskAsync(runningTaskId);
  }
}
447
/// <summary>
/// Logs the request and starts an asynchronous stop for every task id reported by the task manager.
/// </summary>
private void DoStopAll() {
  clientCom.LogMessage("Stopping all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds) {
    StopTaskAsync(runningTaskId);
  }
}
457
458    #region Slave Lifecycle Methods
/// <summary>
/// Requests a complete shutdown of the slave by queueing a ShutdownSlave message,
/// then blocks until Start() releases the shutdown semaphore.
/// </summary>
public void Shutdown() {
  MessageContainer shutdownRequest = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
  MessageQueue queue = MessageQueue.GetInstance();
  queue.AddMessage(shutdownRequest);
  waitShutdownSem.WaitOne();
}
467
/// <summary>
/// Complete shutdown; should be called before the application exits. Stops the
/// heartbeat, ends the dispatch loop, aborts all tasks, disconnects from the server and
/// closes the client communication.
/// </summary>
/// <remarks>
/// Also runs from the catch block of Start(), i.e. possibly before the heartbeat manager
/// or the service host exist, so both are checked before use.
/// </remarks>
private void ShutdownCore() {
  clientCom.LogMessage("Shutdown signal received");
  clientCom.LogMessage("Stopping heartbeat");
  // null when Start() failed before StartHeartbeats() was reached
  if (heartbeatManager != null) {
    heartbeatManager.StopHeartBeat();
  }
  abortRequested = true;

  DoAbortAll();

  clientCom.LogMessage("Logging out");
  WcfService.Instance.Disconnect();
  clientCom.Shutdown();
  SlaveClientCom.Close();

  if (startClientComService && slaveComm != null) {
    if (slaveComm.State == CommunicationState.Faulted) {
      // Close() throws on a faulted communication object; Abort() is the way to release it
      slaveComm.Abort();
    } else if (slaveComm.State != CommunicationState.Closed) {
      slaveComm.Close();
    }
  }
}
489
/// <summary>
/// Handles a Restart message: logs it and clears the Asleep flag of the config manager.
/// Counterpart of Sleep().
/// </summary>
private void DoStartSlave() {
  const string restartNotice = "Restart received";
  clientCom.LogMessage(restartNotice);
  configManager.Asleep = false;
}
498
/// <summary>
/// Handles a Sleep message: logs it, sets the Asleep flag of the config manager and
/// pauses all tasks. Client gui communication is left untouched.
/// </summary>
private void Sleep() {
  const string sleepNotice = "Sleep received - not accepting any new jobs";
  clientCom.LogMessage(sleepNotice);
  configManager.Asleep = true;
  DoPauseAll();
}
508    #endregion
509  }
510}
Note: See TracBrowser for help on using the repository browser.