Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5526

Last change on this file since 5526 was 5526, checked in by cneumuel, 14 years ago

#1233

  • fixed handling of StateLog in DataLayer
  • extended unit tests
  • changed style of service calls to OKB-like style (using delegates)
  • added possibility that parent jobs can be finished immediately when child jobs are finished
File size: 19.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Services.Hive.Common;
32using HeuristicLab.Services.Hive.Common.DataTransfer;
33
34
namespace HeuristicLab.Clients.Hive.Slave {
  /// <summary>
  /// The core component of the Hive Client.
  /// <see cref="Start"/> runs a message loop on the calling thread (the "core thread");
  /// every job is executed by an <see cref="Executor"/> living in its own sandbox AppDomain.
  /// </summary>
  public class Core : MarshalByRefObject {

    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    /// <summary>
    /// Checked by the message loop of <see cref="Start"/> before each message is fetched;
    /// setting it to true makes the loop exit.
    /// </summary>
    public static bool abortRequested { get; set; }

    // released when Start() leaves its message loop; Shutdown() blocks on it
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // all three are keyed by job id; entries are added and removed under lock (engines)
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    // managed id of the thread that runs Start(); KillAppDomain only does its work on this thread
    private int coreThreadId;

    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    /// <summary>
    /// The executors of the currently hosted jobs, keyed by job id.
    /// </summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>
    /// The currently hosted jobs, keyed by job id.
    /// </summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main Method for the client: opens the slave-GUI communication service, starts the
    /// heartbeats and then dispatches messages on the calling thread until
    /// <see cref="abortRequested"/> is set. Blocks until then.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      //start the client communication service (pipe between slave and slave gui)
      slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
      slaveComm.Open();

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      try {
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // always unblock Shutdown(), even if the message loop ended with an exception;
        // otherwise a caller waiting in Shutdown() would hang forever
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates a new heartbeat manager with a 10 second interval and starts it.
    /// </summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until <see cref="abortRequested"/> is set.
    /// An exception thrown while handling one message is reported via <see cref="ExceptionOccured"/>
    /// instead of terminating the loop.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        try {
          DetermineAction(container);
        }
        catch (Exception e) {
          // e.g. the server being unreachable while a job is fetched must not kill the slave
          OnExceptionOccured(e);
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interrupted with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        ClientCom.LogMessage("Unknown MessageContainer: " + container);
        return;
      }

      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      //TODO: find a better solution
      // callbacks enqueued by executors via EnqueueExecutorMessage (e.g. KillAppDomain) run here, on the core thread
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          CalculateJob(container.JobId);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
      }
    }

    /// <summary>
    /// Fetches the job and its data from the server and starts it in a new sandbox.
    /// </summary>
    /// <param name="jobId">id of the job to calculate</param>
    private void CalculateJob(Guid jobId) {
      // NOTE(review): assumes the WcfService calls may return null when the server call fails — confirm
      Job myJob = wcfService.GetJob(jobId);
      if (myJob == null) {
        ClientCom.LogMessage("Could not fetch job with id " + jobId);
        return;
      }
      //TODO: handle in own thread!!
      JobData jobData = wcfService.GetJobData(myJob.Id);
      if (jobData == null) {
        ClientCom.LogMessage("Could not fetch the data of job with id " + myJob.Id);
        return;
      }
      StartJobInAppDomain(myJob, jobData);
    }

    /// <summary>
    /// Pauses the job, sends its data to the server and kills its AppDomain.
    /// </summary>
    private void DoPauseJob(Guid guid) {
      InterruptJob(guid, true);
    }

    /// <summary>
    /// Stops the job, sends its data to the server and kills its AppDomain.
    /// </summary>
    private void DoStopJob(Guid guid) {
      InterruptJob(guid, false);
    }

    /// <summary>
    /// Pauses (<paramref name="pause"/> = true) or stops (false) a job, transmits its current
    /// data to the server and finally kills the job's AppDomain.
    /// Unknown job ids are logged and ignored.
    /// </summary>
    /// <param name="jobId">id of the job to interrupt</param>
    /// <param name="pause">true to pause the job, false to stop it</param>
    private void InterruptJob(Guid jobId, bool pause) {
      string action = pause ? "paused" : "stopped";

      Job job;
      if (!jobs.TryGetValue(jobId, out job)) {
        ClientCom.LogMessage("Can't find job with id " + jobId);
        return;
      }

      Executor engine;
      if (!engines.TryGetValue(jobId, out engine)) {
        // inconsistent bookkeeping: there is nothing to pause/stop, just clean up
        ClientCom.LogMessage("Can't find engine for job with id " + jobId);
        KillAppDomain(jobId);
        return;
      }

      try {
        if (pause) {
          engine.Pause();
        } else {
          engine.Stop();
        }
        JobData sJob = engine.GetFinishedJob();
        // job.Exception = engine.CurrentException; // can there be an exception if a job is paused/stopped regularly
        job.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the " + action + " job with id: " + job.Id);
          wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id);
          SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
        }
        catch (Exception e) {
          // NOTE(review): nothing is actually persisted to disk here, despite the log message
          ClientCom.LogMessage("Transmitting to server failed. Storing the " + action + " job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      // copy the ids first because KillAppDomain removes entries from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        KillAppDomain(jobId);
      }

      ClientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every running job, sends its data to the server and kills its AppDomain
    /// </summary>
    private void DoPauseAll() {
      ClientCom.LogMessage("Pause all received");

      // copy the ids first because entries will be removed from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// stops every running job, sends its data to the server and kills its AppDomain
    /// </summary>
    private void DoStopAll() {
      ClientCom.LogMessage("Stop all received");

      // copy the ids first because entries will be removed from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// completely shuts down the slave: enqueues a shutdown message and blocks until
    /// the message loop of <see cref="Start"/> has finished
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the application is exited.
    /// Unloads every job AppDomain, disconnects from the server and closes the GUI communication.
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      heartbeatManager.StopHeartBeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception e) {
            // keep going: one stuck AppDomain must not prevent disconnecting from the server
            ClientCom.LogMessage("Exception when unloading the appdomain: " + e.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      ClientCom.LogMessage("Restart received");
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      ClientCom.LogMessage("Sleep received");
      heartbeatManager.StopHeartBeat();
      DoStopAll();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">the current data of the job to send to the server</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      Job job;
      if (!Jobs.TryGetValue(data.JobId, out job)) {
        ClientCom.LogMessage("Can't find job with id " + data.JobId);
      } else {
        job.SetState(JobState.Transferring);
        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id);
        job.SetState(JobState.Waiting); // todo: what if it was a ResumeOnChildJobsFinished job before? maybe look into StateLog
        wcfService.UpdateJob(job);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Marks the job as finished and submits its data to the server, then kills the job's AppDomain
    /// (whether or not the transmission succeeded) and wakes the heartbeat thread.
    /// A failed transmission is only logged; the job data is not kept.
    /// Other exceptions are reported via <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">id of the finished job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(Guid jobId) {
      try {
        ClientCom.LogMessage("Getting the finished job with id: " + jobId);
        Executor engine;
        if (!engines.TryGetValue(jobId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jobId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }
        cJob.State = JobState.Finished; // TODO: what if failed?

        JobData sJob = engine.GetFinishedJob();
        // cJob.Exception = engine.CurrentException; // can there be an exception if the job is sent normally. the exception should be entered in the statelog with the corresponding state (Failed)
        // read after GetFinishedJob so the reported time is the final one
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jobId);
          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jobId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// Jobs that are already hosted by this slave are ignored.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the job's data, passed to the executor</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);

      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          // a second sandbox for the same id would never be unloaded, and the error path
          // below would kill the job that is already running
          ClientCom.LogMessage("Job with id " + myJob.Id + " is already running");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      bool pluginsPrepared = false;
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
        pluginsPrepared = true;
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
      }

      if (pluginsPrepared) {
        try {
          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          lock (engines) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
          heartbeatManager.AwakeHeartBeatThread();
        }
        catch (Exception exception) {
          ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
          ClientCom.LogMessage("Error thrown is: " + exception.ToString());
          KillAppDomain(myJob.Id);
        }
      }
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Logs an unhandled exception of a job sandbox and kills the sandbox.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
      // the exception text is not a job id; recover the id from the faulting AppDomain instead
      Guid jobId;
      if (TryGetJobIdOfAppDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        ClientCom.LogMessage("Could not determine the job of the failed AppDomain");
      }
    }

    /// <summary>
    /// Recovers the job id from a sandbox AppDomain. Sandboxes are created with
    /// <c>myJob.Id.ToString()</c> as the first argument of CreateAndInitSandbox.
    /// NOTE(review): assumes that argument becomes the domain's FriendlyName and that the
    /// runtime passes the faulting AppDomain as event sender — confirm.
    /// </summary>
    /// <returns>true if <paramref name="jobId"/> was determined</returns>
    private static bool TryGetJobIdOfAppDomain(AppDomain domain, out Guid jobId) {
      jobId = Guid.Empty;
      if (domain == null) return false;
      try {
        jobId = new Guid(domain.FriendlyName);
        return true;
      }
      catch (Exception) {
        // best effort only: called from an unhandled-exception handler, so it must not throw
        // (malformed name, unloaded domain, ...)
        return false;
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the callback parameter</typeparam>
    /// <param name="action">callback to run on the core thread</param>
    /// <param name="parameter">argument passed to <paramref name="action"/></param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kill a appdomain with a specific id: disposes the job's executor, unloads its AppDomain,
    /// forgets the job and deletes its plugins. When called from another thread than the core
    /// thread, the call is re-enqueued and executed later by the core thread.
    /// Errors are logged, not thrown.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engines.Remove(id);
            try {
              engine.Dispose();
            }
            catch (Exception ex) {
              // a failing Dispose must not prevent the AppDomain from being unloaded below
              ClientCom.LogMessage("Exception when disposing the engine: " + ex.ToString());
            }
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            UnloadWithRetries(appDomain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          // also receives a CannotUnloadAppDomainException after all retries failed;
          // the job's entries are then kept
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }

    /// <summary>
    /// Unloads the AppDomain. If the runtime reports that it cannot be unloaded, waits one
    /// second and tries again, up to 5 attempts in total.
    /// </summary>
    /// <exception cref="CannotUnloadAppDomainException">if the last attempt fails as well</exception>
    private void UnloadWithRetries(AppDomain appDomain) {
      const int maxAttempts = 5;
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= maxAttempts) throw; // handled and logged by KillAppDomain
          ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(1000);
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.