Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.3/Core.cs @ 6703

Last change on this file since 6703 was 6546, checked in by ascheibe, 13 years ago

#1233
Don't call Clear() on ThreadSafeLog in log_MessageAdded.
This doesn't work with the changes made in r6536 (LockRecursionPolicy.SupportsRecursion).
Instead use maxLogCount of Core.Log to limit the number of log messages kept in memory.

File size: 17.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.Properties;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31
32
33namespace HeuristicLab.Clients.Hive.SlaveCore {
34  /// <summary>
35  /// The core component of the Hive Slave.
36  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
37  /// </summary>
38  public class Core : MarshalByRefObject {
39    private static HeartbeatManager heartbeatManager;
40    public static HeartbeatManager HeartbeatManager {
41      get { return heartbeatManager; }
42    }
43
44    public EventLog ServiceEventLog { get; set; }
45
46    private Semaphore waitShutdownSem = new Semaphore(0, 1);
47    private bool abortRequested;
48    private ISlaveCommunication clientCom;
49    private ServiceHost slaveComm;
50    private WcfService wcfService;
51    private JobManager jobManager;
52    private ConfigManager configManager;
53    private PluginManager pluginManager;
54
55    public Core() {
56      var log = new ThreadSafeLog(new Log(Settings.Default.MaxLogCount));
57      this.pluginManager = new PluginManager(WcfService.Instance, log);
58      this.jobManager = new JobManager(pluginManager, log);
59      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
60
61      RegisterJobManagerEvents();
62
63      this.configManager = new ConfigManager(jobManager);
64      ConfigManager.Instance = this.configManager;
65    }
66
67    /// <summary>
68    /// Main method for the client
69    /// </summary>
70    public void Start() {
71      abortRequested = false;
72
73      try {
74        //start the client communication service (pipe between slave and slave gui)
75        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
76        slaveComm.Open();
77        clientCom = SlaveClientCom.Instance.ClientCom;
78
79        // delete all left over job directories
80        pluginManager.CleanPluginTemp();
81        clientCom.LogMessage("Hive Slave started");
82
83        wcfService = WcfService.Instance;
84        RegisterServiceEvents();
85
86        StartHeartbeats(); // Start heartbeats thread       
87        DispatchMessageQueue(); // dispatch messages until abortRequested
88      }
89      catch (Exception ex) {
90        if (ServiceEventLog != null) {
91          try {
92            ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
93          }
94          catch (Exception) { }
95        } else {
96          //try to log with clientCom. if this works the user sees at least a message,
97          //else an exception will be thrown anyways.
98          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
99        }
100        ShutdownCore();
101      }
102      finally {
103        DeregisterServiceEvents();
104        waitShutdownSem.Release();
105      }
106    }
107
108    private void StartHeartbeats() {
109      //Initialize the heartbeat     
110      if (heartbeatManager == null) {
111        heartbeatManager = new HeartbeatManager();
112        heartbeatManager.StartHeartbeat();
113      }
114    }
115
116    private void DispatchMessageQueue() {
117      MessageQueue queue = MessageQueue.GetInstance();
118      while (!abortRequested) {
119        MessageContainer container = queue.GetMessage();
120        DetermineAction(container);
121        if (!abortRequested) {
122          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
123        }
124      }
125    }
126
127    private void RegisterServiceEvents() {
128      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
129      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
130    }
131
132    private void DeregisterServiceEvents() {
133      WcfService.Instance.Connected -= WcfService_Connected;
134      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
135    }
136
137    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
138      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
139    }
140
141    private void WcfService_Connected(object sender, EventArgs e) {
142      clientCom.LogMessage("Connected successfully to Hive server");
143    }
144
145    /// <summary>
146    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
147    /// </summary>
148    /// <param name="container">The container, containing the message</param>
149    private void DetermineAction(MessageContainer container) {
150      clientCom.LogMessage(string.Format("Message: {0} for job: {1} ", container.Message.ToString(), container.JobId));
151
152      if (container is ExecutorMessageContainer<Guid>) {
153        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
154        c.execute();
155      } else if (container is MessageContainer) {
156        switch (container.Message) {
157          case MessageContainer.MessageType.CalculateJob:
158            CalculateJobAsync(container.JobId);
159            break;
160          case MessageContainer.MessageType.AbortJob:
161            AbortJobAsync(container.JobId);
162            break;
163          case MessageContainer.MessageType.StopJob:
164            StopJobAsync(container.JobId);
165            break;
166          case MessageContainer.MessageType.PauseJob:
167            PauseJobAsync(container.JobId);
168            break;
169          case MessageContainer.MessageType.StopAll:
170            DoStopAll();
171            break;
172          case MessageContainer.MessageType.PauseAll:
173            DoPauseAll();
174            break;
175          case MessageContainer.MessageType.AbortAll:
176            DoAbortAll();
177            break;
178          case MessageContainer.MessageType.ShutdownSlave:
179            ShutdownCore();
180            break;
181          case MessageContainer.MessageType.Restart:
182            DoStartSlave();
183            break;
184          case MessageContainer.MessageType.Sleep:
185            Sleep();
186            break;
187          case MessageContainer.MessageType.SayHello:
188            wcfService.Connect(configManager.GetClientInfo());
189            break;
190        }
191      } else {
192        clientCom.LogMessage("Unknown MessageContainer: " + container);
193      }
194    }
195
196    private void CalculateJobAsync(Guid jobId) {
197      Task.Factory.StartNew(HandleCalculateJob, jobId)
198      .ContinueWith((t) => {
199        SlaveStatusInfo.IncrementExceptionOccured();
200        clientCom.LogMessage(t.Exception.ToString());
201      }, TaskContinuationOptions.OnlyOnFaulted);
202    }
203
204    private void StopJobAsync(Guid jobId) {
205      Task.Factory.StartNew(HandleStopJob, jobId)
206       .ContinueWith((t) => {
207         SlaveStatusInfo.IncrementExceptionOccured();
208         clientCom.LogMessage(t.Exception.ToString());
209       }, TaskContinuationOptions.OnlyOnFaulted);
210    }
211
212    private void PauseJobAsync(Guid jobId) {
213      Task.Factory.StartNew(HandlePauseJob, jobId)
214       .ContinueWith((t) => {
215         SlaveStatusInfo.IncrementExceptionOccured();
216         clientCom.LogMessage(t.Exception.ToString());
217       }, TaskContinuationOptions.OnlyOnFaulted);
218    }
219
220    private void AbortJobAsync(Guid jobId) {
221      Task.Factory.StartNew(HandleAbortJob, jobId)
222       .ContinueWith((t) => {
223         SlaveStatusInfo.IncrementExceptionOccured();
224         clientCom.LogMessage(t.Exception.ToString());
225       }, TaskContinuationOptions.OnlyOnFaulted);
226    }
227
228    private void HandleCalculateJob(object jobIdObj) {
229      Guid jobId = (Guid)jobIdObj;
230      Job job = null;
231      int usedCores = 0;
232      try {
233        job = wcfService.GetJob(jobId);
234        if (job == null) throw new JobNotFoundException(jobId);
235        if (ConfigManager.Instance.GetFreeCores() < job.CoresNeeded) throw new OutOfCoresException();
236        if (ConfigManager.GetFreeMemory() < job.MemoryNeeded) throw new OutOfMemoryException();
237        SlaveStatusInfo.IncrementUsedCores(job.CoresNeeded); usedCores = job.CoresNeeded;
238        JobData jobData = wcfService.GetJobData(jobId);
239        if (jobData == null) throw new JobDataNotFoundException(jobId);
240        job = wcfService.UpdateJobState(jobId, JobState.Calculating, null);
241        if (job == null) throw new JobNotFoundException(jobId);
242        jobManager.StartJobAsync(job, jobData);
243      }
244      catch (JobNotFoundException) {
245        SlaveStatusInfo.DecrementUsedCores(usedCores);
246        throw;
247      }
248      catch (JobDataNotFoundException) {
249        SlaveStatusInfo.DecrementUsedCores(usedCores);
250        throw;
251      }
252      catch (JobAlreadyRunningException) {
253        SlaveStatusInfo.DecrementUsedCores(usedCores);
254        throw;
255      }
256      catch (OutOfCoresException) {
257        wcfService.UpdateJobState(jobId, JobState.Waiting, "No more cores available");
258        throw;
259      }
260      catch (OutOfMemoryException) {
261        wcfService.UpdateJobState(jobId, JobState.Waiting, "No more memory available");
262        throw;
263      }
264      catch (Exception e) {
265        SlaveStatusInfo.DecrementUsedCores(usedCores);
266        wcfService.UpdateJobState(jobId, JobState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
267        throw;
268      }
269    }
270
271    private void HandleStopJob(object jobIdObj) {
272      Guid jobId = (Guid)jobIdObj;
273      try {
274        Job job = wcfService.GetJob(jobId);
275        if (job == null) throw new JobNotFoundException(jobId);
276        jobManager.StopJobAsync(jobId);
277      }
278      catch (JobNotFoundException) {
279        throw;
280      }
281      catch (JobNotRunningException) {
282        throw;
283      }
284      catch (AppDomainNotCreatedException) {
285        throw;
286      }
287    }
288
289    private void HandlePauseJob(object jobIdObj) {
290      Guid jobId = (Guid)jobIdObj;
291      try {
292        Job job = wcfService.GetJob(jobId);
293        if (job == null) throw new JobNotFoundException(jobId);
294        jobManager.PauseJobAsync(jobId);
295      }
296      catch (JobNotFoundException) {
297        throw;
298      }
299      catch (JobNotRunningException) {
300        throw;
301      }
302      catch (AppDomainNotCreatedException) {
303        throw;
304      }
305    }
306
307    private void HandleAbortJob(object jobIdObj) {
308      Guid jobId = (Guid)jobIdObj;
309      try {
310        jobManager.AbortJob(jobId);
311      }
312      catch (JobNotFoundException) {
313        throw;
314      }
315    }
316
317    #region JobManager Events
318    private void RegisterJobManagerEvents() {
319      this.jobManager.JobStarted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobStarted);
320      this.jobManager.JobPaused += new EventHandler<EventArgs<SlaveJob, JobData>>(jobManager_JobPaused);
321      this.jobManager.JobStopped += new EventHandler<EventArgs<SlaveJob, JobData>>(jobManager_JobStopped);
322      this.jobManager.JobFailed += new EventHandler<EventArgs<Tuple<SlaveJob, JobData, Exception>>>(jobManager_JobFailed);
323      this.jobManager.ExceptionOccured += new EventHandler<EventArgs<SlaveJob, Exception>>(jobManager_ExceptionOccured);
324      this.jobManager.JobAborted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobAborted);
325    }
326
327    private void jobManager_JobStarted(object sender, EventArgs<SlaveJob> e) {
328      // successfully started, everything is good
329    }
330
331    private void jobManager_JobPaused(object sender, EventArgs<SlaveJob, JobData> e) {
332      try {
333        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
334        heartbeatManager.AwakeHeartBeatThread();
335        Job job = wcfService.GetJob(e.Value.JobId);
336        if (job == null) throw new JobNotFoundException(e.Value.JobId);
337        job.ExecutionTime = e.Value.ExecutionTime;
338        JobData jobData = e.Value.GetJobData();
339        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Paused);
340      }
341      catch (JobNotFoundException ex) {
342        clientCom.LogMessage(ex.ToString());
343      }
344      catch (Exception ex) {
345        clientCom.LogMessage(ex.ToString());
346      }
347    }
348
349    private void jobManager_JobStopped(object sender, EventArgs<SlaveJob, JobData> e) {
350      try {
351        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
352        heartbeatManager.AwakeHeartBeatThread();
353        Job job = wcfService.GetJob(e.Value.JobId);
354        if (job == null) throw new JobNotFoundException(e.Value.JobId);
355        job.ExecutionTime = e.Value.ExecutionTime;
356        JobData jobData = e.Value.GetJobData();
357        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Finished);
358      }
359      catch (JobNotFoundException ex) {
360        clientCom.LogMessage(ex.ToString());
361      }
362      catch (Exception ex) {
363        clientCom.LogMessage(ex.ToString());
364      }
365    }
366
367    private void jobManager_JobFailed(object sender, EventArgs<Tuple<SlaveJob, JobData, Exception>> e) {
368      try {
369        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
370        heartbeatManager.AwakeHeartBeatThread();
371        SlaveJob slaveJob = e.Value.Item1;
372        JobData jobData = e.Value.Item2;
373        Exception exception = e.Value.Item3;
374
375        Job job = wcfService.GetJob(slaveJob.JobId);
376        if (job == null) throw new JobNotFoundException(slaveJob.JobId);
377        job.ExecutionTime = slaveJob.ExecutionTime;
378        if (jobData != null) {
379          wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Failed, exception.ToString());
380        } else {
381          wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
382        }
383        clientCom.LogMessage(exception.Message);
384      }
385      catch (JobNotFoundException ex) {
386        SlaveStatusInfo.IncrementExceptionOccured();
387        clientCom.LogMessage(ex.ToString());
388      }
389      catch (Exception ex) {
390        SlaveStatusInfo.IncrementExceptionOccured();
391        clientCom.LogMessage(ex.ToString());
392      }
393    }
394
395    private void jobManager_ExceptionOccured(object sender, EventArgs<SlaveJob, Exception> e) {
396      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
397      SlaveStatusInfo.IncrementExceptionOccured();
398      heartbeatManager.AwakeHeartBeatThread();
399      clientCom.LogMessage(string.Format("Exception occured for job {0}: {1}", e.Value.JobId, e.Value2.ToString()));
400      wcfService.UpdateJobState(e.Value.JobId, JobState.Waiting, e.Value2.ToString());
401    }
402
403    private void jobManager_JobAborted(object sender, EventArgs<SlaveJob> e) {
404      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
405    }
406    #endregion
407
408    #region Log Events
409    private void log_MessageAdded(object sender, EventArgs<string> e) {
410      clientCom.LogMessage(e.Value.Split('\t')[1]);
411    }
412    #endregion
413
414    /// <summary>
415    /// aborts all running jobs, no results are sent back
416    /// </summary>
417    private void DoAbortAll() {
418      clientCom.LogMessage("Aborting all jobs.");
419      foreach (Guid jobId in jobManager.JobIds) {
420        AbortJobAsync(jobId);
421      }
422    }
423
424    /// <summary>
425    /// wait for jobs to finish, then pause client
426    /// </summary>
427    private void DoPauseAll() {
428      clientCom.LogMessage("Pausing all jobs.");
429      foreach (Guid jobId in jobManager.JobIds) {
430        PauseJobAsync(jobId);
431      }
432    }
433
434    /// <summary>
435    /// pause slave immediately
436    /// </summary>
437    private void DoStopAll() {
438      clientCom.LogMessage("Stopping all jobs.");
439      foreach (Guid jobId in jobManager.JobIds) {
440        StopJobAsync(jobId);
441      }
442    }
443
444    #region Slave Lifecycle Methods
445    /// <summary>
446    /// completly shudown slave
447    /// </summary>
448    public void Shutdown() {
449      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
450      MessageQueue.GetInstance().AddMessage(mc);
451      waitShutdownSem.WaitOne();
452    }
453
454    /// <summary>
455    /// complete shutdown, should be called before the the application is exited
456    /// </summary>
457    private void ShutdownCore() {
458      clientCom.LogMessage("Shutdown signal received");
459      clientCom.LogMessage("Stopping heartbeat");
460      heartbeatManager.StopHeartBeat();
461      abortRequested = true;
462
463      DoAbortAll();
464
465      clientCom.LogMessage("Logging out");
466      WcfService.Instance.Disconnect();
467      clientCom.Shutdown();
468      SlaveClientCom.Close();
469
470      if (slaveComm.State != CommunicationState.Closed)
471        slaveComm.Close();
472    }
473
474    /// <summary>
475    /// reinitializes everything and continues operation,
476    /// can be called after Sleep()
477    /// </summary> 
478    private void DoStartSlave() {
479      clientCom.LogMessage("Restart received");
480      configManager.Asleep = false;
481    }
482
483    /// <summary>
484    /// stop slave, except for client gui communication,
485    /// primarily used by gui if core is running as windows service
486    /// </summary>   
487    private void Sleep() {
488      clientCom.LogMessage("Sleep received - not accepting any new jobs");
489      configManager.Asleep = true;
490      DoPauseAll();
491    }
492    #endregion
493  }
494}
Note: See TracBrowser for help on using the repository browser.