Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 7160

Last change on this file since 7160 was 6998, checked in by ascheibe, 13 years ago

#1672 Changed again how plugin discovery works because of Hive. The reason is that it must be possible to move the plugin and working directories away from the original slave working directory. This is needed for the Slave App and also in the future for the windows service because we don't want it to run as the LocalSystem user.
I have removed setting the PrivateBinPath and am now setting the ApplicationBase. This doesn't effect HL (because ApplicationBase is set by default to pluginDir anyway) but makes Hive work. The reason why setting the PrivateBinPath doesn't work with moving plugin and working directories is (from msdn): "Private assemblies are deployed in the same directory structure as the application. If the directories specified for PrivateBinPath are not under ApplicationBase, they are ignored."

File size: 18.0 KB
RevLine 
[6983]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
34namespace HeuristicLab.Clients.Hive.SlaveCore {
35  /// <summary>
36  /// The core component of the Hive Slave.
37  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
38  /// </summary>
39  public class Core : MarshalByRefObject {
40    private static HeartbeatManager heartbeatManager;
41    public static HeartbeatManager HeartbeatManager {
42      get { return heartbeatManager; }
43    }
44
45    public EventLog ServiceEventLog { get; set; }
46
47    private Semaphore waitShutdownSem = new Semaphore(0, 1);
48    private bool abortRequested;
49    private ISlaveCommunication clientCom;
50    private ServiceHost slaveComm;
51    private WcfService wcfService;
52    private TaskManager taskManager;
53    private ConfigManager configManager;
54    private PluginManager pluginManager;
55
56    public Core() {
57      var log = new ThreadSafeLog(Settings.Default.MaxLogCount);
58      this.pluginManager = new PluginManager(WcfService.Instance, log);
59      this.taskManager = new TaskManager(pluginManager, log);
60      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
61
62      RegisterTaskManagerEvents();
63
64      this.configManager = new ConfigManager(taskManager);
65      ConfigManager.Instance = this.configManager;
66    }
67
68    /// <summary>
69    /// Main method for the client
70    /// </summary>
71    public void Start() {
72      abortRequested = false;
73      EventLogManager.ServiceEventLog = ServiceEventLog;
74
75      try {
76        //start the client communication service (pipe between slave and slave gui)
77        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
78        slaveComm.Open();
79        clientCom = SlaveClientCom.Instance.ClientCom;
80
81        // delete all left over task directories
82        pluginManager.CleanPluginTemp();
83        clientCom.LogMessage("Hive Slave started");
84
85        wcfService = WcfService.Instance;
86        RegisterServiceEvents();
87
88        StartHeartbeats(); // Start heartbeats thread       
89        DispatchMessageQueue(); // dispatch messages until abortRequested
90      }
91      catch (Exception ex) {
92        if (ServiceEventLog != null) {
93          EventLogManager.LogException(ex);
94        } else {
95          //try to log with clientCom. if this works the user sees at least a message,
96          //else an exception will be thrown anyways.
97          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
98        }
99        ShutdownCore();
100      }
101      finally {
102        DeregisterServiceEvents();
103        waitShutdownSem.Release();
104      }
105    }
106
107    private void StartHeartbeats() {
108      //Initialize the heartbeat     
109      if (heartbeatManager == null) {
110        heartbeatManager = new HeartbeatManager();
111        heartbeatManager.StartHeartbeat();
112      }
113    }
114
115    private void DispatchMessageQueue() {
116      MessageQueue queue = MessageQueue.GetInstance();
117      while (!abortRequested) {
118        MessageContainer container = queue.GetMessage();
119        DetermineAction(container);
120        if (!abortRequested) {
121          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
122        }
123      }
124    }
125
126    private void RegisterServiceEvents() {
127      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
128      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
129    }
130
131    private void DeregisterServiceEvents() {
132      WcfService.Instance.Connected -= WcfService_Connected;
133      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
134    }
135
136    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
137      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
138    }
139
140    private void WcfService_Connected(object sender, EventArgs e) {
141      clientCom.LogMessage("Connected successfully to Hive server");
142    }
143
144    /// <summary>
145    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
146    /// </summary>
147    /// <param name="container">The container, containing the message</param>
148    private void DetermineAction(MessageContainer container) {
149      clientCom.LogMessage(string.Format("Message: {0} for task: {1} ", container.Message.ToString(), container.TaskId));
150
[6994]151      switch (container.Message) {
152        case MessageContainer.MessageType.CalculateTask:
153          CalculateTaskAsync(container.TaskId);
154          break;
155        case MessageContainer.MessageType.AbortTask:
156          AbortTaskAsync(container.TaskId);
157          break;
158        case MessageContainer.MessageType.StopTask:
159          StopTaskAsync(container.TaskId);
160          break;
161        case MessageContainer.MessageType.PauseTask:
162          PauseTaskAsync(container.TaskId);
163          break;
164        case MessageContainer.MessageType.StopAll:
165          DoStopAll();
166          break;
167        case MessageContainer.MessageType.PauseAll:
168          DoPauseAll();
169          break;
170        case MessageContainer.MessageType.AbortAll:
171          DoAbortAll();
172          break;
173        case MessageContainer.MessageType.ShutdownSlave:
174          ShutdownCore();
175          break;
176        case MessageContainer.MessageType.Restart:
177          DoStartSlave();
178          break;
179        case MessageContainer.MessageType.Sleep:
180          Sleep();
181          break;
182        case MessageContainer.MessageType.SayHello:
183          wcfService.Connect(configManager.GetClientInfo());
184          break;
185        case MessageContainer.MessageType.NewHBInterval:
186          int interval = wcfService.GetNewHeartbeatInterval(ConfigManager.Instance.GetClientInfo().Id);
187          if (interval != -1) {
188            HeartbeatManager.Interval = TimeSpan.FromSeconds(interval);
189          }
190          break;
[6983]191      }
192    }
193
194    private void CalculateTaskAsync(Guid jobId) {
195      TS.Task.Factory.StartNew(HandleCalculateTask, jobId)
196      .ContinueWith((t) => {
197        SlaveStatusInfo.IncrementExceptionOccured();
198        clientCom.LogMessage(t.Exception.ToString());
199      }, TaskContinuationOptions.OnlyOnFaulted);
200    }
201
202    private void StopTaskAsync(Guid jobId) {
203      TS.Task.Factory.StartNew(HandleStopTask, jobId)
204       .ContinueWith((t) => {
205         SlaveStatusInfo.IncrementExceptionOccured();
206         clientCom.LogMessage(t.Exception.ToString());
207       }, TaskContinuationOptions.OnlyOnFaulted);
208    }
209
210    private void PauseTaskAsync(Guid jobId) {
211      TS.Task.Factory.StartNew(HandlePauseTask, jobId)
212       .ContinueWith((t) => {
213         SlaveStatusInfo.IncrementExceptionOccured();
214         clientCom.LogMessage(t.Exception.ToString());
215       }, TaskContinuationOptions.OnlyOnFaulted);
216    }
217
218    private void AbortTaskAsync(Guid jobId) {
219      TS.Task.Factory.StartNew(HandleAbortTask, jobId)
220       .ContinueWith((t) => {
221         SlaveStatusInfo.IncrementExceptionOccured();
222         clientCom.LogMessage(t.Exception.ToString());
223       }, TaskContinuationOptions.OnlyOnFaulted);
224    }
225
226    private void HandleCalculateTask(object taskIdObj) {
227      Guid taskId = (Guid)taskIdObj;
228      Task task = null;
229      int usedCores = 0;
230      try {
231        task = wcfService.GetTask(taskId);
232        if (task == null) throw new TaskNotFoundException(taskId);
233        if (ConfigManager.Instance.GetFreeCores() < task.CoresNeeded) throw new OutOfCoresException();
234        if (ConfigManager.Instance.GetFreeMemory() < task.MemoryNeeded) throw new OutOfMemoryException();
235        SlaveStatusInfo.IncrementUsedCores(task.CoresNeeded); usedCores = task.CoresNeeded;
236        TaskData taskData = wcfService.GetTaskData(taskId);
237        if (taskData == null) throw new TaskDataNotFoundException(taskId);
238        task = wcfService.UpdateJobState(taskId, TaskState.Calculating, null);
239        if (task == null) throw new TaskNotFoundException(taskId);
240        taskManager.StartTaskAsync(task, taskData);
241      }
242      catch (TaskNotFoundException) {
243        SlaveStatusInfo.DecrementUsedCores(usedCores);
244        throw;
245      }
246      catch (TaskDataNotFoundException) {
247        SlaveStatusInfo.DecrementUsedCores(usedCores);
248        throw;
249      }
250      catch (TaskAlreadyRunningException) {
251        SlaveStatusInfo.DecrementUsedCores(usedCores);
252        throw;
253      }
254      catch (OutOfCoresException) {
255        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more cores available");
256        throw;
257      }
258      catch (OutOfMemoryException) {
259        wcfService.UpdateJobState(taskId, TaskState.Waiting, "No more memory available");
260        throw;
261      }
262      catch (Exception e) {
263        SlaveStatusInfo.DecrementUsedCores(usedCores);
264        wcfService.UpdateJobState(taskId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
265        throw;
266      }
267    }
268
269    private void HandleStopTask(object taskIdObj) {
270      Guid taskId = (Guid)taskIdObj;
271      try {
272        Task task = wcfService.GetTask(taskId);
273        if (task == null) throw new TaskNotFoundException(taskId);
274        taskManager.StopTaskAsync(taskId);
275      }
276      catch (TaskNotFoundException) {
277        throw;
278      }
279      catch (TaskNotRunningException) {
280        throw;
281      }
282      catch (AppDomainNotCreatedException) {
283        throw;
284      }
285    }
286
287    private void HandlePauseTask(object taskIdObj) {
288      Guid taskId = (Guid)taskIdObj;
289      try {
290        Task task = wcfService.GetTask(taskId);
291        if (task == null) throw new TaskNotFoundException(taskId);
292        taskManager.PauseTaskAsync(taskId);
293      }
294      catch (TaskNotFoundException) {
295        throw;
296      }
297      catch (TaskNotRunningException) {
298        throw;
299      }
300      catch (AppDomainNotCreatedException) {
301        throw;
302      }
303    }
304
305    private void HandleAbortTask(object taskIdObj) {
306      Guid taskId = (Guid)taskIdObj;
307      try {
308        taskManager.AbortTask(taskId);
309      }
310      catch (TaskNotFoundException) {
311        throw;
312      }
313    }
314
315    #region TaskManager Events
316    private void RegisterTaskManagerEvents() {
317      this.taskManager.TaskStarted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskStarted);
318      this.taskManager.TaskPaused += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskPaused);
319      this.taskManager.TaskStopped += new EventHandler<EventArgs<SlaveTask, TaskData>>(taskManager_TaskStopped);
320      this.taskManager.TaskFailed += new EventHandler<EventArgs<Tuple<SlaveTask, TaskData, Exception>>>(taskManager_TaskFailed);
321      this.taskManager.ExceptionOccured += new EventHandler<EventArgs<SlaveTask, Exception>>(taskManager_ExceptionOccured);
322      this.taskManager.TaskAborted += new EventHandler<EventArgs<SlaveTask>>(taskManager_TaskAborted);
323    }
324
325    private void taskManager_TaskStarted(object sender, EventArgs<SlaveTask> e) {
326      // successfully started, everything is good
327    }
328
329    private void taskManager_TaskPaused(object sender, EventArgs<SlaveTask, TaskData> e) {
330      try {
331        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
332        heartbeatManager.AwakeHeartBeatThread();
333        Task task = wcfService.GetTask(e.Value.TaskId);
334        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
335        task.ExecutionTime = e.Value.ExecutionTime;
336        TaskData taskData = e.Value.GetTaskData();
337        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Paused);
338      }
339      catch (TaskNotFoundException ex) {
340        clientCom.LogMessage(ex.ToString());
341      }
342      catch (Exception ex) {
343        clientCom.LogMessage(ex.ToString());
344      }
345    }
346
347    private void taskManager_TaskStopped(object sender, EventArgs<SlaveTask, TaskData> e) {
348      try {
349        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
350        heartbeatManager.AwakeHeartBeatThread();
351        Task task = wcfService.GetTask(e.Value.TaskId);
352        if (task == null) throw new TaskNotFoundException(e.Value.TaskId);
353        task.ExecutionTime = e.Value.ExecutionTime;
354        TaskData taskData = e.Value.GetTaskData();
355        wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Finished);
356      }
357      catch (TaskNotFoundException ex) {
358        clientCom.LogMessage(ex.ToString());
359      }
360      catch (Exception ex) {
361        clientCom.LogMessage(ex.ToString());
362      }
363    }
364
365    private void taskManager_TaskFailed(object sender, EventArgs<Tuple<SlaveTask, TaskData, Exception>> e) {
366      try {
367        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
368        heartbeatManager.AwakeHeartBeatThread();
369        SlaveTask slaveTask = e.Value.Item1;
370        TaskData taskData = e.Value.Item2;
371        Exception exception = e.Value.Item3;
372
373        Task task = wcfService.GetTask(slaveTask.TaskId);
374        if (task == null) throw new TaskNotFoundException(slaveTask.TaskId);
375        task.ExecutionTime = slaveTask.ExecutionTime;
376        if (taskData != null) {
377          wcfService.UpdateTaskData(task, taskData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
378        } else {
379          wcfService.UpdateJobState(task.Id, TaskState.Failed, exception.ToString());
380        }
381        clientCom.LogMessage(exception.Message);
382      }
383      catch (TaskNotFoundException ex) {
384        SlaveStatusInfo.IncrementExceptionOccured();
385        clientCom.LogMessage(ex.ToString());
386      }
387      catch (Exception ex) {
388        SlaveStatusInfo.IncrementExceptionOccured();
389        clientCom.LogMessage(ex.ToString());
390      }
391    }
392
393    private void taskManager_ExceptionOccured(object sender, EventArgs<SlaveTask, Exception> e) {
394      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
395      SlaveStatusInfo.IncrementExceptionOccured();
396      heartbeatManager.AwakeHeartBeatThread();
397      clientCom.LogMessage(string.Format("Exception occured for task {0}: {1}", e.Value.TaskId, e.Value2.ToString()));
398      wcfService.UpdateJobState(e.Value.TaskId, TaskState.Waiting, e.Value2.ToString());
399    }
400
401    private void taskManager_TaskAborted(object sender, EventArgs<SlaveTask> e) {
402      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
403    }
404    #endregion
405
406    #region Log Events
407    private void log_MessageAdded(object sender, EventArgs<string> e) {
[6998]408      try {
409        clientCom.LogMessage(e.Value.Split('\t')[1]);
410      }
411      catch { }
[6983]412    }
413    #endregion
414
415    /// <summary>
416    /// aborts all running jobs, no results are sent back
417    /// </summary>
418    private void DoAbortAll() {
419      clientCom.LogMessage("Aborting all jobs.");
420      foreach (Guid taskId in taskManager.TaskIds) {
421        AbortTaskAsync(taskId);
422      }
423    }
424
425    /// <summary>
426    /// wait for jobs to finish, then pause client
427    /// </summary>
428    private void DoPauseAll() {
429      clientCom.LogMessage("Pausing all jobs.");
430      foreach (Guid taskId in taskManager.TaskIds) {
431        PauseTaskAsync(taskId);
432      }
433    }
434
435    /// <summary>
436    /// pause slave immediately
437    /// </summary>
438    private void DoStopAll() {
439      clientCom.LogMessage("Stopping all jobs.");
440      foreach (Guid taskId in taskManager.TaskIds) {
441        StopTaskAsync(taskId);
442      }
443    }
444
445    #region Slave Lifecycle Methods
446    /// <summary>
447    /// completly shudown slave
448    /// </summary>
449    public void Shutdown() {
450      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
451      MessageQueue.GetInstance().AddMessage(mc);
452      waitShutdownSem.WaitOne();
453    }
454
455    /// <summary>
456    /// complete shutdown, should be called before the the application is exited
457    /// </summary>
458    private void ShutdownCore() {
459      clientCom.LogMessage("Shutdown signal received");
460      clientCom.LogMessage("Stopping heartbeat");
461      heartbeatManager.StopHeartBeat();
462      abortRequested = true;
463
464      DoAbortAll();
465
466      clientCom.LogMessage("Logging out");
467      WcfService.Instance.Disconnect();
468      clientCom.Shutdown();
469      SlaveClientCom.Close();
470
471      if (slaveComm.State != CommunicationState.Closed)
472        slaveComm.Close();
473    }
474
475    /// <summary>
476    /// reinitializes everything and continues operation,
477    /// can be called after Sleep()
478    /// </summary> 
479    private void DoStartSlave() {
480      clientCom.LogMessage("Restart received");
481      configManager.Asleep = false;
482    }
483
484    /// <summary>
485    /// stop slave, except for client gui communication,
486    /// primarily used by gui if core is running as windows service
487    /// </summary>   
488    private void Sleep() {
489      clientCom.LogMessage("Sleep received - not accepting any new jobs");
490      configManager.Asleep = true;
491      DoPauseAll();
492    }
493    #endregion
494  }
495}
Note: See TracBrowser for help on using the repository browser.