Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6464

Last change on this file since 6464 was 6464, checked in by ascheibe, 13 years ago

#1233

  • some Admin UI bugfixes

Slave:

  • fixed bug when Pause is called immediately after Calculate
  • send exceptions when something goes wrong in Pause or Stop
File size: 17.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30
31
32namespace HeuristicLab.Clients.Hive.SlaveCore {
33  /// <summary>
34  /// The core component of the Hive Slave.
35  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
36  /// </summary>
37  public class Core : MarshalByRefObject {
// Shared heartbeat manager; created on first use by StartHeartbeats().
private static HeartbeatManager heartbeatManager;

/// <summary>
/// Gets the heartbeat manager shared by the slave core (null until heartbeats were started).
/// </summary>
public static HeartbeatManager HeartbeatManager {
  get {
    return heartbeatManager;
  }
}

/// <summary>
/// Optional event log; when set, Start() writes fatal exceptions to it.
/// </summary>
public EventLog ServiceEventLog { get; set; }

// Collaborators created in the constructor.
private PluginManager pluginManager;
private JobManager jobManager;
private ConfigManager configManager;

// Collaborators assigned in Start().
private ServiceHost slaveComm;
private ISlaveCommunication clientCom;
private WcfService wcfService;

// Set by ShutdownCore(); ends the loop in DispatchMessageQueue().
private bool abortRequested;

// Released in the finally block of Start(); Shutdown() waits on it.
private Semaphore waitShutdownSem = new Semaphore(0, 1);
53
/// <summary>
/// Creates the plugin, job and config managers, wires up the job manager events
/// and publishes the config manager through ConfigManager.Instance.
/// </summary>
public Core() {
  // One log instance is handed to both managers; its messages are forwarded by log_MessageAdded.
  ThreadSafeLog sharedLog = new ThreadSafeLog(new Log());
  pluginManager = new PluginManager(WcfService.Instance, sharedLog);
  jobManager = new JobManager(pluginManager, sharedLog);
  sharedLog.MessageAdded += log_MessageAdded;

  RegisterJobManagerEvents();

  configManager = new ConfigManager(jobManager);
  ConfigManager.Instance = configManager;
}
65
/// <summary>
/// Main method of the slave core: opens the slave/GUI communication host, cleans up
/// left-over plugin directories, starts the heartbeats and then dispatches messages
/// until a shutdown is requested.
/// </summary>
/// <remarks>
/// Any exception raised on the way is reported and followed by a call to ShutdownCore().
/// In every case the service events are deregistered and the semaphore that
/// <see cref="Shutdown"/> waits on is released before the method returns.
/// </remarks>
public void Start() {
  abortRequested = false;

  try {
    // start the client communication service (pipe between slave and slave gui)
    slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
    slaveComm.Open();
    clientCom = SlaveClientCom.Instance.ClientCom;

    // delete all left over job directories
    pluginManager.CleanPluginTemp();
    clientCom.LogMessage("Hive Slave started");

    wcfService = WcfService.Instance;
    RegisterServiceEvents();

    StartHeartbeats();
    DispatchMessageQueue(); // returns once abortRequested is set
  }
  catch (Exception ex) {
    ReportStartFailure(ex);
    ShutdownCore();
  }
  finally {
    DeregisterServiceEvents();
    waitShutdownSem.Release();
  }
}

/// <summary>
/// Reports an exception that escaped Start() to the best sink that is available.
/// </summary>
/// <param name="ex">The exception that terminated Start().</param>
private void ReportStartFailure(Exception ex) {
  if (ServiceEventLog != null) {
    try {
      ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
    }
    catch (Exception) {
      // best effort only: a failing event log must not prevent the shutdown that follows
    }
    return;
  }

  string message = string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine);
  if (clientCom != null) {
    // if this works the user sees at least a message, otherwise the call itself throws
    clientCom.LogMessage(message);
  } else {
    // The failure happened before the client channel was assigned (e.g. while opening
    // the service host). Dereferencing clientCom here would replace the real error
    // with a NullReferenceException, so fall back to the trace listeners.
    Trace.TraceError(message);
  }
}
106
/// <summary>
/// Creates the shared heartbeat manager and starts its heartbeat, unless a manager already exists.
/// </summary>
private void StartHeartbeats() {
  if (heartbeatManager != null) {
    return; // already initialized, nothing to do
  }

  heartbeatManager = new HeartbeatManager();
  heartbeatManager.StartHeartbeat();
}
114
/// <summary>
/// Takes messages from the message queue and hands them to DetermineAction until
/// abortRequested is set. After every handled message the client console is sent
/// the current status, except when that message requested the abort.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue messages = MessageQueue.GetInstance();
  while (true) {
    if (abortRequested) {
      return;
    }

    MessageContainer next = messages.GetMessage();
    DetermineAction(next);

    if (abortRequested) {
      return; // the message just handled shut the core down; no status update any more
    }
    clientCom.StatusChanged(configManager.GetStatusForClientConsole());
  }
}
125
/// <summary>
/// Subscribes this core to the Connected and ExceptionOccured events of the WCF service singleton.
/// </summary>
private void RegisterServiceEvents() {
  WcfService.Instance.Connected += WcfService_Connected;
  WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
}
130
/// <summary>
/// Removes the subscriptions made by RegisterServiceEvents from the WCF service singleton.
/// </summary>
private void DeregisterServiceEvents() {
  WcfService.Instance.Connected -= new EventHandler(WcfService_Connected);
  WcfService.Instance.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
}
135
/// <summary>
/// Logs the message of an exception reported by the WCF service to the client console.
/// </summary>
private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  string reason = e.Value.Message;
  clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", reason));
}
139
/// <summary>
/// Tells the client console that the WCF service reported a successful connection.
/// </summary>
private void WcfService_Connected(object sender, EventArgs e) {
  const string connectedMessage = "Connected successfully to Hive server";
  clientCom.LogMessage(connectedMessage);
}
143
/// <summary>
/// Logs a message taken from the MessageQueue and starts the action that corresponds to it.
/// </summary>
/// <remarks>
/// Executor containers carry their own action and are simply executed; all other
/// containers are dispatched on their message type. Job-related messages are handled
/// asynchronously, slave-wide messages synchronously on the calling thread.
/// </remarks>
/// <param name="container">The container holding the message and the id of the job it refers to.</param>
private void DetermineAction(MessageContainer container) {
  if (container == null) {
    // Nothing can be dispatched for a missing container; report it instead of
    // failing with a NullReferenceException on the log line below.
    clientCom.LogMessage("Unknown MessageContainer: null");
    return;
  }

  clientCom.LogMessage(string.Format("Message: {0} for job: {1} ", container.Message.ToString(), container.JobId));

  ExecutorMessageContainer<Guid> executorContainer = container as ExecutorMessageContainer<Guid>;
  if (executorContainer != null) {
    executorContainer.execute();
    return;
  }

  switch (container.Message) {
    case MessageContainer.MessageType.CalculateJob:
      CalculateJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.AbortJob:
      AbortJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.StopJob:
      StopJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.PauseJob:
      PauseJobAsync(container.JobId);
      break;
    case MessageContainer.MessageType.StopAll:
      DoStopAll();
      break;
    case MessageContainer.MessageType.PauseAll:
      DoPauseAll();
      break;
    case MessageContainer.MessageType.AbortAll:
      DoAbortAll();
      break;
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.Restart:
      DoStartSlave();
      break;
    case MessageContainer.MessageType.Sleep:
      Sleep();
      break;
    case MessageContainer.MessageType.SayHello:
      wcfService.Connect(configManager.GetClientInfo());
      break;
    default:
      // make ignored message types visible instead of dropping them silently
      clientCom.LogMessage("Unhandled message type: " + container.Message);
      break;
  }
}
194
/// <summary>
/// Fetches and starts the given job on a background task.
/// </summary>
/// <param name="jobId">Id of the job to calculate.</param>
private void CalculateJobAsync(Guid jobId) {
  StartJobTask(HandleCalculateJob, jobId);
}

