Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 6721

Last change on this file since 6721 was 6721, checked in by ascheibe, 13 years ago

#1233 Review comments: renamed Job to Task

File size: 17.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using TS = System.Threading.Tasks;
32
33
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
  /// </summary>
  public class Core : MarshalByRefObject {
    private static HeartbeatManager heartbeatManager;
    /// <summary>
    /// The heartbeat manager shared by all Core instances; null until <see cref="Start"/> has started the heartbeats.
    /// </summary>
    public static HeartbeatManager HeartbeatManager {
      get { return heartbeatManager; }
    }

    /// <summary>
    /// Optional event log. When set, a fatal exception in <see cref="Start"/> is written here
    /// instead of being sent to the slave GUI.
    /// </summary>
    public EventLog ServiceEventLog { get; set; }

    // Released when Start() returns so that Shutdown() can wait for the core to stop.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    // Set by ShutdownCore(); ends the message dispatch loop.
    private bool abortRequested;
    // Communication channel to the slave GUI; null until Start() has opened it.
    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;
    private WcfService wcfService;
    private readonly JobManager jobManager;
    private readonly ConfigManager configManager;
    private readonly PluginManager pluginManager;

    /// <summary>
    /// Creates the plugin, job and config managers and wires up the log and JobManager events.
    /// </summary>
    public Core() {
      var log = new ThreadSafeLog(new Log(Settings.Default.MaxLogCount));
      this.pluginManager = new PluginManager(WcfService.Instance, log);
      this.jobManager = new JobManager(pluginManager, log);
      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);

      RegisterJobManagerEvents();

      this.configManager = new ConfigManager(jobManager);
      ConfigManager.Instance = this.configManager;
    }

    /// <summary>
    /// Main method for the client. Opens the GUI communication service, starts the heartbeats and
    /// dispatches queued messages until a shutdown is requested. Blocks the calling thread until then.
    /// </summary>
    public void Start() {
      abortRequested = false;

      try {
        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over job directories
        pluginManager.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
          }
          catch (Exception) { } // reporting to the event log is best effort
        } else if (clientCom != null) {
          //try to log with clientCom. if this works the user sees at least a message,
          //else an exception will be thrown anyways.
          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
        } else {
          // The failure happened before the GUI channel existed, so there is nowhere to report it.
          // Clean up, then let the original exception reach the caller (instead of a NullReferenceException).
          ShutdownCore();
          throw;
        }
        ShutdownCore();
      }
      finally {
        DeregisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the shared heartbeat manager if no instance exists yet.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager();
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until <see cref="abortRequested"/> is set.
    /// After each message the current status is pushed to the slave GUI.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        if (!abortRequested) {
          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeregisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
    }

    private void WcfService_Connected(object sender, EventArgs e) {
      LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Forwards a message to the slave GUI. Does nothing while the GUI channel has not been opened
    /// (for example, log events raised before <see cref="Start"/>, or a failure during start-up).
    /// </summary>
    private void LogMessage(string message) {
      ISlaveCommunication com = clientCom;
      if (com != null) {
        com.LogMessage(message);
      }
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      LogMessage(string.Format("Message: {0} for job: {1} ", container.Message.ToString(), container.TaskId));

      // Executor containers carry their own action to run.
      ExecutorMessageContainer<Guid> executorContainer = container as ExecutorMessageContainer<Guid>;
      if (executorContainer != null) {
        executorContainer.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          CalculateJobAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.AbortJob:
          AbortJobAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.StopJob:
          StopJobAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.PauseJob:
          PauseJobAsync(container.TaskId);
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(configManager.GetClientInfo());
          break;
      }
    }

    /// <summary>
    /// Runs <paramref name="handler"/> with <paramref name="jobId"/> as its state on a background task.
    /// If the handler throws, the exception counter is incremented and the exception is sent to the slave GUI.
    /// </summary>
    private void RunJobHandlerAsync(Action<object> handler, Guid jobId) {
      TS.Task.Factory.StartNew(handler, jobId)
        .ContinueWith((t) => {
          SlaveStatusInfo.IncrementExceptionOccured();
          LogMessage(t.Exception.ToString());
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    private void CalculateJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandleCalculateJob, jobId);
    }

    private void StopJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandleStopJob, jobId);
    }

    private void PauseJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandlePauseJob, jobId);
    }

    private void AbortJobAsync(Guid jobId) {
      RunJobHandlerAsync(HandleAbortJob, jobId);
    }

    /// <summary>
    /// Fetches a task and its data from the server, reserves its cores and hands it to the JobManager.
    /// Every failure releases the reserved cores and is rethrown. Resource shortages and unknown errors
    /// also put the task back into the Waiting state on the server.
    /// </summary>
    /// <param name="jobIdObj">The task id (a boxed <see cref="Guid"/>).</param>
    private void HandleCalculateJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      int usedCores = 0; // cores reserved so far; released again on every failure path
      try {
        Task job = wcfService.GetJob(jobId);
        if (job == null) throw new JobNotFoundException(jobId);
        if (ConfigManager.Instance.GetFreeCores() < job.CoresNeeded) throw new OutOfCoresException();
        if (ConfigManager.GetFreeMemory() < job.MemoryNeeded) throw new OutOfMemoryException();
        SlaveStatusInfo.IncrementUsedCores(job.CoresNeeded);
        usedCores = job.CoresNeeded;
        TaskData jobData = wcfService.GetJobData(jobId);
        if (jobData == null) throw new JobDataNotFoundException(jobId);
        job = wcfService.UpdateJobState(jobId, TaskState.Calculating, null);
        if (job == null) throw new JobNotFoundException(jobId);
        jobManager.StartJobAsync(job, jobData);
      }
      catch (JobNotFoundException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (JobDataNotFoundException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (JobAlreadyRunningException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        throw;
      }
      catch (OutOfCoresException) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(jobId, TaskState.Waiting, "No more cores available");
        throw;
      }
      catch (OutOfMemoryException) {
        // This also catches a genuine OutOfMemoryException raised after the cores were reserved
        // (e.g. while loading the task data), so the reservation must be released here too.
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(jobId, TaskState.Waiting, "No more memory available");
        throw;
      }
      catch (Exception e) {
        SlaveStatusInfo.DecrementUsedCores(usedCores);
        wcfService.UpdateJobState(jobId, TaskState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
        throw;
      }
    }

    /// <summary>
    /// Verifies the task still exists on the server and asks the JobManager to stop it.
    /// </summary>
    /// <exception cref="JobNotFoundException">The server does not know the task.</exception>
    private void HandleStopJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      Task job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      jobManager.StopJobAsync(jobId);
    }

    /// <summary>
    /// Verifies the task still exists on the server and asks the JobManager to pause it.
    /// </summary>
    /// <exception cref="JobNotFoundException">The server does not know the task.</exception>
    private void HandlePauseJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      Task job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      jobManager.PauseJobAsync(jobId);
    }

    /// <summary>
    /// Asks the JobManager to abort the task; no results are sent back.
    /// </summary>
    private void HandleAbortJob(object jobIdObj) {
      jobManager.AbortJob((Guid)jobIdObj);
    }

    #region JobManager Events
    private void RegisterJobManagerEvents() {
      this.jobManager.JobStarted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobStarted);
      this.jobManager.JobPaused += new EventHandler<EventArgs<SlaveJob, TaskData>>(jobManager_JobPaused);
      this.jobManager.JobStopped += new EventHandler<EventArgs<SlaveJob, TaskData>>(jobManager_JobStopped);
      this.jobManager.JobFailed += new EventHandler<EventArgs<Tuple<SlaveJob, TaskData, Exception>>>(jobManager_JobFailed);
      this.jobManager.ExceptionOccured += new EventHandler<EventArgs<SlaveJob, Exception>>(jobManager_ExceptionOccured);
      this.jobManager.JobAborted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobAborted);
    }

    private void jobManager_JobStarted(object sender, EventArgs<SlaveJob> e) {
      // successfully started, everything is good
    }

    private void jobManager_JobPaused(object sender, EventArgs<SlaveJob, TaskData> e) {
      ReportJobResult(e.Value, TaskState.Paused);
    }

    private void jobManager_JobStopped(object sender, EventArgs<SlaveJob, TaskData> e) {
      ReportJobResult(e.Value, TaskState.Finished);
    }

    /// <summary>
    /// Releases the job's cores, wakes the heartbeat thread and uploads the job's data to the server
    /// with the given final state. Failures are sent to the slave GUI and not rethrown.
    /// </summary>
    private void ReportJobResult(SlaveJob slaveJob, TaskState state) {
      try {
        SlaveStatusInfo.DecrementUsedCores(slaveJob.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();
        Task job = wcfService.GetJob(slaveJob.JobId);
        if (job == null) throw new JobNotFoundException(slaveJob.JobId);
        job.ExecutionTime = slaveJob.ExecutionTime;
        TaskData jobData = slaveJob.GetJobData();
        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, state);
      }
      catch (Exception ex) {
        LogMessage(ex.ToString());
      }
    }

    private void jobManager_JobFailed(object sender, EventArgs<Tuple<SlaveJob, TaskData, Exception>> e) {
      try {
        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
        heartbeatManager.AwakeHeartBeatThread();
        SlaveJob slaveJob = e.Value.Item1;
        TaskData jobData = e.Value.Item2;
        Exception exception = e.Value.Item3;

        Task job = wcfService.GetJob(slaveJob.JobId);
        if (job == null) throw new JobNotFoundException(slaveJob.JobId);
        job.ExecutionTime = slaveJob.ExecutionTime;
        // Upload the job data if it could be retrieved; otherwise only report the failed state.
        if (jobData != null) {
          wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, TaskState.Failed, exception.ToString());
        } else {
          wcfService.UpdateJobState(job.Id, TaskState.Failed, exception.ToString());
        }
        LogMessage(exception.Message);
      }
      catch (Exception ex) {
        SlaveStatusInfo.IncrementExceptionOccured();
        LogMessage(ex.ToString());
      }
    }

    private void jobManager_ExceptionOccured(object sender, EventArgs<SlaveJob, Exception> e) {
      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
      SlaveStatusInfo.IncrementExceptionOccured();
      heartbeatManager.AwakeHeartBeatThread();
      LogMessage(string.Format("Exception occured for job {0}: {1}", e.Value.JobId, e.Value2.ToString()));
      try {
        wcfService.UpdateJobState(e.Value.JobId, TaskState.Waiting, e.Value2.ToString());
      }
      catch (Exception ex) {
        // As in the other JobManager handlers, a failing server call is reported instead of
        // escaping into the JobManager's event invocation.
        LogMessage(ex.ToString());
      }
    }

    private void jobManager_JobAborted(object sender, EventArgs<SlaveJob> e) {
      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    }
    #endregion

    #region Log Events
    /// <summary>
    /// Forwards new log entries to the slave GUI. Only the part after the first tab is sent.
    /// NOTE(review): this assumes entries look like "prefix\ttext" (TODO confirm against Log).
    /// Entries without a tab are forwarded unchanged instead of throwing.
    /// </summary>
    private void log_MessageAdded(object sender, EventArgs<string> e) {
      string entry = e.Value;
      if (entry == null) return;
      string[] parts = entry.Split('\t');
      LogMessage(parts.Length > 1 ? parts[1] : entry);
    }
    #endregion

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      LogMessage("Aborting all jobs.");
      foreach (Guid jobId in jobManager.JobIds) {
        AbortJobAsync(jobId);
      }
    }

    /// <summary>
    /// Asynchronously requests a pause for every job known to the JobManager.
    /// </summary>
    private void DoPauseAll() {
      LogMessage("Pausing all jobs.");
      foreach (Guid jobId in jobManager.JobIds) {
        PauseJobAsync(jobId);
      }
    }

    /// <summary>
    /// Asynchronously requests a stop for every job known to the JobManager.
    /// </summary>
    private void DoStopAll() {
      LogMessage("Stopping all jobs.");
      foreach (Guid jobId in jobManager.JobIds) {
        StopJobAsync(jobId);
      }
    }

    #region Slave Lifecycle Methods
    /// <summary>
    /// Requests a complete shutdown of the slave by queueing a ShutdownSlave message. Then blocks until
    /// <see cref="Start"/> has returned and released the shutdown semaphore. Note that if Start() is
    /// never run, nothing releases the semaphore and this call does not return.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown; should be called before the application exits.
    /// Safe to call after a failed start: any component that was never created is skipped.
    /// </summary>
    private void ShutdownCore() {
      LogMessage("Shutdown signal received");
      LogMessage("Stopping heartbeat");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;

      DoAbortAll();

      LogMessage("Logging out");
      WcfService.Instance.Disconnect();
      if (clientCom != null) {
        clientCom.Shutdown();
        SlaveClientCom.Close();
      }

      if (slaveComm != null) {
        // Close() throws on a faulted communication object (e.g. after a failed Open()); Abort() is required then.
        if (slaveComm.State == CommunicationState.Faulted) {
          slaveComm.Abort();
        } else if (slaveComm.State != CommunicationState.Closed) {
          slaveComm.Close();
        }
      }
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      LogMessage("Restart received");
      configManager.Asleep = false;
    }

    /// <summary>
    /// Marks the slave as asleep so no new jobs are accepted, and pauses all running jobs.
    /// GUI communication stays open. Primarily used by the GUI when the core runs as a Windows service.
    /// </summary>
    private void Sleep() {
      LogMessage("Sleep received - not accepting any new jobs");
      configManager.Asleep = true;
      DoPauseAll();
    }
    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.