Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 6945

Last change on this file since 6945 was 6945, checked in by ascheibe, 12 years ago

#1233 slave: catch more errors and log them to the windows event log

File size: 18.3 KB
RevLine 
[5105]1#region License Information
2/* HeuristicLab
[6371]3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
[5105]4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[5778]23using System.Diagnostics;
[5280]24using System.ServiceModel;
[5105]25using System.Threading;
[5721]26using System.Threading.Tasks;
[6546]27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
[5599]28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
[5105]29using HeuristicLab.Common;
30using HeuristicLab.Core;
[6721]31using TS = System.Threading.Tasks;
[5105]32
33
[5599]34namespace HeuristicLab.Clients.Hive.SlaveCore {
[5105]35  /// <summary>
[5826]36  /// The core component of the Hive Slave.
[6371]37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
[5105]38  /// </summary>
39  public class Core : MarshalByRefObject {
// Heartbeat manager shared by all Core instances; created in StartHeartbeats().
private static HeartbeatManager heartbeatManager;

/// <summary>
/// Gets the shared heartbeat manager. Null until StartHeartbeats() has created it.
/// </summary>
public static HeartbeatManager HeartbeatManager {
  get { return heartbeatManager; }
}

/// <summary>
/// Event log handed to EventLogManager by Start(). When it is null, Start()
/// reports uncaught exceptions through the client channel instead.
/// </summary>
public EventLog ServiceEventLog { get; set; }

// Released once by Start() when it returns; Shutdown() blocks on it.
private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
// Ends the loop in DispatchMessageQueue(); set by ShutdownCore(), reset by Start().
private bool abortRequested;
// Channel to the slave GUI; assigned in Start(), so it is null before that.
private ISlaveCommunication clientCom;
// WCF host for the slave <-> GUI pipe; created and opened in Start().
private ServiceHost slaveComm;
// Server-side service facade; assigned in Start() from WcfService.Instance.
private WcfService wcfService;
// The three managers below are created in the constructor and never replaced.
private readonly TaskManager taskManager;
private readonly ConfigManager configManager;
private readonly PluginManager pluginManager;
[6216]55
/// <summary>
/// Creates the log, plugin manager, task manager and config manager and
/// publishes the config manager through ConfigManager.Instance.
/// </summary>
public Core() {
  ThreadSafeLog slaveLog = new ThreadSafeLog(Settings.Default.MaxLogCount);

  // Both managers write to the same log instance.
  pluginManager = new PluginManager(WcfService.Instance, slaveLog);
  taskManager = new TaskManager(pluginManager, slaveLog);
  slaveLog.MessageAdded += log_MessageAdded;

  RegisterTaskManagerEvents();

  configManager = new ConfigManager(taskManager);
  ConfigManager.Instance = configManager;
}
67
/// <summary>
/// Main method for the client: opens the GUI communication host, cleans up
/// leftover plugin temp data, starts the heartbeat and then dispatches queued
/// messages until a shutdown is requested.
/// </summary>
/// <remarks>
/// Any exception raised while starting or dispatching is logged (to the
/// service event log when one is set, otherwise to the client channel if it
/// is already available) and followed by ShutdownCore(). The shutdown
/// semaphore is always released on exit so that a caller blocked in
/// Shutdown() cannot hang.
/// </remarks>
public void Start() {
  abortRequested = false;
  EventLogManager.ServiceEventLog = ServiceEventLog;

  try {
    //start the client communication service (pipe between slave and slave gui)
    slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
    slaveComm.Open();
    clientCom = SlaveClientCom.Instance.ClientCom;

    // delete all left over task directories
    pluginManager.CleanPluginTemp();
    clientCom.LogMessage("Hive Slave started");

    wcfService = WcfService.Instance;
    RegisterServiceEvents();

    StartHeartbeats(); // Start heartbeats thread
    DispatchMessageQueue(); // dispatch messages until abortRequested
  }
  catch (Exception ex) {
    if (ServiceEventLog != null) {
      EventLogManager.LogException(ex);
    } else if (clientCom != null) {
      // clientCom is still null when the failure happened while creating or
      // opening the service host; dereferencing it would replace the real
      // error with a NullReferenceException.
      clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
    }
    ShutdownCore();
  }
  finally {
    try {
      DeregisterServiceEvents();
    }
    finally {
      // Must run even if deregistering throws, otherwise Shutdown() waits forever.
      waitShutdownSem.Release();
    }
  }
}
106
/// <summary>
/// Creates and starts the shared heartbeat manager if none exists yet.
/// </summary>
private void StartHeartbeats() {
  if (heartbeatManager != null) {
    // A manager was already created earlier; leave it untouched.
    return;
  }

  heartbeatManager = new HeartbeatManager();
  heartbeatManager.StartHeartbeat();
}
114
/// <summary>
/// Takes messages from the message queue one at a time and handles them until
/// abortRequested is set. After each handled message the client console is
/// sent the current status, unless that message requested the abort.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue messages = MessageQueue.GetInstance();

  while (true) {
    if (abortRequested) {
      return;
    }

    MessageContainer next = messages.GetMessage();
    DetermineAction(next);

    if (abortRequested) {
      // The message just handled shut the core down; skip the status update.
      return;
    }
    clientCom.StatusChanged(configManager.GetStatusForClientConsole());
  }
}
125
/// <summary>
/// Subscribes to the Connected and ExceptionOccured events of WcfService.Instance.
/// </summary>
private void RegisterServiceEvents() {
  var service = WcfService.Instance;
  service.Connected += WcfService_Connected;
  service.ExceptionOccured += WcfService_ExceptionOccured;
}
130
/// <summary>
/// Removes the handlers that RegisterServiceEvents() attached to WcfService.Instance.
/// </summary>
private void DeregisterServiceEvents() {
  var service = WcfService.Instance;
  service.Connected -= WcfService_Connected;
  service.ExceptionOccured -= WcfService_ExceptionOccured;
}
135
/// <summary>
/// Logs the message of an exception reported by the WCF service to the client channel.
/// </summary>
private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  string reason = e.Value.Message;
  clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", reason));
}
139
/// <summary>
/// Logs to the client channel that the WCF service raised its Connected event.
/// </summary>
private void WcfService_Connected(object sender, EventArgs e) {
  const string connectedNotice = "Connected successfully to Hive server";
  clientCom.LogMessage(connectedNotice);
}
143
/// <summary>
/// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
/// </summary>
/// <remarks>
/// An ExecutorMessageContainer carries its own action and is simply executed.
/// Every other container is dispatched on its message type; message types
/// without a case below are logged by the first line and otherwise ignored.
/// </remarks>
/// <param name="container">The container, containing the message</param>
private void DetermineAction(MessageContainer container) {
  if (container == null) {
    // Guard first: the log line below dereferences the container.
    clientCom.LogMessage("Unknown MessageContainer: null");
    return;
  }

  clientCom.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));

  ExecutorMessageContainer<Guid> executor = container as ExecutorMessageContainer<Guid>;
  if (executor != null) {
    executor.execute();
    return;
  }

  switch (container.Message) {
    case MessageContainer.MessageType.CalculateTask:
      CalculateTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.AbortTask:
      AbortTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.StopTask:
      StopTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.PauseTask:
      PauseTaskAsync(container.TaskId);
      break;
    case MessageContainer.MessageType.StopAll:
      DoStopAll();
      break;
    case MessageContainer.MessageType.PauseAll:
      DoPauseAll();
      break;
    case MessageContainer.MessageType.AbortAll:
      DoAbortAll();
      break;
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.Restart:
      DoStartSlave();
      break;
    case MessageContainer.MessageType.Sleep:
      Sleep();
      break;
    case MessageContainer.MessageType.SayHello:
      wcfService.Connect(configManager.GetClientInfo());
      break;
    case MessageContainer.MessageType.NewHBInterval:
      int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
      // -1 is treated as "no new interval"; the current one is kept.
      if (interval != -1) {
        HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
      }
      break;
  }
}
200
/// <summary>Runs HandleCalculateTask for the given task on a background task.</summary>
private void CalculateTaskAsync(Guid jobId) {
  RunHandlerAsync(HandleCalculateTask, jobId);
}

/// <summary>Runs HandleStopTask for the given task on a background task.</summary>
private void StopTaskAsync(Guid jobId) {
  RunHandlerAsync(HandleStopTask, jobId);
}

/// <summary>Runs HandlePauseTask for the given task on a background task.</summary>
private void PauseTaskAsync(Guid jobId) {
  RunHandlerAsync(HandlePauseTask, jobId);
}

/// <summary>Runs HandleAbortTask for the given task on a background task.</summary>
private void AbortTaskAsync(Guid jobId) {
  RunHandlerAsync(HandleAbortTask, jobId);
}

/// <summary>
/// Starts <paramref name="handler"/> on a new task, passing the task id as its
/// state object. If the handler faults, the exception counter is incremented
/// and the exception is logged to the client channel.
/// </summary>
/// <param name="handler">Handler that receives the boxed task id.</param>
/// <param name="taskId">Id of the task the handler should act on.</param>
private void RunHandlerAsync(Action<object> handler, Guid taskId) {
  TS.Task.Factory.StartNew(handler, taskId)
    .ContinueWith((t) => {
      SlaveStatusInfo.IncrementExceptionOccured();
      // Reading t.Exception also marks the fault as observed.
      clientCom.LogMessage(t.Exception.ToString());
    }, TaskContinuationOptions.OnlyOnFaulted);
}
232
/// <summary>
/// Fetches a task and its data from the server, reserves the cores it needs,
/// marks it as Calculating and hands it to the task manager.
/// </summary>
/// <remarks>
/// Every exception is rethrown to the caller. Cores reserved here are released
/// again whenever the task was not handed to the task manager successfully,
/// whichever exception caused that. When cores or memory are insufficient, or
/// an unexpected error occurs, the task is set back to Waiting on the server.
/// </remarks>
/// <param name="taskIdObj">Boxed Guid of the task to calculate.</param>
private void HandleCalculateTask(object taskIdObj) {
  Guid taskId = (Guid)taskIdObj;
  int reservedCores = 0;
  bool started = false;
  try {
    Task task = wcfService.GetTask(taskId);
    if (task == null) throw new TaskNotFoundException(taskId);
    if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
    if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();

    SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded);
    reservedCores = task.CoresNeeded;

    TaskData taskData = wcfService.GetTaskData(taskId);
    if (taskData == null) throw new TaskDataNotFoundException(taskId);
    task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
    if (task == null) throw new TaskNotFoundException(taskId);
    taskManager.StartTaskAsync(task, taskData);
    started = true;
  }
  // The three rethrow-only handlers keep these exceptions out of the generic
  // handler below, which would otherwise set the task back to Waiting.
  catch (TaskNotFoundException) {
    throw;
  }
  catch (TaskDataNotFoundException) {
    throw;
  }
  catch (TaskAlreadyRunningException) {
    throw;
  }
  catch (OutOfCoresException) {
    wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
    throw;
  }
  catch (OutOfMemoryException) {
    wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
    throw;
  }
  catch (Exception e) {
    wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
    throw;
  }
  finally {
    // Release in one place so no path can leak the reservation, including an
    // exception raised after the cores were reserved or a failing
    // UpdateJobState call inside one of the handlers above.
    if (!started && reservedCores != 0) {
      SlaveStatusInfo.DecrementUsedCores(reservedCores);
    }
  }
}
[6216]275
/// <summary>
/// Checks with the server that the task exists and then asks the task manager to stop it.
/// </summary>
/// <remarks>
/// Nothing is caught here: a TaskNotFoundException thrown below, and any
/// exception raised by the task manager, goes straight to the caller.
/// </remarks>
/// <param name="taskIdObj">Boxed Guid of the task to stop.</param>
private void HandleStopTask(object taskIdObj) {
  Guid id = (Guid)taskIdObj;

  Task serverTask = wcfService.GetTask(id);
  if (serverTask == null) {
    throw new TaskNotFoundException(id);
  }

  taskManager.StopTaskAsync(id);
}
[6216]293
/// <summary>
/// Checks with the server that the task exists and then asks the task manager to pause it.
/// </summary>
/// <remarks>
/// Nothing is caught here: a TaskNotFoundException thrown below, and any
/// exception raised by the task manager, goes straight to the caller.
/// </remarks>
/// <param name="taskIdObj">Boxed Guid of the task to pause.</param>
private void HandlePauseTask(object taskIdObj) {
  Guid id = (Guid)taskIdObj;

  Task serverTask = wcfService.GetTask(id);
  if (serverTask == null) {
    throw new TaskNotFoundException(id);
  }

  taskManager.PauseTaskAsync(id);
}
311
/// <summary>
/// Asks the task manager to abort the given task. Exceptions raised by the
/// task manager are not handled here and reach the caller unchanged.
/// </summary>
/// <param name="taskIdObj">Boxed Guid of the task to abort.</param>
private void HandleAbortTask(object taskIdObj) {
  Guid id = (Guid)taskIdObj;
  taskManager.AbortTask(id);
}
[6216]321
[6725]322    #region TaskManager Events
/// <summary>
/// Attaches this class's handlers to the six events exposed by the task manager.
/// </summary>
private void RegisterTaskManagerEvents() {
  taskManager.TaskStarted += taskManager_TaskStarted;
  taskManager.TaskPaused += taskManager_TaskPaused;
  taskManager.TaskStopped += taskManager_TaskStopped;
  taskManager.TaskFailed += taskManager_TaskFailed;
  taskManager.ExceptionOccured += taskManager_ExceptionOccured;
  taskManager.TaskAborted += taskManager_TaskAborted;
}
331
/// <summary>
/// Handler for the task manager's TaskStarted event. Intentionally empty.
/// </summary>
private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
  // Nothing to do: the task was started successfully.
}
335
/// <summary>
/// Handler for TaskPaused: frees the task's cores and uploads its data with state Paused.
/// </summary>
private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
  UploadTaskResult(e, TaskState.Paused);
}

/// <summary>
/// Handler for TaskStopped: frees the task's cores and uploads its data with state Finished.
/// </summary>
private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
  UploadTaskResult(e, TaskState.Finished);
}

/// <summary>
/// Shared implementation of the paused/stopped handlers. Releases the cores
/// used by the task, wakes the heartbeat thread, and sends the task's
/// execution time and data to the server together with the given state.
/// Any failure is logged to the client channel and not rethrown.
/// </summary>
/// <param name="e">Event arguments; the slave task is read from e.Value.</param>
/// <param name="newState">State to report to the server.</param>
private void UploadTaskResult(EventArgs<SlaveTask, TaskData> e, TaskState newState) {
  try {
    SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Task task = wcfService.GetTask(e.Value.TaskId);
    if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
    task.ExecutionTime = e.Value.ExecutionTime;

    // The data is taken from the slave task itself, not from e.Value2.
    TaskData taskData = e.Value.GetTaskData();
    wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, newState);
  }
  catch (Exception ex) {
    // Also covers the TaskNotFoundException thrown above.
    clientCom.LogMessage(ex.ToString());
  }
}
371
/// <summary>
/// Handler for TaskFailed: frees the task's cores, wakes the heartbeat thread
/// and reports the failure to the server, including the task data when the
/// event carries any. Errors while reporting are counted and logged, not rethrown.
/// </summary>
private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
  try {
    SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    SlaveTask failedTask = e.Value.Item1;
    TaskData failedData = e.Value.Item2;
    Exception cause = e.Value.Item3;

    Task serverTask = wcfService.GetTask(failedTask.TaskId);
    if (serverTask == null) {
      throw new TaskNotFoundException(failedTask.TaskId);
    }
    serverTask.ExecutionTime = failedTask.ExecutionTime;

    if (failedData == null) {
      // No data to upload: only the state change is reported.
      wcfService.UpdateJobState(serverTask.Id, TaskState.Failed, cause.ToString());
    } else {
      wcfService.UpdateTaskData(serverTask, failedData, configManager.GetClientInfo().Id, TaskState.Failed, cause.ToString());
    }
    clientCom.LogMessage(cause.Message);
  }
  catch (Exception ex) {
    // Also covers the TaskNotFoundException thrown above.
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(ex.ToString());
  }
}
399
/// <summary>
/// Handler for the task manager's ExceptionOccured event: frees the task's
/// cores, counts and logs the exception, wakes the heartbeat thread and sets
/// the task back to Waiting on the server.
/// </summary>
/// <remarks>
/// A failure of the server call is logged and not rethrown, matching the
/// other task manager handlers in this class.
/// </remarks>
private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
  SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
  SlaveStatusInfo.IncrementExceptionOccured();
  heartbeatManager.AwakeHeartBeatThread();
  clientCom.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
  try {
    wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
  }
  catch (Exception ex) {
    clientCom.LogMessage(ex.ToString());
  }
}
[6216]407
/// <summary>
/// Handler for TaskAborted: releases the cores the aborted task had reserved.
/// Nothing is sent to the server from here.
/// </summary>
private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
  var freedCores = e.Value.CoresNeeded;
  SlaveStatusInfo.DecrementUsedCores(freedCores);
}
411    #endregion
[6216]412
[6357]413    #region Log Events
/// <summary>
/// Forwards a message added to the log to the client channel. The second
/// tab-separated field of the entry is sent; an entry without a tab is sent whole.
/// </summary>
/// <remarks>
/// The handler is subscribed in the constructor, but clientCom is only
/// assigned in Start(); messages arriving before that are dropped here
/// instead of throwing into the code that wrote the log entry.
/// </remarks>
private void log_MessageAdded(object sender, EventArgs<string> e) {
  if (clientCom == null || e == null || e.Value == null) {
    return;
  }

  string[] fields = e.Value.Split('\t');
  clientCom.LogMessage(fields.Length > 1 ? fields[1] : e.Value);
}
417    #endregion
418
/// <summary>
/// Logs the request and starts an asynchronous abort for every task id the
/// task manager currently reports.
/// </summary>
private void DoAbortAll() {
  clientCom.LogMessage("Aborting all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds) {
    AbortTaskAsync(runningTaskId);
  }
}
428
/// <summary>
/// Logs the request and starts an asynchronous pause for every task id the
/// task manager currently reports.
/// </summary>
private void DoPauseAll() {
  clientCom.LogMessage("Pausing all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds) {
    PauseTaskAsync(runningTaskId);
  }
}
438
/// <summary>
/// Logs the request and starts an asynchronous stop for every task id the
/// task manager currently reports.
/// </summary>
private void DoStopAll() {
  clientCom.LogMessage("Stopping all jobs.");
  foreach (Guid runningTaskId in taskManager.TaskIds) {
    StopTaskAsync(runningTaskId);
  }
}
448
[6357]449    #region Slave Lifecycle Methods
/// <summary>
/// Completely shuts down the slave: queues a ShutdownSlave message and then
/// blocks until Start() releases the shutdown semaphore.
/// </summary>
public void Shutdown() {
  var shutdownRequest = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
  var queue = MessageQueue.GetInstance();
  queue.AddMessage(shutdownRequest);

  waitShutdownSem.WaitOne();
}
458
/// <summary>
/// Complete shutdown, should be called before the application is exited:
/// stops the heartbeat, requests the dispatch loop to end, aborts all tasks,
/// disconnects from the server and closes the client communication.
/// </summary>
/// <remarks>
/// Start() also calls this from its exception handler, where the client
/// channel, heartbeat manager or service host may not have been created yet,
/// so each of them is checked before use.
/// </remarks>
private void ShutdownCore() {
  if (clientCom != null) {
    clientCom.LogMessage("Shutdown signal received");
    clientCom.LogMessage("Stopping heartbeat");
  }
  if (heartbeatManager != null) {
    heartbeatManager.StopHeartBeat();
  }
  abortRequested = true;

  if (clientCom != null) {
    DoAbortAll();
    clientCom.LogMessage("Logging out");
  } else {
    // DoAbortAll() logs through clientCom, so abort directly without logging.
    foreach (Guid taskId in taskManager.TaskIds) {
      AbortTaskAsync(taskId);
    }
  }

  WcfService.Instance.Disconnect();
  if (clientCom != null) {
    clientCom.Shutdown();
  }
  SlaveClientCom.Close();

  if (slaveComm != null) {
    if (slaveComm.State == CommunicationState.Faulted) {
      // Close() throws on a faulted communication object; Abort() tears it down.
      slaveComm.Abort();
    } else if (slaveComm.State != CommunicationState.Closed) {
      slaveComm.Close();
    }
  }
}
478
/// <summary>
/// Handles a Restart message: logs it and clears the config manager's Asleep
/// flag. Can be called after Sleep().
/// </summary>
private void DoStartSlave() {
  const bool asleep = false;
  clientCom.LogMessage("Restart received");
  configManager.Asleep = asleep;
}
487
488    /// <summary>
489    /// stop slave, except for client gui communication,
490    /// primarily used by gui if core is running as windows service
[5826]491    /// </summary>   
[5450]492    private void Sleep() {
[6257]493      clientCom.LogMessage("Sleep received - not accepting any new jobs");
[6357]494      configManager.Asleep = true;
[6371]495      DoPauseAll();
[5450]496    }
[6357]497    #endregion
[5137]498  }
[5105]499}
Note: See TracBrowser for help on using the repository browser.