Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 6910

Last change on this file since 6910 was 6910, checked in by ascheibe, 13 years ago

#1233 fixed a bug where the slave didn't report the cpu utilization correctly

File size: 18.4 KB
RevLine 
[5105]1#region License Information
2/* HeuristicLab
[6371]3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
[5105]4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[5778]23using System.Diagnostics;
[5280]24using System.ServiceModel;
[5105]25using System.Threading;
[5721]26using System.Threading.Tasks;
[6546]27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
[5599]28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
[5105]29using HeuristicLab.Common;
30using HeuristicLab.Core;
[6721]31using TS = System.Threading.Tasks;
[5105]32
33
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server and does all webservice calls for tasks.
  /// </summary>
  /// <remarks>
  /// <see cref="Start"/> blocks in the message dispatch loop until a shutdown is requested.
  /// <see cref="Shutdown"/> enqueues a shutdown message and waits until <see cref="Start"/> has returned.
  /// </remarks>
  public class Core : MarshalByRefObject {
    private static HeartbeatManager heartbeatManager;
    /// <summary>
    /// The heartbeat manager shared by all cores (static); null until a core has been started.
    /// </summary>
    public static HeartbeatManager HeartbeatManager {
      get { return heartbeatManager; }
    }

    /// <summary>
    /// Optional event log that <see cref="Start"/> writes fatal errors to.
    /// When null, fatal errors are sent to the slave GUI via the client communication channel instead.
    /// </summary>
    public EventLog ServiceEventLog { get; set; }

    // Released when Start() returns, so that Shutdown() can block until the core has stopped.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    // Set by ShutdownCore(); terminates the message dispatch loop.
    private bool abortRequested;
    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;
    private WcfService wcfService;
    private readonly TaskManager taskManager;
    private readonly ConfigManager configManager;
    private readonly PluginManager pluginManager;

    /// <summary>
    /// Creates the plugin, task and config managers and wires up the task manager and log events.
    /// The new config manager is also published as <c>ConfigManager.Instance</c>.
    /// </summary>
    public Core() {
      var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
      this.pluginManager = new PluginManager(WcfService.Instance, log);
      this.taskManager = new TaskManager(pluginManager, log);
      log.MessageAdded += log_MessageAdded;

      RegisterTaskManagerEvents();

      this.configManager = new ConfigManager(taskManager);
      ConfigManager.Instance = this.configManager;
    }

    /// <summary>
    /// Main method for the client: opens the slave GUI communication service, starts the heartbeats
    /// and dispatches queued messages until a shutdown is requested.
    /// Any exception is reported (event log or slave GUI) and triggers a core shutdown.
    /// </summary>
    public void Start() {
      abortRequested = false;

      try {
        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over task directories
        pluginManager.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
          }
          catch (Exception) { } // deliberately best-effort: nothing else left to report to
        } else {
          //try to log with clientCom. if this works the user sees at least a message,
          //else an exception will be thrown anyways.
          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
        }
        ShutdownCore();
      }
      finally {
        DeregisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the shared heartbeat manager if no core has done so yet.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager();
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Processes messages from the message queue until <see cref="abortRequested"/> is set,
    /// pushing the updated slave status to the GUI after each message.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        if (!abortRequested) {
          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += WcfService_Connected;
      WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
    }

    private void DeregisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
    }

    private void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The container, containing the message; a null container is logged and ignored.</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        // Report instead of throwing a NullReferenceException, which would tear down the dispatch loop.
        clientCom.LogMessage("Unknown MessageContainer: " + container);
        return;
      }

      clientCom.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));

      // Executor containers carry their own action; they bypass the message-type switch.
      ExecutorMessageContainer<Guid> executorContainer = container as ExecutorMessageContainer<Guid>;
      if (executorContainer != null) {
        executorContainer.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateTask:
          CalculateTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.AbortTask:
          AbortTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.StopTask:
          StopTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.PauseTask:
          PauseTaskAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(configManager.GetClientInfo());
          break;
        case MessageContainer.MessageType.NewHBInterval: {
            int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
            // -1 is treated as "no new interval available" and leaves the current one unchanged
            if (interval != -1) {
              heartbeatManager.Interval = TimeSpan.FromSeconds(interval);
            }
            break;
          }
      }
    }

    /// <summary>
    /// Runs <paramref name="handler"/> with <paramref name="taskId"/> as state on a thread-pool task.
    /// If the handler throws, the failure is counted and logged to the slave GUI.
    /// </summary>
    private void RunTaskHandlerAsync(Action<object> handler, Guid taskId) {
      TS.Task.Factory.StartNew(handler, taskId)
        .ContinueWith(t => {
          SlaveStatusInfo.IncrementExceptionOccured();
          clientCom.LogMessage(t.Exception.ToString());
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    private void CalculateTaskAsync(Guid taskId) {
      RunTaskHandlerAsync(HandleCalculateTask, taskId);
    }

    private void StopTaskAsync(Guid taskId) {
      RunTaskHandlerAsync(HandleStopTask, taskId);
    }

    private void PauseTaskAsync(Guid taskId) {
      RunTaskHandlerAsync(HandlePauseTask, taskId);
    }

    private void AbortTaskAsync(Guid taskId) {
      RunTaskHandlerAsync(HandleAbortTask, taskId);
    }

    /// <summary>
    /// Fetches a task and its data from the server, reserves its cores and hands it to the task manager.
    /// </summary>
    /// <param name="taskIdObj">The boxed <see cref="Guid"/> of the task.</param>
    /// <remarks>
    /// Every failure rethrows its exception after releasing any reserved cores.
    /// For resource shortages and unknown errors, the task is also reported back to the server
    /// as <c>Waiting</c>.
    /// </remarks>
    private void HandleCalculateTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      // Cores reserved in SlaveStatusInfo so far. It stays 0 until the reservation is made,
      // so every failure path can release it unconditionally.
      int usedCores = 0;
      try {
        Task task = wcfService.GetTask(taskId);
        if (task == null) throw new TaskNotFoundException(taskId);
        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded);
        usedCores = task.CoresNeeded;
        TaskData taskData = wcfService.GetTaskData(taskId);
        if (taskData == null) throw new TaskDataNotFoundException(taskId);
        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
        if (task == null) throw new TaskNotFoundException(taskId);
        taskManager.StartTaskAsync(task, taskData);
      }
      catch (TaskNotFoundException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (TaskDataNotFoundException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (TaskAlreadyRunningException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (OutOfCoresException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
        throw;
      }
      catch (OutOfMemoryException) {
        // Release any reservation here too: an OutOfMemoryException raised after the cores were
        // reserved would otherwise leak the used-core count.
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
        throw;
      }
      catch (Exception e) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
        throw;
      }
    }

    /// <summary>
    /// Verifies the task exists on the server and asks the task manager to stop it.
    /// Exceptions propagate to the continuation set up by <see cref="RunTaskHandlerAsync"/>.
    /// </summary>
    /// <param name="taskIdObj">The boxed <see cref="Guid"/> of the task.</param>
    private void HandleStopTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      Task task = wcfService.GetTask(taskId);
      if (task == null) throw new TaskNotFoundException(taskId);
      taskManager.StopTaskAsync(taskId);
    }

    /// <summary>
    /// Verifies the task exists on the server and asks the task manager to pause it.
    /// Exceptions propagate to the continuation set up by <see cref="RunTaskHandlerAsync"/>.
    /// </summary>
    /// <param name="taskIdObj">The boxed <see cref="Guid"/> of the task.</param>
    private void HandlePauseTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      Task task = wcfService.GetTask(taskId);
      if (task == null) throw new TaskNotFoundException(taskId);
      taskManager.PauseTaskAsync(taskId);
    }

    /// <summary>
    /// Asks the task manager to abort the task (no server lookup).
    /// Exceptions propagate to the continuation set up by <see cref="RunTaskHandlerAsync"/>.
    /// </summary>
    /// <param name="taskIdObj">The boxed <see cref="Guid"/> of the task.</param>
    private void HandleAbortTask(object taskIdObj) {
      Guid taskId = (Guid)taskIdObj;
      taskManager.AbortTask(taskId);
    }

    #region TaskManager Events
    private void RegisterTaskManagerEvents() {
      this.taskManager.TaskStarted += taskManager_TaskStarted;
      this.taskManager.TaskPaused += taskManager_TaskPaused;
      this.taskManager.TaskStopped += taskManager_TaskStopped;
      this.taskManager.TaskFailed += taskManager_TaskFailed;
      this.taskManager.ExceptionOccured += taskManager_ExceptionOccured;
      this.taskManager.TaskAborted += taskManager_TaskAborted;
    }

    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
      // successfully started, everything is good
    }

    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
      ReportTaskResult(e.Value, TaskState.Paused);
    }

    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
      ReportTaskResult(e.Value, TaskState.Finished);
    }

    /// <summary>
    /// Releases the cores of a task that left the slave, wakes the heartbeat thread and uploads the
    /// task's execution time and data with the given final state.
    /// Failures are logged to the slave GUI rather than propagated into the task manager.
    /// </summary>
    private void ReportTaskResult(SlaveTask slaveTask, TaskState state) {
      try {
        SlaveStatusInfo.DecrementUsedCores(slaveTask.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();
        Task task = wcfService.GetTask(slaveTask.TaskId);
        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
        task.ExecutionTime = slaveTask.ExecutionTime;
        TaskData taskData = slaveTask.GetTaskData();
        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, state);
      }
      catch (Exception ex) {
        clientCom.LogMessage(ex.ToString());
      }
    }

    /// <summary>
    /// Releases the task's cores and reports it to the server as failed.
    /// The task data is uploaded when available; otherwise only the state is updated.
    /// </summary>
    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
      try {
        SlaveTask slaveTask = e.Value.Item1;
        TaskData taskData = e.Value.Item2;
        Exception exception = e.Value.Item3;

        SlaveStatusInfo.DecrementUsedCores(slaveTask.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();

        Task task = wcfService.GetTask(slaveTask.TaskId);
        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
        task.ExecutionTime = slaveTask.ExecutionTime;
        if (taskData != null) {
          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
        } else {
          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
        }
        clientCom.LogMessage(exception.Message);
      }
      catch (Exception ex) {
        SlaveStatusInfo.IncrementExceptionOccured();
        clientCom.LogMessage(ex.ToString());
      }
    }

    /// <summary>
    /// Releases the task's cores, counts and logs the exception and puts the task back to
    /// <c>Waiting</c> on the server.
    /// </summary>
    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
      SlaveStatusInfo.IncrementExceptionOccured();
      heartbeatManager.AwakeHeartBeatThread();
      clientCom.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
      try {
        wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
      }
      catch (Exception ex) {
        // Log like the other task manager handlers instead of throwing back into the event raiser.
        clientCom.LogMessage(ex.ToString());
      }
    }

    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    }
    #endregion

    #region Log Events
    /// <summary>
    /// Forwards log entries to the slave GUI, dropping the text before the first tab.
    /// </summary>
    private void log_MessageAdded(object sender, EventArgs<string> e) {
      // Entries presumably look like "<prefix>\t<message>" (TODO confirm against ThreadSafeLog).
      // Split only at the first tab so messages containing tabs are forwarded whole. Forward the
      // complete entry when it has no tab, instead of throwing IndexOutOfRangeException.
      string[] parts = e.Value.Split(new[] { '\t' }, 2);
      clientCom.LogMessage(parts.Length > 1 ? parts[1] : parts[0]);
    }
    #endregion

    /// <summary>
    /// Requests an abort of every task known to the task manager; no results are sent back.
    /// </summary>
    private void DoAbortAll() {
      clientCom.LogMessage("Aborting all jobs.");
      foreach (Guid taskId in taskManager.TaskIds) {
        AbortTaskAsync(taskId);
      }
    }

    /// <summary>
    /// Requests a pause of every task known to the task manager.
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pausing all jobs.");
      foreach (Guid taskId in taskManager.TaskIds) {
        PauseTaskAsync(taskId);
      }
    }

    /// <summary>
    /// Requests a stop of every task known to the task manager.
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stopping all jobs.");
      foreach (Guid taskId in taskManager.TaskIds) {
        StopTaskAsync(taskId);
      }
    }

    #region Slave Lifecycle Methods
    /// <summary>
    /// Completely shuts down the slave: enqueues a shutdown message and blocks until
    /// <see cref="Start"/> has returned.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown, should be called before the application is exited.
    /// Stops the heartbeat, ends the dispatch loop, aborts all tasks, logs out from the server
    /// and closes the slave GUI communication.
    /// </summary>
    /// <remarks>
    /// Also called from the catch block of <see cref="Start"/>, where initialization may have failed
    /// before the heartbeat manager or the service host were created.
    /// </remarks>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown signal received");
      clientCom.LogMessage("Stopping heartbeat");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;

      DoAbortAll();

      clientCom.LogMessage("Logging out");
      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm != null) {
        // Close() throws on a faulted communication object; Abort() is the only way to release it.
        if (slaveComm.State == CommunicationState.Faulted) {
          slaveComm.Abort();
        } else if (slaveComm.State != CommunicationState.Closed) {
          slaveComm.Close();
        }
      }
    }

    /// <summary>
    /// Reinitializes everything and continues operation;
    /// can be called after Sleep(). Clears the config manager's asleep flag.
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      configManager.Asleep = false;
    }

    /// <summary>
    /// Stops the slave, except for client GUI communication:
    /// marks the config manager as asleep and pauses all tasks.
    /// Primarily used by the GUI if the core is running as a Windows service.
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received - not accepting any new jobs");
      configManager.Asleep = true;
      DoPauseAll();
    }
    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.