/// <summary>
/// Stops the given job on a background task.
/// </summary>
/// <param name="jobId">Id of the job to stop.</param>
private void StopJobAsync(Guid jobId) {
  StartJobTask(HandleStopJob, jobId);
}

/// <summary>
/// Pauses the given job on a background task.
/// </summary>
/// <param name="jobId">Id of the job to pause.</param>
private void PauseJobAsync(Guid jobId) {
  StartJobTask(HandlePauseJob, jobId);
}

/// <summary>
/// Aborts the given job on a background task.
/// </summary>
/// <param name="jobId">Id of the job to abort.</param>
private void AbortJobAsync(Guid jobId) {
  StartJobTask(HandleAbortJob, jobId);
}

/// <summary>
/// Runs a job handler on a new task and attaches the shared fault handling: when the
/// handler throws, the exception counter is incremented and the exception is logged
/// to the client console.
/// </summary>
/// <param name="handler">The handler to run; it receives the job id as a boxed Guid.</param>
/// <param name="jobId">Id of the job the handler works on.</param>
private void StartJobTask(Action<object> handler, Guid jobId) {
  Task.Factory.StartNew(handler, jobId)
    .ContinueWith((t) => {
      SlaveStatusInfo.IncrementExceptionOccured();
      clientCom.LogMessage(t.Exception.ToString());
    }, TaskContinuationOptions.OnlyOnFaulted); // continuation runs only when the handler faulted
}
226
/// <summary>
/// Task body of CalculateJobAsync: loads the job and its data from the server, reserves
/// the cores the job needs, marks the job as calculating and hands it to the job manager.
/// </summary>
/// <remarks>
/// Every exception is rethrown after cleanup. Cores are given back for all failures except
/// the out-of-cores/out-of-memory ones; the server is told to set the job to Waiting for
/// the out-of-cores, out-of-memory and unexpected failures.
/// </remarks>
/// <param name="jobIdObj">The boxed Guid of the job to calculate.</param>
private void HandleCalculateJob(object jobIdObj) {
  Guid id = (Guid)jobIdObj;
  int reservedCores = 0; // stays 0 until the cores have actually been reserved

  try {
    Job job = wcfService.GetJob(id);
    if (job == null) {
      throw new JobNotFoundException(id);
    }
    if (ConfigManager.Instance.GetFreeCores() < job.CoresNeeded) {
      throw new OutOfCoresException();
    }
    if (ConfigManager.GetFreeMemory() < job.MemoryNeeded) {
      throw new OutOfMemoryException();
    }

    SlaveStatusInfo.IncrementUsedCores(job.CoresNeeded);
    reservedCores = job.CoresNeeded;

    JobData data = wcfService.GetJobData(id);
    if (data == null) {
      throw new JobDataNotFoundException(id);
    }

    job = wcfService.UpdateJobState(id, JobState.Calculating, null);
    if (job == null) {
      throw new JobNotFoundException(id);
    }

    jobManager.StartJobAsync(job, data);
  }
  catch (Exception ex) {
    // The type checks are evaluated in this exact order; the first match decides the cleanup.
    if (ex is JobNotFoundException || ex is JobDataNotFoundException || ex is JobAlreadyRunningException) {
      SlaveStatusInfo.DecrementUsedCores(reservedCores);
    } else if (ex is OutOfCoresException) {
      wcfService.UpdateJobState(id, JobState.Waiting, "No more cores available");
    } else if (ex is OutOfMemoryException) {
      wcfService.UpdateJobState(id, JobState.Waiting, "No more memory available");
    } else {
      SlaveStatusInfo.DecrementUsedCores(reservedCores);
      wcfService.UpdateJobState(id, JobState.Waiting, ex.ToString()); // unknown internal error - report and set waiting again
    }
    throw;
  }
}
269
/// <summary>
/// Task body of StopJobAsync: verifies that the server knows the job and asks the
/// job manager to stop it.
/// </summary>
/// <remarks>
/// Exceptions are not handled here; they fault the task and are reported by the
/// continuation attached in StopJobAsync.
/// NOTE(review): the job manager presumably raises JobNotRunningException or
/// AppDomainNotCreatedException when the job cannot be stopped - confirm against JobManager.
/// </remarks>
/// <param name="jobIdObj">The boxed Guid of the job to stop.</param>
/// <exception cref="JobNotFoundException">The server returned no job for the id.</exception>
private void HandleStopJob(object jobIdObj) {
  Guid jobId = (Guid)jobIdObj;

  Job job = wcfService.GetJob(jobId);
  if (job == null) {
    throw new JobNotFoundException(jobId);
  }

  jobManager.StopJobAsync(jobId);
}
287
/// <summary>
/// Task body of PauseJobAsync: verifies that the server knows the job and asks the
/// job manager to pause it.
/// </summary>
/// <remarks>
/// Exceptions are not handled here; they fault the task and are reported by the
/// continuation attached in PauseJobAsync.
/// NOTE(review): the job manager presumably raises JobNotRunningException or
/// AppDomainNotCreatedException when the job cannot be paused - confirm against JobManager.
/// </remarks>
/// <param name="jobIdObj">The boxed Guid of the job to pause.</param>
/// <exception cref="JobNotFoundException">The server returned no job for the id.</exception>
private void HandlePauseJob(object jobIdObj) {
  Guid jobId = (Guid)jobIdObj;

  Job job = wcfService.GetJob(jobId);
  if (job == null) {
    throw new JobNotFoundException(jobId);
  }

  jobManager.PauseJobAsync(jobId);
}
305
/// <summary>
/// Task body of AbortJobAsync: asks the job manager to abort the job.
/// </summary>
/// <remarks>
/// Exceptions are not handled here; they fault the task and are reported by the
/// continuation attached in AbortJobAsync.
/// NOTE(review): the job manager presumably raises JobNotFoundException for an
/// unknown id - confirm against JobManager.
/// </remarks>
/// <param name="jobIdObj">The boxed Guid of the job to abort.</param>
private void HandleAbortJob(object jobIdObj) {
  Guid jobId = (Guid)jobIdObj;
  jobManager.AbortJob(jobId);
}
315
316    #region JobManager Events
/// <summary>
/// Subscribes the handlers of this class to all job manager events.
/// </summary>
private void RegisterJobManagerEvents() {
  jobManager.JobStarted += jobManager_JobStarted;
  jobManager.JobPaused += jobManager_JobPaused;
  jobManager.JobStopped += jobManager_JobStopped;
  jobManager.JobFailed += jobManager_JobFailed;
  jobManager.ExceptionOccured += jobManager_ExceptionOccured;
  jobManager.JobAborted += jobManager_JobAborted;
}

