Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 7887

Last change on this file since 7887 was 7259, checked in by swagner, 13 years ago

Updated year of copyrights to 2012 (#1716)

File size: 18.2 KB
RevLine 
[6983]1#region License Information
2/* HeuristicLab
[7259]3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
[6983]4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static HeartbeatManager heartbeatManager;
41    public static HeartbeatManager HeartbeatManager {
42      get { return heartbeatManager; }
43    }
44
45    public EventLog ServiceEventLog { get; set; }
46
47    private Semaphore waitShutdownSem = new Semaphore(0, 1);
48    private bool abortRequested;
49    private ServiceHost slaveComm;
50    private WcfService wcfService;
51    private TaskManager taskManager;
52    private ConfigManager configManager;
53    private PluginManager pluginManager;
54
55    public Core() {
56      var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
57      this.pluginManager = new PluginManager(WcfService.Instance, log);
58      this.taskManager = new TaskManager(pluginManager, log);
59      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
60
61      RegisterTaskManagerEvents();
62
63      this.configManager = new ConfigManager(taskManager);
64      ConfigManager.Instance = this.configManager;
65    }
66
67    /// <summary>
68    /// Main method for the client
69    /// </summary>
70    public void Start() {
71      abortRequested = false;
72      EventLogManager.ServiceEventLog = ServiceEventLog;
73
74      try {
75        //start the client communication service (pipe between slave and slave gui)
76        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
77        slaveComm.Open();
[7171]78       
[6983]79        // delete all left over task directories
80        pluginManager.CleanPluginTemp();
[7171]81        SlaveClientCom.Instance.LogMessage("Hive Slave started");
[6983]82
83        wcfService = WcfService.Instance;
84        RegisterServiceEvents();
85
86        StartHeartbeats(); // Start heartbeats thread       
87        DispatchMessageQueue(); // dispatch messages until abortRequested
88      }
89      catch (Exception ex) {
90        if (ServiceEventLog != null) {
91          EventLogManager.LogException(ex);
92        } else {
[7171]93          //try to log with SlaveClientCom.Instance. if this works the user sees at least a message,
[6983]94          //else an exception will be thrown anyways.
[7171]95          SlaveClientCom.Instance.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
[6983]96        }
97        ShutdownCore();
[7166]98      } finally {
[6983]99        DeregisterServiceEvents();
100        waitShutdownSem.Release();
101      }
102    }
103
104    private void StartHeartbeats() {
105      //Initialize the heartbeat     
106      if (heartbeatManager == null) {
107        heartbeatManager = new HeartbeatManager();
108        heartbeatManager.StartHeartbeat();
109      }
110    }
111
112    private void DispatchMessageQueue() {
113      MessageQueue queue = MessageQueue.GetInstance();
114      while (!abortRequested) {
115        MessageContainer container = queue.GetMessage();
116        DetermineAction(container);
117        if (!abortRequested) {
[7171]118          SlaveClientCom.Instance.StatusChanged(configManager.GetStatusForClientConsole());
[6983]119        }
120      }
121    }
122
123    private void RegisterServiceEvents() {
124      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
125      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
126    }
127
128    private void DeregisterServiceEvents() {
129      WcfService.Instance.Connected -= WcfService_Connected;
130      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
131    }
132
133    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
[7171]134      SlaveClientCom.Instance.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
[6983]135    }
136
137    private void WcfService_Connected(object sender, EventArgs e) {
[7171]138      SlaveClientCom.Instance.LogMessage("Connected successfully to Hive server");
[6983]139    }
140
141    /// <summary>
142    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
143    /// </summary>
144    /// <param name="container">The container, containing the message</param>
145    private void DetermineAction(MessageContainer container) {
[7171]146      SlaveClientCom.Instance.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
[6983]147
[6994]148      switch (container.Message) {
149        case MessageContainer.MessageType.CalculateTask:
150          CalculateTaskAsync(container.TaskId);
151          break;
152        case MessageContainer.MessageType.AbortTask:
153          AbortTaskAsync(container.TaskId);
154          break;
155        case MessageContainer.MessageType.StopTask:
156          StopTaskAsync(container.TaskId);
157          break;
158        case MessageContainer.MessageType.PauseTask:
159          PauseTaskAsync(container.TaskId);
160          break;
161        case MessageContainer.MessageType.StopAll:
162          DoStopAll();
163          break;
164        case MessageContainer.MessageType.PauseAll:
165          DoPauseAll();
166          break;
167        case MessageContainer.MessageType.AbortAll:
168          DoAbortAll();
169          break;
170        case MessageContainer.MessageType.ShutdownSlave:
171          ShutdownCore();
172          break;
173        case MessageContainer.MessageType.Restart:
174          DoStartSlave();
175          break;
176        case MessageContainer.MessageType.Sleep:
177          Sleep();
178          break;
179        case MessageContainer.MessageType.SayHello:
180          wcfService.Connect(configManager.GetClientInfo());
181          break;
182        case MessageContainer.MessageType.NewHBInterval:
183          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
184          if (interval != -1) {
185            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
186          }
187          break;
[6983]188      }
189    }
190
191    private void CalculateTaskAsync(Guid jobId) {
192      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
193      .ContinueWith((t) => {
194        SlaveStatusInfo.IncrementExceptionOccured();
[7171]195        SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
[6983]196      }, TaskContinuationOptions.OnlyOnFaulted);
197    }
198
199    private void StopTaskAsync(Guid jobId) {
200      TS.Task.Factory.StartNew(HandleStopTask, jobId)
201       .ContinueWith((t) => {
202         SlaveStatusInfo.IncrementExceptionOccured();
[7171]203         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
[6983]204       }, TaskContinuationOptions.OnlyOnFaulted);
205    }
206
207    private void PauseTaskAsync(Guid jobId) {
208      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
209       .ContinueWith((t) => {
210         SlaveStatusInfo.IncrementExceptionOccured();
[7171]211         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
[6983]212       }, TaskContinuationOptions.OnlyOnFaulted);
213    }
214
215    private void AbortTaskAsync(Guid jobId) {
216      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
217       .ContinueWith((t) => {
218         SlaveStatusInfo.IncrementExceptionOccured();
[7171]219         SlaveClientCom.Instance.LogMessage(t.Exception.ToString());
[6983]220       }, TaskContinuationOptions.OnlyOnFaulted);
221    }
222
223    private void HandleCalculateTask(object taskIdObj) {
224      Guid taskId = (Guid)taskIdObj;
225      Task task = null;
226      int usedCores = 0;
227      try {
228        task = wcfService.GetTask(taskId);
229        if (task == null) throw new TaskNotFoundException(taskId);
230        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
231        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
232        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
233        TaskData taskData = wcfService.GetTaskData(taskId);
234        if (taskData == null) throw new TaskDataNotFoundException(taskId);
235        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
236        if (task == null) throw new TaskNotFoundException(taskId);
237        taskManager.StartTaskAsync(task, taskData);
238      }
239      catch (TaskNotFoundException) {
240        SlaveStatusInfo.DecrementUsedCores(usedCores);
241        throw;
242      }
243      catch (TaskDataNotFoundException) {
244        SlaveStatusInfo.DecrementUsedCores(usedCores);
245        throw;
246      }
247      catch (TaskAlreadyRunningException) {
248        SlaveStatusInfo.DecrementUsedCores(usedCores);
249        throw;
250      }
251      catch (OutOfCoresException) {
252        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
253        throw;
254      }
255      catch (OutOfMemoryException) {
256        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
257        throw;
258      }
259      catch (Exception e) {
260        SlaveStatusInfo.DecrementUsedCores(usedCores);
261        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
262        throw;
263      }
264    }
265
266    private void HandleStopTask(object taskIdObj) {
267      Guid taskId = (Guid)taskIdObj;
268      try {
269        Task task = wcfService.GetTask(taskId);
270        if (task == null) throw new TaskNotFoundException(taskId);
271        taskManager.StopTaskAsync(taskId);
272      }
273      catch (TaskNotFoundException) {
274        throw;
275      }
276      catch (TaskNotRunningException) {
277        throw;
278      }
279      catch (AppDomainNotCreatedException) {
280        throw;
281      }
282    }
283
284    private void HandlePauseTask(object taskIdObj) {
285      Guid taskId = (Guid)taskIdObj;
286      try {
287        Task task = wcfService.GetTask(taskId);
288        if (task == null) throw new TaskNotFoundException(taskId);
289        taskManager.PauseTaskAsync(taskId);
290      }
291      catch (TaskNotFoundException) {
292        throw;
293      }
294      catch (TaskNotRunningException) {
295        throw;
296      }
297      catch (AppDomainNotCreatedException) {
298        throw;
299      }
300    }
301
302    private void HandleAbortTask(object taskIdObj) {
303      Guid taskId = (Guid)taskIdObj;
304      try {
305        taskManager.AbortTask(taskId);
306      }
307      catch (TaskNotFoundException) {
308        throw;
309      }
310    }
311
312    #region TaskManager Events
313    private void RegisterTaskManagerEvents() {
314      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
315      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
316      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
317      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
318      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
319      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
320    }
321
322    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
323      // successfully started, everything is good
324    }
325
326    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
327      try {
328        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
329        heartbeatManager.AwakeHeartBeatThread();
330        Task task = wcfService.GetTask(e.Value.TaskId);
331        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
332        task.ExecutionTime = e.Value.ExecutionTime;
[7166]333        TaskData taskData = e.Value2;
[6983]334        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
335      }
336      catch (TaskNotFoundException ex) {
[7171]337        SlaveClientCom.Instance.LogMessage(ex.ToString());
[6983]338      }
339      catch (Exception ex) {
[7171]340        SlaveClientCom.Instance.LogMessage(ex.ToString());
[6983]341      }
342    }
343
344    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
345      try {
346        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
347        heartbeatManager.AwakeHeartBeatThread();
348        Task task = wcfService.GetTask(e.Value.TaskId);
349        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
350        task.ExecutionTime = e.Value.ExecutionTime;
[7166]351        TaskData taskData = e.Value2;
[6983]352        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
353      }
354      catch (TaskNotFoundException ex) {
[7171]355        SlaveClientCom.Instance.LogMessage(ex.ToString());
[6983]356      }
357      catch (Exception ex) {
[7171]358        SlaveClientCom.Instance.LogMessage(ex.ToString());
[6983]359      }
360    }
361
362    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
363      try {
364        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
365        heartbeatManager.AwakeHeartBeatThread();
366        SlaveTask slaveTask = e.Value.Item1;
367        TaskData taskData = e.Value.Item2;
368        Exception exception = e.Value.Item3;
369
370        Task task = wcfService.GetTask(slaveTask.TaskId);
371        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
372        task.ExecutionTime = slaveTask.ExecutionTime;
373        if (taskData != null) {
374          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
375        } else {
376          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
377        }
[7171]378        SlaveClientCom.Instance.LogMessage(exception.Message);
[6983]379      }
380      catch (TaskNotFoundException ex) {
381        SlaveStatusInfo.IncrementExceptionOccured();
[7171]382        SlaveClientCom.Instance.LogMessage(ex.ToString());
[6983]383      }
384      catch (Exception ex) {
385        SlaveStatusInfo.IncrementExceptionOccured();
[7171]386        SlaveClientCom.Instance.LogMessage(ex.ToString());
[6983]387      }
388    }
389
390    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
391      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
392      SlaveStatusInfo.IncrementExceptionOccured();
393      heartbeatManager.AwakeHeartBeatThread();
[7171]394      SlaveClientCom.Instance.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
[6983]395      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
396    }
397
398    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
399      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
400    }
401    #endregion
402
403    #region Log Events
404    private void log_MessageAdded(object sender, EventArgs<string> e) {
[6998]405      try {
[7171]406        SlaveClientCom.Instance.LogMessage(e.Value.Split('\t')[1]);
[6998]407      }
408      catch { }
[6983]409    }
410    #endregion
411
412    /// <summary>
413    /// aborts all running jobs, no results are sent back
414    /// </summary>
415    private void DoAbortAll() {
[7171]416      SlaveClientCom.Instance.LogMessage("Aborting all jobs.");
[6983]417      foreach (Guid taskId in taskManager.TaskIds) {
418        AbortTaskAsync(taskId);
419      }
420    }
421
422    /// <summary>
423    /// wait for jobs to finish, then pause client
424    /// </summary>
425    private void DoPauseAll() {
[7171]426      SlaveClientCom.Instance.LogMessage("Pausing all jobs.");
[6983]427      foreach (Guid taskId in taskManager.TaskIds) {
428        PauseTaskAsync(taskId);
429      }
430    }
431
432    /// <summary>
433    /// pause slave immediately
434    /// </summary>
435    private void DoStopAll() {
[7171]436      SlaveClientCom.Instance.LogMessage("Stopping all jobs.");
[6983]437      foreach (Guid taskId in taskManager.TaskIds) {
438        StopTaskAsync(taskId);
439      }
440    }
441
442    #region Slave Lifecycle Methods
443    /// <summary>
444    /// completly shudown slave
445    /// </summary>
446    public void Shutdown() {
447      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
448      MessageQueue.GetInstance().AddMessage(mc);
449      waitShutdownSem.WaitOne();
450    }
451
452    /// <summary>
453    /// complete shutdown, should be called before the the application is exited
454    /// </summary>
455    private void ShutdownCore() {
[7171]456      SlaveClientCom.Instance.LogMessage("Shutdown signal received");
457      SlaveClientCom.Instance.LogMessage("Stopping heartbeat");
[6983]458      heartbeatManager.StopHeartBeat();
459      abortRequested = true;
460
461      DoAbortAll();
462
[7171]463      SlaveClientCom.Instance.LogMessage("Logging out");
[6983]464      WcfService.Instance.Disconnect();
[7171]465      SlaveClientCom.Instance.ClientCom.Shutdown();
[6983]466      SlaveClientCom.Close();
467
468      if (slaveComm.State != CommunicationState.Closed)
469        slaveComm.Close();
470    }
471
472    /// <summary>
473    /// reinitializes everything and continues operation,
474    /// can be called after Sleep()
475    /// </summary> 
476    private void DoStartSlave() {
[7171]477      SlaveClientCom.Instance.LogMessage("Restart received");
[6983]478      configManager.Asleep = false;
479    }
480
481    /// <summary>
482    /// stop slave, except for client gui communication,
483    /// primarily used by gui if core is running as windows service
484    /// </summary>   
485    private void Sleep() {
[7171]486      SlaveClientCom.Instance.LogMessage("Sleep received - not accepting any new jobs");
[6983]487      configManager.Asleep = true;
488      DoPauseAll();
489    }
490    #endregion
491  }
492}
Note: See TracBrowser for help on using the repository browser.