Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6451

Last change on this file since 6451 was 6381, checked in by cneumuel, 14 years ago

#1233

  • locking for childHiveJobs in OptimizerHiveJob to avoid multi-threaded access issues
  • added IsPrivileged to gui
  • minor changes
File size: 17.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Diagnostics;
24using System.ServiceModel;
25using System.Threading;
26using System.Threading.Tasks;
27using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30
31
32namespace HeuristicLab.Clients.Hive.SlaveCore {
33  /// <summary>
34  /// The core component of the Hive Slave.
35  /// Handles commands sent from the Hive Server and does all webservice calls for jobs.
36  /// </summary>
37  public class Core : MarshalByRefObject {
38    private static HeartbeatManager heartbeatManager;
39    public static HeartbeatManager HeartbeatManager {
40      get { return heartbeatManager; }
41    }
42
43    public EventLog ServiceEventLog { get; set; }
44
45    private Semaphore waitShutdownSem = new Semaphore(0, 1);
46    private bool abortRequested;
47    private ISlaveCommunication clientCom;
48    private ServiceHost slaveComm;
49    private WcfService wcfService;
50    private JobManager jobManager;
51    private ConfigManager configManager;
52    private PluginManager pluginManager;
53
54    public Core() {
55      var log = new ThreadSafeLog(new Log());
56      this.pluginManager = new PluginManager(WcfService.Instance, log);
57      this.jobManager = new JobManager(pluginManager, log);
58      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
59
60      RegisterJobManagerEvents();
61
62      this.configManager = new ConfigManager(jobManager);
63      ConfigManager.Instance = this.configManager;
64    }
65
66    /// <summary>
67    /// Main method for the client
68    /// </summary>
69    public void Start() {
70      abortRequested = false;
71
72      try {
73        //start the client communication service (pipe between slave and slave gui)
74        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
75        slaveComm.Open();
76        clientCom = SlaveClientCom.Instance.ClientCom;
77
78        // delete all left over job directories
79        pluginManager.CleanPluginTemp();
80        clientCom.LogMessage("Hive Slave started");
81
82        wcfService = WcfService.Instance;
83        RegisterServiceEvents();
84
85        StartHeartbeats(); // Start heartbeats thread       
86        DispatchMessageQueue(); // dispatch messages until abortRequested
87      }
88      catch (Exception ex) {
89        if (ServiceEventLog != null) {
90          try {
91            ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace), EventLogEntryType.Error);
92          }
93          catch (Exception) { }
94        } else {
95          //try to log with clientCom. if this works the user sees at least a message,
96          //else an exception will be thrown anyways.
97          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
98        }
99        ShutdownCore();
100      }
101      finally {
102        DeregisterServiceEvents();
103        waitShutdownSem.Release();
104      }
105    }
106
107    private void StartHeartbeats() {
108      //Initialize the heartbeat     
109      if (heartbeatManager == null) {
110        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
111        heartbeatManager.StartHeartbeat();
112      }
113    }
114
115    private void DispatchMessageQueue() {
116      MessageQueue queue = MessageQueue.GetInstance();
117      while (!abortRequested) {
118        MessageContainer container = queue.GetMessage();
119        DetermineAction(container);
120        if (!abortRequested) {
121          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
122        }
123      }
124    }
125
126    private void RegisterServiceEvents() {
127      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
128      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
129    }
130
131    private void DeregisterServiceEvents() {
132      WcfService.Instance.Connected -= WcfService_Connected;
133      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
134    }
135
136    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
137      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
138    }
139
140    private void WcfService_Connected(object sender, EventArgs e) {
141      clientCom.LogMessage("Connected successfully to Hive server");
142    }
143
144    /// <summary>
145    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
146    /// </summary>
147    /// <param name="container">The container, containing the message</param>
148    private void DetermineAction(MessageContainer container) {
149      clientCom.LogMessage(string.Format("Message: {0} for job: {1} ", container.Message.ToString(), container.JobId));
150
151      if (container is ExecutorMessageContainer<Guid>) {
152        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
153        c.execute();
154      } else if (container is MessageContainer) {
155        switch (container.Message) {
156          case MessageContainer.MessageType.CalculateJob:
157            CalculateJobAsync(container.JobId);
158            break;
159          case MessageContainer.MessageType.AbortJob:
160            AbortJobAsync(container.JobId);
161            break;
162          case MessageContainer.MessageType.StopJob:
163            StopJobAsync(container.JobId);
164            break;
165          case MessageContainer.MessageType.PauseJob:
166            PauseJobAsync(container.JobId);
167            break;
168          case MessageContainer.MessageType.StopAll:
169            DoStopAll();
170            break;
171          case MessageContainer.MessageType.PauseAll:
172            DoPauseAll();
173            break;
174          case MessageContainer.MessageType.AbortAll:
175            DoAbortAll();
176            break;
177          case MessageContainer.MessageType.ShutdownSlave:
178            ShutdownCore();
179            break;
180          case MessageContainer.MessageType.Restart:
181            DoStartSlave();
182            break;
183          case MessageContainer.MessageType.Sleep:
184            Sleep();
185            break;
186          case MessageContainer.MessageType.SayHello:
187            wcfService.Connect(configManager.GetClientInfo());
188            break;
189        }
190      } else {
191        clientCom.LogMessage("Unknown MessageContainer: " + container);
192      }
193    }
194
195    private void CalculateJobAsync(Guid jobId) {
196      Task.Factory.StartNew(HandleCalculateJob, jobId)
197      .ContinueWith((t) => {
198        SlaveStatusInfo.IncrementExceptionOccured();
199        clientCom.LogMessage(t.Exception.ToString());
200      }, TaskContinuationOptions.OnlyOnFaulted);
201    }
202
203    private void StopJobAsync(Guid jobId) {
204      Task.Factory.StartNew(HandleStopJob, jobId)
205       .ContinueWith((t) => {
206         SlaveStatusInfo.IncrementExceptionOccured();
207         clientCom.LogMessage(t.Exception.ToString());
208       }, TaskContinuationOptions.OnlyOnFaulted);
209    }
210
211    private void PauseJobAsync(Guid jobId) {
212      Task.Factory.StartNew(HandlePauseJob, jobId)
213       .ContinueWith((t) => {
214         SlaveStatusInfo.IncrementExceptionOccured();
215         clientCom.LogMessage(t.Exception.ToString());
216       }, TaskContinuationOptions.OnlyOnFaulted);
217    }
218
219    private void AbortJobAsync(Guid jobId) {
220      Task.Factory.StartNew(HandleAbortJob, jobId)
221       .ContinueWith((t) => {
222         SlaveStatusInfo.IncrementExceptionOccured();
223         clientCom.LogMessage(t.Exception.ToString());
224       }, TaskContinuationOptions.OnlyOnFaulted);
225    }
226
227    private void HandleCalculateJob(object jobIdObj) {
228      Guid jobId = (Guid)jobIdObj;
229      Job job = null;
230      int usedCores = 0;
231      try {
232        job = wcfService.GetJob(jobId);
233        if (job == null) throw new JobNotFoundException(jobId);
234        if (ConfigManager.Instance.GetFreeCores() < job.CoresNeeded) throw new OutOfCoresException();
235        if (ConfigManager.GetFreeMemory() < job.MemoryNeeded) throw new OutOfMemoryException();
236        SlaveStatusInfo.IncrementUsedCores(job.CoresNeeded); usedCores = job.CoresNeeded;
237        JobData jobData = wcfService.GetJobData(jobId);
238        if (jobData == null) throw new JobDataNotFoundException(jobId);
239        job = wcfService.UpdateJobState(jobId, JobState.Calculating, null);
240        if (job == null) throw new JobNotFoundException(jobId);
241        jobManager.StartJobAsync(job, jobData);
242      }
243      catch (JobNotFoundException) {
244        SlaveStatusInfo.DecrementUsedCores(usedCores);
245        throw;
246      }
247      catch (JobDataNotFoundException) {
248        SlaveStatusInfo.DecrementUsedCores(usedCores);
249        throw;
250      }
251      catch (JobAlreadyRunningException) {
252        SlaveStatusInfo.DecrementUsedCores(usedCores);
253        throw;
254      }
255      catch (OutOfCoresException) {
256        wcfService.UpdateJobState(jobId, JobState.Waiting, "No more cores available");
257        throw;
258      }
259      catch (OutOfMemoryException) {
260        wcfService.UpdateJobState(jobId, JobState.Waiting, "No more memory available");
261        throw;
262      }
263      catch (Exception e) {
264        SlaveStatusInfo.DecrementUsedCores(usedCores);
265        wcfService.UpdateJobState(jobId, JobState.Waiting, e.ToString()); // unknown internal error - report and set waiting again
266        throw;
267      }
268    }
269
270    private void HandleStopJob(object jobIdObj) {
271      Guid jobId = (Guid)jobIdObj;
272      try {
273        Job job = wcfService.GetJob(jobId);
274        if (job == null) throw new JobNotFoundException(jobId);
275        jobManager.StopJobAsync(jobId);
276      }
277      catch (JobNotFoundException) {
278        throw;
279      }
280      catch (JobNotRunningException) {
281        throw;
282      }
283      catch (AppDomainNotCreatedException) {
284        throw;
285      }
286    }
287
288    private void HandlePauseJob(object jobIdObj) {
289      Guid jobId = (Guid)jobIdObj;
290      try {
291        Job job = wcfService.GetJob(jobId);
292        if (job == null) throw new JobNotFoundException(jobId);
293        jobManager.PauseJobAsync(jobId);
294      }
295      catch (JobNotFoundException) {
296        throw;
297      }
298      catch (JobNotRunningException) {
299        throw;
300      }
301    }
302
303    private void HandleAbortJob(object jobIdObj) {
304      Guid jobId = (Guid)jobIdObj;
305      try {
306        jobManager.AbortJob(jobId);
307      }
308      catch (JobNotFoundException) {
309        throw;
310      }
311    }
312
313    #region JobManager Events
314    private void RegisterJobManagerEvents() {
315      this.jobManager.JobStarted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobStarted);
316      this.jobManager.JobPaused += new EventHandler<EventArgs<SlaveJob, JobData>>(jobManager_JobPaused);
317      this.jobManager.JobStopped += new EventHandler<EventArgs<SlaveJob, JobData>>(jobManager_JobStopped);
318      this.jobManager.JobFailed += new EventHandler<EventArgs<Tuple<SlaveJob, JobData, Exception>>>(jobManager_JobFailed);
319      this.jobManager.ExceptionOccured += new EventHandler<EventArgs<SlaveJob, Exception>>(jobManager_ExceptionOccured);
320      this.jobManager.JobAborted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobAborted);
321    }
322
323    private void jobManager_JobStarted(object sender, EventArgs<SlaveJob> e) {
324      // successfully started, everything is good
325    }
326
327    private void jobManager_JobPaused(object sender, EventArgs<SlaveJob, JobData> e) {
328      try {
329        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
330        heartbeatManager.AwakeHeartBeatThread();
331        Job job = wcfService.GetJob(e.Value.JobId);
332        if (job == null) throw new JobNotFoundException(e.Value.JobId);
333        job.ExecutionTime = e.Value.ExecutionTime;
334        JobData jobData = e.Value.GetJobData();
335        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Paused);
336      }
337      catch (JobNotFoundException ex) {
338        clientCom.LogMessage(ex.ToString());
339      }
340      catch (Exception ex) {
341        clientCom.LogMessage(ex.ToString());
342      }
343    }
344
345    private void jobManager_JobStopped(object sender, EventArgs<SlaveJob, JobData> e) {
346      try {
347        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
348        heartbeatManager.AwakeHeartBeatThread();
349        Job job = wcfService.GetJob(e.Value.JobId);
350        if (job == null) throw new JobNotFoundException(e.Value.JobId);
351        job.ExecutionTime = e.Value.ExecutionTime;
352        JobData jobData = e.Value.GetJobData();
353        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Finished);
354      }
355      catch (JobNotFoundException ex) {
356        clientCom.LogMessage(ex.ToString());
357      }
358      catch (Exception ex) {
359        clientCom.LogMessage(ex.ToString());
360      }
361    }
362
363    private void jobManager_JobFailed(object sender, EventArgs<Tuple<SlaveJob, JobData, Exception>> e) {
364      try {
365        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
366        heartbeatManager.AwakeHeartBeatThread();
367        SlaveJob slaveJob = e.Value.Item1;
368        JobData jobData = e.Value.Item2;
369        Exception exception = e.Value.Item3;
370
371        Job job = wcfService.GetJob(slaveJob.JobId);
372        if (job == null) throw new JobNotFoundException(slaveJob.JobId);
373        job.ExecutionTime = slaveJob.ExecutionTime;
374        if (jobData != null) {
375          wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Failed, exception.ToString());
376        } else {
377          wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
378        }
379        clientCom.LogMessage(exception.Message);
380      }
381      catch (JobNotFoundException ex) {
382        SlaveStatusInfo.IncrementExceptionOccured();
383        clientCom.LogMessage(ex.ToString());
384      }
385      catch (Exception ex) {
386        SlaveStatusInfo.IncrementExceptionOccured();
387        clientCom.LogMessage(ex.ToString());
388      }
389    }
390
391    private void jobManager_ExceptionOccured(object sender, EventArgs<SlaveJob, Exception> e) {
392      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
393      SlaveStatusInfo.IncrementExceptionOccured();
394      heartbeatManager.AwakeHeartBeatThread();
395      clientCom.LogMessage(string.Format("Exception occured for job {0}: {1}", e.Value.JobId, e.Value2.ToString()));
396    }
397
398    private void jobManager_JobAborted(object sender, EventArgs<SlaveJob> e) {
399      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
400    }
401    #endregion
402
403    #region Log Events
404    private void log_MessageAdded(object sender, EventArgs<string> e) {
405      clientCom.LogMessage(e.Value.Split('\t')[1]);
406      ((ILog)sender).Clear(); // don't let the log take up memory
407    }
408    #endregion
409
410    /// <summary>
411    /// aborts all running jobs, no results are sent back
412    /// </summary>
413    private void DoAbortAll() {
414      clientCom.LogMessage("Aborting all jobs.");
415      foreach (Guid jobId in jobManager.JobIds) {
416        AbortJobAsync(jobId);
417      }
418    }
419
420    /// <summary>
421    /// wait for jobs to finish, then pause client
422    /// </summary>
423    private void DoPauseAll() {
424      clientCom.LogMessage("Pausing all jobs.");
425      foreach (Guid jobId in jobManager.JobIds) {
426        PauseJobAsync(jobId);
427      }
428    }
429
430    /// <summary>
431    /// pause slave immediately
432    /// </summary>
433    private void DoStopAll() {
434      clientCom.LogMessage("Stopping all jobs.");
435      foreach (Guid jobId in jobManager.JobIds) {
436        StopJobAsync(jobId);
437      }
438    }
439
440    #region Slave Lifecycle Methods
441    /// <summary>
442    /// completly shudown slave
443    /// </summary>
444    public void Shutdown() {
445      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
446      MessageQueue.GetInstance().AddMessage(mc);
447      waitShutdownSem.WaitOne();
448    }
449
450    /// <summary>
451    /// complete shutdown, should be called before the the application is exited
452    /// </summary>
453    private void ShutdownCore() {
454      clientCom.LogMessage("Shutdown signal received");
455      clientCom.LogMessage("Stopping heartbeat");
456      heartbeatManager.StopHeartBeat();
457      abortRequested = true;
458
459      DoAbortAll();
460
461      clientCom.LogMessage("Logging out");
462      WcfService.Instance.Disconnect();
463      clientCom.Shutdown();
464      SlaveClientCom.Close();
465
466      if (slaveComm.State != CommunicationState.Closed)
467        slaveComm.Close();
468    }
469
470    /// <summary>
471    /// reinitializes everything and continues operation,
472    /// can be called after Sleep()
473    /// </summary> 
474    private void DoStartSlave() {
475      clientCom.LogMessage("Restart received");
476      configManager.Asleep = false;
477    }
478
479    /// <summary>
480    /// stop slave, except for client gui communication,
481    /// primarily used by gui if core is running as windows service
482    /// </summary>   
483    private void Sleep() {
484      clientCom.LogMessage("Sleep received - not accepting any new jobs");
485      configManager.Asleep = true;
486      DoPauseAll();
487    }
488    #endregion
489  }
490}
Note: See TracBrowser for help on using the repository browser.