/// <summary>
/// Handler for the JobStarted event; intentionally does nothing.
/// </summary>
private void jobManager_JobStarted(object sender, EventArgs<SlaveJob> e) {
  // the job started successfully, so there is nothing to report
}
329
/// <summary>
/// Handler for the JobPaused event: frees the job's cores and uploads its data
/// to the server with state Paused.
/// </summary>
private void jobManager_JobPaused(object sender, EventArgs<SlaveJob, JobData> e) {
  UploadJobResult(e, JobState.Paused);
}

/// <summary>
/// Handler for the JobStopped event: frees the job's cores and uploads its data
/// to the server with state Finished.
/// </summary>
private void jobManager_JobStopped(object sender, EventArgs<SlaveJob, JobData> e) {
  UploadJobResult(e, JobState.Finished);
}

/// <summary>
/// Shared implementation of the JobPaused and JobStopped handlers. Releases the cores
/// used by the job, wakes the heartbeat thread and sends the job's execution time and
/// data to the server together with the given state. Failures are logged to the
/// client console and never propagate to the event source.
/// </summary>
/// <param name="e">Event arguments whose first value is the slave job that paused or stopped.</param>
/// <param name="newState">The state to report to the server for the job.</param>
private void UploadJobResult(EventArgs<SlaveJob, JobData> e, JobState newState) {
  try {
    SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Job job = wcfService.GetJob(e.Value.JobId);
    if (job == null) {
      throw new JobNotFoundException(e.Value.JobId);
    }
    job.ExecutionTime = e.Value.ExecutionTime;

    // the data is read from the slave job itself, not from the event's second value
    JobData jobData = e.Value.GetJobData();
    wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, newState);
  }
  catch (Exception ex) {
    // also covers the JobNotFoundException thrown above
    clientCom.LogMessage(ex.ToString());
  }
}
365
/// <summary>
/// Handler for the JobFailed event: frees the job's cores, wakes the heartbeat thread
/// and reports the failure to the server. When job data is available it is uploaded
/// together with the Failed state, otherwise only the state is updated. Failures while
/// reporting are counted and logged to the client console.
/// </summary>
private void jobManager_JobFailed(object sender, EventArgs<Tuple<SlaveJob, JobData, Exception>> e) {
  try {
    Tuple<SlaveJob, JobData, Exception> failure = e.Value;
    SlaveJob failedJob = failure.Item1;
    JobData failedJobData = failure.Item2;
    Exception cause = failure.Item3;

    SlaveStatusInfo.DecrementUsedCores(failedJob.CoresNeeded);
    heartbeatManager.AwakeHeartBeatThread();

    Job serverJob = wcfService.GetJob(failedJob.JobId);
    if (serverJob == null) {
      throw new JobNotFoundException(failedJob.JobId);
    }
    serverJob.ExecutionTime = failedJob.ExecutionTime;

    if (failedJobData == null) {
      wcfService.UpdateJobState(serverJob.Id, JobState.Failed, cause.ToString());
    } else {
      wcfService.UpdateJobData(serverJob, failedJobData, configManager.GetClientInfo().Id, JobState.Failed, cause.ToString());
    }
    clientCom.LogMessage(cause.Message);
  }
  catch (Exception ex) {
    // a missing job on the server is handled exactly like any other reporting failure
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(ex.ToString());
  }
}
393
/// <summary>
/// Handler for the ExceptionOccured event of the job manager: frees the job's cores,
/// counts and logs the exception, wakes the heartbeat thread and asks the server to
/// set the job back to Waiting.
/// </summary>
/// <remarks>
/// A failure of the server call is counted and logged instead of being thrown back
/// into the event source, matching the other job manager event handlers.
/// </remarks>
private void jobManager_ExceptionOccured(object sender, EventArgs<SlaveJob, Exception> e) {
  SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
  SlaveStatusInfo.IncrementExceptionOccured();
  heartbeatManager.AwakeHeartBeatThread();
  clientCom.LogMessage(string.Format("Exception occured for job {0}: {1}", e.Value.JobId, e.Value2.ToString()));

  try {
    wcfService.UpdateJobState(e.Value.JobId, JobState.Waiting, e.Value2.ToString());
  }
  catch (Exception ex) {
    // the local bookkeeping above is already done; only the server notification failed
    SlaveStatusInfo.IncrementExceptionOccured();
    clientCom.LogMessage(ex.ToString());
  }
}
401
/// <summary>
/// Handler for the JobAborted event: gives back the cores the aborted job had reserved.
/// Nothing is sent to the server.
/// </summary>
private void jobManager_JobAborted(object sender, EventArgs<SlaveJob> e) {
  SlaveJob abortedJob = e.Value;
  SlaveStatusInfo.DecrementUsedCores(abortedJob.CoresNeeded);
}
405    #endregion
406
407    #region Log Events
/// <summary>
/// Forwards a message added to the shared log to the client console and clears the log.
/// </summary>
/// <remarks>
/// Only the second tab-separated field of the entry is forwarded.
/// NOTE(review): this presumably strips a leading timestamp field - confirm against the Log format.
/// An entry without a tab is forwarded unchanged instead of raising an
/// IndexOutOfRangeException. Messages that arrive before Start() has assigned the
/// client channel cannot be forwarded and are dropped.
/// </remarks>
private void log_MessageAdded(object sender, EventArgs<string> e) {
  if (clientCom != null) {
    string[] fields = e.Value.Split('\t');
    clientCom.LogMessage(fields.Length > 1 ? fields[1] : e.Value);
  }
  ((ILog)sender).Clear(); // don't let the log take up memory
}
412    #endregion
413
/// <summary>
/// Requests an asynchronous abort for every job known to the job manager.
/// </summary>
private void DoAbortAll() {
  clientCom.LogMessage("Aborting all jobs.");
  foreach (Guid runningJobId in jobManager.JobIds) {
    AbortJobAsync(runningJobId);
  }
}
423
/// <summary>
/// Requests an asynchronous pause for every job known to the job manager.
/// </summary>
private void DoPauseAll() {
  clientCom.LogMessage("Pausing all jobs.");
  foreach (Guid runningJobId in jobManager.JobIds) {
    PauseJobAsync(runningJobId);
  }
}
433
/// <summary>
/// Requests an asynchronous stop for every job known to the job manager.
/// </summary>
private void DoStopAll() {
  clientCom.LogMessage("Stopping all jobs.");
  foreach (Guid runningJobId in jobManager.JobIds) {
    StopJobAsync(runningJobId);
  }
}
443
444    #region Slave Lifecycle Methods
/// <summary>
/// Completely shuts down the slave: enqueues a ShutdownSlave message and blocks
/// until Start() has released the shutdown semaphore.
/// </summary>
public void Shutdown() {
  MessageContainer shutdownRequest = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
  MessageQueue queue = MessageQueue.GetInstance();
  queue.AddMessage(shutdownRequest);

  waitShutdownSem.WaitOne();
}
453
/// <summary>
/// Complete shutdown; should be called before the application exits. Stops the
/// heartbeat, ends message dispatching, aborts all jobs, disconnects from the server
/// and closes the communication with the slave GUI.
/// </summary>
/// <remarks>
/// Start() also calls this method after a failed startup, so every component that
/// Start() initializes may still be null here and is checked before use.
/// </remarks>
private void ShutdownCore() {
  bool hasClient = clientCom != null;

  if (hasClient) {
    clientCom.LogMessage("Shutdown signal received");
    clientCom.LogMessage("Stopping heartbeat");
  }
  if (heartbeatManager != null) {
    heartbeatManager.StopHeartBeat();
  }
  abortRequested = true;

  if (hasClient) {
    // DoAbortAll logs through the client channel. Without a channel no message has been
    // dispatched yet (DetermineAction logs before it dispatches), so no job was started.
    DoAbortAll();
    clientCom.LogMessage("Logging out");
  }

  WcfService.Instance.Disconnect();
  if (hasClient) {
    clientCom.Shutdown();
  }
  SlaveClientCom.Close();

  if (slaveComm != null) {
    if (slaveComm.State == CommunicationState.Faulted) {
      // Close() throws on a faulted communication object; Abort() releases it immediately
      slaveComm.Abort();
    } else if (slaveComm.State != CommunicationState.Closed) {
      slaveComm.Close();
    }
  }
}
473
/// <summary>
/// Handles a Restart message: logs it and clears the Asleep flag of the config manager.
/// Can be called after Sleep().
/// </summary>
private void DoStartSlave() {
  const string restartMessage = "Restart received";
  clientCom.LogMessage(restartMessage);

  configManager.Asleep = false;
}
482
/// <summary>
/// Handles a Sleep message: logs it, sets the Asleep flag of the config manager and
/// pauses all jobs. Communication with the client GUI stays up; primarily used by
/// the GUI when the core is running as a Windows service.
/// </summary>
private void Sleep() {
  const string sleepMessage = "Sleep received - not accepting any new jobs";
  clientCom.LogMessage(sleepMessage);

  configManager.Asleep = true;
  DoPauseAll();
}
492    #endregion
493  }
494}
Note: See TracBrowser for help on using the repository browser.