Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 6902

Last change on this file since 6902 was 6893, checked in by ascheibe, 13 years ago

#1233 server can now control the slave heartbeat interval

File size: 18.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static HeartbeatManager heartbeatManager;
41    public static HeartbeatManager HeartbeatManager {
42      get { return heartbeatManager; }
43    }
44
45    public EventLog ServiceEventLog { get; set; }
46
47    private Semaphore waitShutdownSem = new Semaphore(0, 1);
48    private bool abortRequested;
49    private ISlaveCommunication clientCom;
50    private ServiceHost slaveComm;
51    private WcfService wcfService;
52    private TaskManager taskManager;
53    private ConfigManager configManager;
54    private PluginManager pluginManager;
55
56    public Core() {
57      var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
58      this.pluginManager = new PluginManager(WcfService.Instance, log);
59      this.taskManager = new TaskManager(pluginManager, log);
60      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
61
62      RegisterTaskManagerEvents();
63
64      this.configManager = new ConfigManager(taskManager);
65      ConfigManager.Instance = this.configManager;
66    }
67
68    /// <summary>
69    /// Main method for the client
70    /// </summary>
71    public void Start() {
72      abortRequested = false;
73
74      try {
75        //start the client communication service (pipe between slave and slave gui)
76        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
77        slaveComm.Open();
78        clientCom = SlaveClientCom.Instance.ClientCom;
79
80        // delete all left over task directories
81        pluginManager.CleanPluginTemp();
82        clientCom.LogMessage("Hive Slave started");
83
84        wcfService = WcfService.Instance;
85        RegisterServiceEvents();
86
87        StartHeartbeats(); // Start heartbeats thread       
88        DispatchMessageQueue(); // dispatch messages until abortRequested
89      }
90      catch (Exception ex) {
91        if (ServiceEventLog != null) {
92          try {
93            ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
94          }
95          catch (Exception) { }
96        } else {
97          //try to log with clientCom. if this works the user sees at least a message,
98          //else an exception will be thrown anyways.
99          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
100        }
101        ShutdownCore();
102      }
103      finally {
104        DeregisterServiceEvents();
105        waitShutdownSem.Release();
106      }
107    }
108
109    private void StartHeartbeats() {
110      //Initialize the heartbeat     
111      if (heartbeatManager == null) {
112        heartbeatManager = new HeartbeatManager();
113        heartbeatManager.StartHeartbeat();
114      }
115    }
116
117    private void DispatchMessageQueue() {
118      MessageQueue queue = MessageQueue.GetInstance();
119      while (!abortRequested) {
120        MessageContainer container = queue.GetMessage();
121        DetermineAction(container);
122        if (!abortRequested) {
123          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
124        }
125      }
126    }
127
128    private void RegisterServiceEvents() {
129      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
130      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
131    }
132
133    private void DeregisterServiceEvents() {
134      WcfService.Instance.Connected -= WcfService_Connected;
135      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
136    }
137
138    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
139      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
140    }
141
142    private void WcfService_Connected(object sender, EventArgs e) {
143      clientCom.LogMessage("Connected successfully to Hive server");
144    }
145
146    /// <summary>
147    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
148    /// </summary>
149    /// <param name="container">The container, containing the message</param>
150    private void DetermineAction(MessageContainer container) {
151      clientCom.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
152
153      if (container is ExecutorMessageContainer<Guid>) {
154        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
155        c.execute();
156      } else if (container is MessageContainer) {
157        switch (container.Message) {
158          case MessageContainer.MessageType.CalculateTask:
159            CalculateTaskAsync(container.TaskId);
160            break;
161          case MessageContainer.MessageType.AbortTask:
162            AbortTaskAsync(container.TaskId);
163            break;
164          case MessageContainer.MessageType.StopTask:
165            StopTaskAsync(container.TaskId);
166            break;
167          case MessageContainer.MessageType.PauseTask:
168            PauseTaskAsync(container.TaskId);
169            break;
170          case MessageContainer.MessageType.StopAll:
171            DoStopAll();
172            break;
173          case MessageContainer.MessageType.PauseAll:
174            DoPauseAll();
175            break;
176          case MessageContainer.MessageType.AbortAll:
177            DoAbortAll();
178            break;
179          case MessageContainer.MessageType.ShutdownSlave:
180            ShutdownCore();
181            break;
182          case MessageContainer.MessageType.Restart:
183            DoStartSlave();
184            break;
185          case MessageContainer.MessageType.Sleep:
186            Sleep();
187            break;
188          case MessageContainer.MessageType.SayHello:
189            wcfService.Connect(configManager.GetClientInfo());
190            break;
191          case MessageContainer.MessageType.NewHBInterval:
192            int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
193            if (interval != -1) {
194              HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
195            }
196            break;
197        }
198      } else {
199        clientCom.LogMessage("Unknown MessageContainer: " + container);
200      }
201    }
202
203    private void CalculateTaskAsync(Guid jobId) {
204      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
205      .ContinueWith((t) => {
206        SlaveStatusInfo.IncrementExceptionOccured();
207        clientCom.LogMessage(t.Exception.ToString());
208      }, TaskContinuationOptions.OnlyOnFaulted);
209    }
210
211    private void StopTaskAsync(Guid jobId) {
212      TS.Task.Factory.StartNew(HandleStopTask, jobId)
213       .ContinueWith((t) => {
214         SlaveStatusInfo.IncrementExceptionOccured();
215         clientCom.LogMessage(t.Exception.ToString());
216       }, TaskContinuationOptions.OnlyOnFaulted);
217    }
218
219    private void PauseTaskAsync(Guid jobId) {
220      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
221       .ContinueWith((t) => {
222         SlaveStatusInfo.IncrementExceptionOccured();
223         clientCom.LogMessage(t.Exception.ToString());
224       }, TaskContinuationOptions.OnlyOnFaulted);
225    }
226
227    private void AbortTaskAsync(Guid jobId) {
228      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
229       .ContinueWith((t) => {
230         SlaveStatusInfo.IncrementExceptionOccured();
231         clientCom.LogMessage(t.Exception.ToString());
232       }, TaskContinuationOptions.OnlyOnFaulted);
233    }
234
235    private void HandleCalculateTask(object taskIdObj) {
236      Guid taskId = (Guid)taskIdObj;
237      Task task = null;
238      int usedCores = 0;
239      try {
240        task = wcfService.GetTask(taskId);
241        if (task == null) throw new TaskNotFoundException(taskId);
242        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
243        if (ConfigManager.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
244        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
245        TaskData taskData = wcfService.GetTaskData(taskId);
246        if (taskData == null) throw new TaskDataNotFoundException(taskId);
247        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
248        if (task == null) throw new TaskNotFoundException(taskId);
249        taskManager.StartTaskAsync(task, taskData);
250      }
251      catch (TaskNotFoundException) {
252        SlaveStatusInfo.DecrementUsedCores(usedCores);
253        throw;
254      }
255      catch (TaskDataNotFoundException) {
256        SlaveStatusInfo.DecrementUsedCores(usedCores);
257        throw;
258      }
259      catch (TaskAlreadyRunningException) {
260        SlaveStatusInfo.DecrementUsedCores(usedCores);
261        throw;
262      }
263      catch (OutOfCoresException) {
264        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
265        throw;
266      }
267      catch (OutOfMemoryException) {
268        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
269        throw;
270      }
271      catch (Exception e) {
272        SlaveStatusInfo.DecrementUsedCores(usedCores);
273        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
274        throw;
275      }
276    }
277
278    private void HandleStopTask(object taskIdObj) {
279      Guid taskId = (Guid)taskIdObj;
280      try {
281        Task task = wcfService.GetTask(taskId);
282        if (task == null) throw new TaskNotFoundException(taskId);
283        taskManager.StopTaskAsync(taskId);
284      }
285      catch (TaskNotFoundException) {
286        throw;
287      }
288      catch (TaskNotRunningException) {
289        throw;
290      }
291      catch (AppDomainNotCreatedException) {
292        throw;
293      }
294    }
295
296    private void HandlePauseTask(object taskIdObj) {
297      Guid taskId = (Guid)taskIdObj;
298      try {
299        Task task = wcfService.GetTask(taskId);
300        if (task == null) throw new TaskNotFoundException(taskId);
301        taskManager.PauseTaskAsync(taskId);
302      }
303      catch (TaskNotFoundException) {
304        throw;
305      }
306      catch (TaskNotRunningException) {
307        throw;
308      }
309      catch (AppDomainNotCreatedException) {
310        throw;
311      }
312    }
313
314    private void HandleAbortTask(object taskIdObj) {
315      Guid taskId = (Guid)taskIdObj;
316      try {
317        taskManager.AbortTask(taskId);
318      }
319      catch (TaskNotFoundException) {
320        throw;
321      }
322    }
323
324    #region TaskManager Events
325    private void RegisterTaskManagerEvents() {
326      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
327      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
328      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
329      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
330      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
331      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
332    }
333
334    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
335      // successfully started, everything is good
336    }
337
338    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
339      try {
340        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
341        heartbeatManager.AwakeHeartBeatThread();
342        Task task = wcfService.GetTask(e.Value.TaskId);
343        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
344        task.ExecutionTime = e.Value.ExecutionTime;
345        TaskData taskData = e.Value.GetTaskData();
346        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
347      }
348      catch (TaskNotFoundException ex) {
349        clientCom.LogMessage(ex.ToString());
350      }
351      catch (Exception ex) {
352        clientCom.LogMessage(ex.ToString());
353      }
354    }
355
356    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
357      try {
358        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
359        heartbeatManager.AwakeHeartBeatThread();
360        Task task = wcfService.GetTask(e.Value.TaskId);
361        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
362        task.ExecutionTime = e.Value.ExecutionTime;
363        TaskData taskData = e.Value.GetTaskData();
364        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
365      }
366      catch (TaskNotFoundException ex) {
367        clientCom.LogMessage(ex.ToString());
368      }
369      catch (Exception ex) {
370        clientCom.LogMessage(ex.ToString());
371      }
372    }
373
374    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
375      try {
376        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
377        heartbeatManager.AwakeHeartBeatThread();
378        SlaveTask slaveTask = e.Value.Item1;
379        TaskData taskData = e.Value.Item2;
380        Exception exception = e.Value.Item3;
381
382        Task task = wcfService.GetTask(slaveTask.TaskId);
383        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
384        task.ExecutionTime = slaveTask.ExecutionTime;
385        if (taskData != null) {
386          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
387        } else {
388          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
389        }
390        clientCom.LogMessage(exception.Message);
391      }
392      catch (TaskNotFoundException ex) {
393        SlaveStatusInfo.IncrementExceptionOccured();
394        clientCom.LogMessage(ex.ToString());
395      }
396      catch (Exception ex) {
397        SlaveStatusInfo.IncrementExceptionOccured();
398        clientCom.LogMessage(ex.ToString());
399      }
400    }
401
402    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
403      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
404      SlaveStatusInfo.IncrementExceptionOccured();
405      heartbeatManager.AwakeHeartBeatThread();
406      clientCom.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
407      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
408    }
409
410    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
411      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
412    }
413    #endregion
414
415    #region Log Events
416    private void log_MessageAdded(object sender, EventArgs<string> e) {
417      clientCom.LogMessage(e.Value.Split('\t')[1]);
418    }
419    #endregion
420
421    /// <summary>
422    /// aborts all running jobs, no results are sent back
423    /// </summary>
424    private void DoAbortAll() {
425      clientCom.LogMessage("Aborting all jobs.");
426      foreach (Guid taskId in taskManager.TaskIds) {
427        AbortTaskAsync(taskId);
428      }
429    }
430
431    /// <summary>
432    /// wait for jobs to finish, then pause client
433    /// </summary>
434    private void DoPauseAll() {
435      clientCom.LogMessage("Pausing all jobs.");
436      foreach (Guid taskId in taskManager.TaskIds) {
437        PauseTaskAsync(taskId);
438      }
439    }
440
441    /// <summary>
442    /// pause slave immediately
443    /// </summary>
444    private void DoStopAll() {
445      clientCom.LogMessage("Stopping all jobs.");
446      foreach (Guid taskId in taskManager.TaskIds) {
447        StopTaskAsync(taskId);
448      }
449    }
450
451    #region Slave Lifecycle Methods
452    /// <summary>
453    /// completly shudown slave
454    /// </summary>
455    public void Shutdown() {
456      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
457      MessageQueue.GetInstance().AddMessage(mc);
458      waitShutdownSem.WaitOne();
459    }
460
461    /// <summary>
462    /// complete shutdown, should be called before the the application is exited
463    /// </summary>
464    private void ShutdownCore() {
465      clientCom.LogMessage("Shutdown signal received");
466      clientCom.LogMessage("Stopping heartbeat");
467      heartbeatManager.StopHeartBeat();
468      abortRequested = true;
469
470      DoAbortAll();
471
472      clientCom.LogMessage("Logging out");
473      WcfService.Instance.Disconnect();
474      clientCom.Shutdown();
475      SlaveClientCom.Close();
476
477      if (slaveComm.State != CommunicationState.Closed)
478        slaveComm.Close();
479    }
480
481    /// <summary>
482    /// reinitializes everything and continues operation,
483    /// can be called after Sleep()
484    /// </summary> 
485    private void DoStartSlave() {
486      clientCom.LogMessage("Restart received");
487      configManager.Asleep = false;
488    }
489
490    /// <summary>
491    /// stop slave, except for client gui communication,
492    /// primarily used by gui if core is running as windows service
493    /// </summary>   
494    private void Sleep() {
495      clientCom.LogMessage("Sleep received - not accepting any new jobs");
496      configManager.Asleep = true;
497      DoPauseAll();
498    }
499    #endregion
500  }
501}
Note: See TracBrowser for help on using the repository browser.