Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5721

Last change on this file since 5721 was 5721, checked in by ascheibe, 13 years ago

#1233 worked on slave and slave service installer

File size: 19.4 KB
RevLine 
[5105]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
[5137]25using System.Runtime.CompilerServices;
[5280]26using System.ServiceModel;
[5105]27using System.Threading;
[5721]28using System.Threading.Tasks;
[5599]29using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
[5105]30using HeuristicLab.Common;
31using HeuristicLab.Core;
32
33
[5599]34
35namespace HeuristicLab.Clients.Hive.SlaveCore {
[5105]36  /// <summary>
37  /// The core component of the Hive Client
38  /// </summary>
39  public class Core : MarshalByRefObject {
[5315]40
//TODO: this class should be a singleton; there is only one instance, and its reference is meanwhile kept in TheCore
/// <summary>
/// The most recently constructed instance; assigned by the constructor.
/// </summary>
public static Core TheCore;

/// <summary>
/// The message dispatch loop keeps running while this is false; it is set to true during shutdown.
/// </summary>
public static bool abortRequested { get; set; }
// released once at the end of Start(); Shutdown() blocks on it
private Semaphore waitShutdownSem = new Semaphore(0, 1);
public static ILog Log { get; set; }

// per-job bookkeeping; all three dictionaries are keyed by the job id
private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

private WcfService wcfService;
private HeartbeatManager heartbeatManager;
// managed id of the thread that called Start(); KillAppDomain only does its work on that thread
private int coreThreadId;

// channel used for all log messages of this class; obtained from SlaveClientCom in Start()
private ISlaveCommunication ClientCom;
// host of the SlaveCommunicationService opened in Start()
private ServiceHost slaveComm;

/// <summary>
/// The executors of the jobs, keyed by job id.
/// </summary>
public Dictionary<Guid, Executor> ExecutionEngines {
  get { return this.engines; }
}

/// <summary>
/// The jobs known to this core, keyed by job id.
/// </summary>
internal Dictionary<Guid, Job> Jobs {
  get { return this.jobs; }
}

/// <summary>
/// Creates the core and publishes it through <see cref="TheCore"/>.
/// </summary>
public Core() {
  Core.TheCore = this;
}
70
/// <summary>
/// Main method of the slave. Opens the communication service for the slave gui, registers the
/// WCF service events, starts the heartbeats and then dispatches messages on the calling thread
/// until abortRequested is set.
/// </summary>
/// <remarks>
/// The calling thread becomes the core thread (see KillAppDomain). When the dispatch loop ends,
/// normally or with an exception, the service events are deregistered and the semaphore that
/// Shutdown() waits on is released, so a caller of Shutdown() cannot be blocked forever by a
/// failure inside the loop. An exception thrown by the loop still propagates to the caller.
/// </remarks>
public void Start() {
  coreThreadId = Thread.CurrentThread.ManagedThreadId;
  abortRequested = false;

  //start the client communication service (pipe between slave and slave gui)
  slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
  slaveComm.Open();

  ClientCom = SlaveClientCom.Instance.ClientCom;
  ClientCom.LogMessage("Hive Slave started");

  ConfigManager manager = ConfigManager.Instance;
  manager.Core = this;

  wcfService = WcfService.Instance;
  RegisterServiceEvents();

  try {
    StartHeartbeats(); // Start heartbeats thread
    DispatchMessageQueue(); // dispatch messages until abortRequested
  }
  finally {
    // must run in every case, otherwise Shutdown() would wait on the semaphore forever
    DeRegisterServiceEvents();
    waitShutdownSem.Release();
  }
}
97
/// <summary>
/// Creates a new heartbeat manager with an interval of ten seconds, stores it in
/// heartbeatManager and starts it.
/// </summary>
private void StartHeartbeats() {
  HeartbeatManager manager = new HeartbeatManager();
  manager.Interval = TimeSpan.FromSeconds(10);
  heartbeatManager = manager;
  heartbeatManager.StartHeartbeat();
}
103
/// <summary>
/// Takes messages from the message queue and hands each one to DetermineAction
/// until abortRequested is true.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue pending = MessageQueue.GetInstance();
  while (true) {
    if (abortRequested) {
      break;
    }
    DetermineAction(pending.GetMessage());
  }
}
111
/// <summary>
/// Subscribes this core to the Connected and ExceptionOccured events of the WcfService singleton.
/// </summary>
private void RegisterServiceEvents() {
  WcfService.Instance.Connected += WcfService_Connected;
  WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
}
116
/// <summary>
/// Removes the subscriptions made by RegisterServiceEvents.
/// </summary>
private void DeRegisterServiceEvents() {
  WcfService.Instance.Connected -= new EventHandler(WcfService_Connected);
  WcfService.Instance.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
}
121
/// <summary>
/// Logs the message of an exception reported by the WcfService.
/// </summary>
private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  string text = "Connection to server interruped with exception: " + e.Value.Message;
  ClientCom.LogMessage(text);
}
125
/// <summary>
/// Logs that the WcfService reported a successful connection.
/// </summary>
private void WcfService_Connected(object sender, EventArgs e) {
  const string text = "Connected successfully to Hive server";
  ClientCom.LogMessage(text);
}
129
/// <summary>
/// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
/// </summary>
/// <remarks>
/// Executor callbacks (ExecutorMessageContainer of Guid) are executed directly. For all other
/// containers the message type selects the action; message types without an action are logged
/// instead of being dropped silently. CalculateJob is processed on a separate task so that the
/// dispatch loop is not blocked while the job is downloaded.
/// </remarks>
/// <param name="container">The container holding the message</param>
private void DetermineAction(MessageContainer container) {
  ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

  ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
  if (executorMessage != null) {
    executorMessage.execute();
    return;
  }

  switch (container.Message) {
    case MessageContainer.MessageType.CalculateJob:
      Task.Factory.StartNew(() => FetchAndStartJob(container));
      break;
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.StopAll:
      DoStopAll();
      break;
    case MessageContainer.MessageType.PauseAll:
      DoPauseAll();
      break;
    case MessageContainer.MessageType.AbortAll:
      DoAbortAll();
      break;
    case MessageContainer.MessageType.AbortJob:
      KillAppDomain(container.JobId);
      break;
    case MessageContainer.MessageType.StopJob:
      DoStopJob(container.JobId);
      break;
    case MessageContainer.MessageType.PauseJob:
      DoPauseJob(container.JobId);
      break;
    case MessageContainer.MessageType.Restart:
      DoStartSlave();
      break;
    case MessageContainer.MessageType.Sleep:
      Sleep();
      break;
    case MessageContainer.MessageType.SayHello:
      wcfService.Connect(ConfigManager.Instance.GetClientInfo());
      break;
    default:
      ClientCom.LogMessage("Unknown MessageContainer: " + container);
      break;
  }
}

/// <summary>
/// Downloads the job named by a CalculateJob message, registers it in jobs, marks it as
/// calculating on the server and starts it in its own AppDomain.
/// </summary>
/// <remarks>
/// Runs on a task thread. Exceptions are reported through OnExceptionOccured because nobody
/// observes the task, and the job is handed to KillAppDomain so that a half-registered job
/// does not stay in the bookkeeping.
/// </remarks>
/// <param name="container">The CalculateJob message</param>
private void FetchAndStartJob(MessageContainer container) {
  try {
    Job job = wcfService.GetJob(container.JobId);
    lock (engines) {
      if (!jobs.ContainsKey(job.Id)) {
        jobs.Add(job.Id, job);
      }
    }
    JobData jobData = wcfService.GetJobData(job.Id);
    job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
    StartJobInAppDomain(job, jobData);
  }
  catch (Exception e) {
    OnExceptionOccured(e);
    // not on the core thread here, so this only enqueues the cleanup for the core thread
    KillAppDomain(container.JobId);
  }
}
190
/// <summary>
/// Pauses the engine of a job, sends the job data to the server with state Paused and
/// kills the AppDomain of the job afterwards, whether or not the transmission succeeded.
/// </summary>
/// <remarks>
/// If the job or its engine is not registered, the request is logged and ignored. Previously
/// the dictionary indexers threw a KeyNotFoundException in that case, which escaped into the
/// dispatch loop.
/// </remarks>
/// <param name="jobId">Id of the job to pause</param>
private void DoPauseJob(Guid jobId) {
  Job job;
  Executor engine = null;
  // same lock the writers of these dictionaries use (StartJobInAppDomain, KillAppDomain)
  lock (engines) {
    if (jobs.TryGetValue(jobId, out job) && job != null) {
      engines.TryGetValue(job.Id, out engine);
    }
  }

  if (job == null) {
    ClientCom.LogMessage("Can't find job with id " + jobId);
    return;
  }
  if (engine == null) {
    ClientCom.LogMessage("Can't find engine for job with id " + jobId);
    return;
  }

  engine.Pause();
  JobData sJob = engine.GetFinishedJob();
  // job.Exception = engines[job.Id].CurrentException; // can there be an exception if a job is paused
  job.ExecutionTime = engine.ExecutionTime;

  try {
    ClientCom.LogMessage("Sending the paused job with id: " + job.Id);
    wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
  }
  catch (Exception e) {
    // NOTE(review): the message announces storing to hdd, but nothing is stored here
    ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
  }
  finally {
    KillAppDomain(job.Id); // kill app-domain in every case
  }
}
213
/// <summary>
/// Stops the engine of a job, sends the job data to the server and kills the AppDomain
/// of the job afterwards, whether or not the transmission succeeded.
/// </summary>
/// <remarks>
/// If the job or its engine is not registered, the request is logged and ignored. Previously
/// the dictionary indexers threw a KeyNotFoundException in that case, which escaped into the
/// dispatch loop.
/// </remarks>
/// <param name="guid">Id of the job to stop</param>
private void DoStopJob(Guid guid) {
  Job job;
  Executor engine = null;
  // same lock the writers of these dictionaries use (StartJobInAppDomain, KillAppDomain)
  lock (engines) {
    if (jobs.TryGetValue(guid, out job) && job != null) {
      engines.TryGetValue(job.Id, out engine);
    }
  }

  if (job == null) {
    ClientCom.LogMessage("Can't find job with id " + guid);
    return;
  }
  if (engine == null) {
    ClientCom.LogMessage("Can't find engine for job with id " + guid);
    return;
  }

  engine.Stop();
  JobData sJob = engine.GetFinishedJob();
  // job.Exception = engines[job.Id].CurrentException; // can there be an exception if a job is stopped regularly
  job.ExecutionTime = engine.ExecutionTime;

  try {
    ClientCom.LogMessage("Sending the stopped job with id: " + job.Id);
    // NOTE(review): a stopped job is reported with JobState.Paused, same as in DoPauseJob - confirm this is intended
    wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
  }
  catch (Exception e) {
    // NOTE(review): the message announces storing to hdd, but nothing is stored here
    ClientCom.LogMessage("Transmitting to server failed. Storing the stopped job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
  }
  finally {
    KillAppDomain(job.Id); // kill app-domain in every case
  }
}
[5314]236
/// <summary>
/// Aborts all jobs by killing their AppDomains; no results are sent back.
/// </summary>
private void DoAbortAll() {
  // iterate over a copy of the ids, KillAppDomain removes entries from 'Jobs'
  List<Guid> jobIds = new List<Guid>(Jobs.Keys);
  foreach (Guid jobId in jobIds) {
    KillAppDomain(jobId);
  }

  ClientCom.LogMessage("Aborted all jobs!");
}
252
/// <summary>
/// Calls DoPauseJob for every job that is registered in Jobs at the time of the call.
/// </summary>
private void DoPauseAll() {
  ClientCom.LogMessage("Pause all received");

  // work on a snapshot of the ids because items are removed from 'Jobs' while pausing
  List<Guid> jobIds = new List<Guid>(Jobs.Keys);
  foreach (Guid jobId in jobIds) {
    DoPauseJob(jobId);
  }
}
269
/// <summary>
/// Calls DoStopJob for every job that is registered in Jobs at the time of the call.
/// </summary>
private void DoStopAll() {
  ClientCom.LogMessage("Stop all received");

  // work on a snapshot of the ids because items are removed from 'Jobs' while stopping
  List<Guid> jobIds = new List<Guid>(Jobs.Keys);
  foreach (Guid jobId in jobIds) {
    DoStopJob(jobId);
  }
}
286
/// <summary>
/// Completely shuts down the slave: enqueues a ShutdownSlave message and blocks
/// until the semaphore released at the end of Start() becomes available.
/// </summary>
public void Shutdown() {
  var shutdownRequest = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
  MessageQueue.GetInstance().AddMessage(shutdownRequest);
  waitShutdownSem.WaitOne();
}
295
/// <summary>
/// Complete shutdown, should be called before the application is exited.
/// Stops the heartbeat, requests the dispatch loop to end, unloads every registered
/// AppDomain, disconnects from the server and closes the communication with the slave gui.
/// </summary>
private void ShutdownCore() {
  ClientCom.LogMessage("Shutdown Signal received");
  ClientCom.LogMessage("Stopping heartbeat");
  heartbeatManager.StopHeartBeat();
  abortRequested = true;
  ClientCom.LogMessage("Logging out");

  lock (engines) {
    ClientCom.LogMessage("engines locked");
    foreach (KeyValuePair<Guid, AppDomain> entry in appDomains) {
      AppDomain domain = entry.Value;
      ClientCom.LogMessage("Shutting down Appdomain for " + entry.Key);
      domain.UnhandledException -= AppDomain_UnhandledException;
      AppDomain.Unload(domain);
    }
  }
  WcfService.Instance.Disconnect();
  ClientCom.Shutdown();
  SlaveClientCom.Close();

  if (slaveComm.State == CommunicationState.Closed) {
    return;
  }
  slaveComm.Close();
}
322
/// <summary>
/// Handles the Restart message: starts the heartbeats again so that operation continues.
/// Can be called after Sleep().
/// </summary>
private void DoStartSlave() {
  this.ClientCom.LogMessage("Restart received");
  this.StartHeartbeats();
  this.ClientCom.LogMessage("Restart done");
}
332
/// <summary>
/// Stops the slave except for the communication with the client gui: the heartbeat is
/// stopped, all jobs are stopped and the connection to the server is closed.
/// Primarily used by the gui if the core is running as a windows service.
/// </summary>
//TODO: do we need an AbortSleep?
private void Sleep() {
  this.ClientCom.LogMessage("Sleep received");
  this.heartbeatManager.StopHeartBeat();
  this.DoStopAll();
  WcfService.Instance.Disconnect();
  this.ClientCom.LogMessage("Sleep done");
}
345
/// <summary>
/// Pauses a job, which means sending it to the server and killing it locally;
/// atm only used when the executor is waiting for child jobs.
/// The job data is uploaded with state Paused, then the job state is set to Waiting.
/// The AppDomain of the job is killed even if the job is unknown.
/// </summary>
/// <param name="data">The data of the job that has to wait; its JobId selects the job</param>
[MethodImpl(MethodImplOptions.Synchronized)]
public void PauseWaitJob(JobData data) {
  Job waitingJob;
  if (Jobs.TryGetValue(data.JobId, out waitingJob)) {
    wcfService.UpdateJobData(waitingJob, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
    wcfService.UpdateJobState(waitingJob.Id, JobState.Waiting, null);
  } else {
    ClientCom.LogMessage("Can't find job with id " + data.JobId);
  }
  KillAppDomain(data.JobId);
}
[5105]362
/// <summary>
/// Fetches the finished job from its engine and submits it to the server with state Finished.
/// Afterwards the AppDomain of the job is killed and the heartbeat thread is woken up,
/// whether or not the transmission succeeded.
/// </summary>
/// <remarks>
/// If the engine or the job is not registered, this is logged and nothing else happens.
/// A failed transmission is only logged; the job is not persisted by this method.
/// Any other exception is reported through OnExceptionOccured.
/// </remarks>
/// <param name="jobId">Id of the finished job</param>
[MethodImpl(MethodImplOptions.Synchronized)]
public void SendFinishedJob(Guid jobId) {
  try {
    ClientCom.LogMessage("Getting the finished job with id: " + jobId);

    // single lookup each, so the entry cannot vanish between an existence check and the read
    Executor engine;
    if (!engines.TryGetValue(jobId, out engine)) {
      ClientCom.LogMessage("Engine doesn't exist");
      return;
    }
    Job cJob;
    if (!jobs.TryGetValue(jobId, out cJob)) {
      ClientCom.LogMessage("Job doesn't exist");
      return;
    }

    JobData sJob = engine.GetFinishedJob();
    // cJob.Exception = engines[jId].CurrentException; // can there be an exception if the job is sent normally. the exception should be entered in the statelog with the corresponding state (Failed)
    cJob.ExecutionTime = engine.ExecutionTime;

    try {
      ClientCom.LogMessage("Sending the finished job with id: " + jobId);
      wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
      SlaveStatusInfo.JobsProcessed++;
    }
    catch (Exception e) {
      // NOTE(review): the message announces storing to hdd, but nothing is stored here
      ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
    }
    finally {
      KillAppDomain(jobId); // kill app-domain in every case
      heartbeatManager.AwakeHeartBeatThread();
    }
  }
  catch (Exception e) {
    OnExceptionOccured(e);
  }
}
[5137]404
/// <summary>
/// A new job has been received from the wcfService and will be started within its own AppDomain.
/// First the plugins of the job are prepared; if that fails, the failure is logged and no
/// AppDomain is created. The heartbeat thread is woken up at the end in every case.
/// </summary>
/// <param name="myJob">The job to start</param>
/// <param name="jobData">The data that is passed to the engine of the job</param>
private void StartJobInAppDomain(Job myJob, JobData jobData) {
  ClientCom.LogMessage("Received new job with id " + myJob.Id);
  string jobPluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
  string configFileName = string.Empty;
  bool pluginsReady;

  try {
    PluginCache.Instance.PreparePlugins(myJob, out configFileName);
    ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
    pluginsReady = true;
  }
  catch (Exception prepareError) {
    ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, prepareError));
    pluginsReady = false;
  }

  if (pluginsReady) {
    CreateSandboxAndStartEngine(myJob, jobData, jobPluginDir, configFileName);
  }
  heartbeatManager.AwakeHeartBeatThread();
}

/// <summary>
/// Creates the sandbox AppDomain of a job, registers AppDomain and engine under the job id
/// and starts the engine. On any exception the failure is logged and the job is handed to
/// KillAppDomain.
/// </summary>
private void CreateSandboxAndStartEngine(Job myJob, JobData jobData, string pluginDir, string configFileName) {
  try {
    AppDomain sandbox = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
    sandbox.UnhandledException += AppDomain_UnhandledException;
    lock (engines) {
      appDomains.Add(myJob.Id, sandbox);
      ClientCom.LogMessage("Creating AppDomain");
      Type executorType = typeof(Executor);
      Executor executor = (Executor)sandbox.CreateInstanceAndUnwrap(executorType.Assembly.GetName().Name, executorType.FullName);
      ClientCom.LogMessage("Created AppDomain");
      executor.JobId = myJob.Id;
      executor.Core = this;
      ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
      engines.Add(myJob.Id, executor);
      executor.Start(jobData.Data);
      SlaveStatusInfo.JobsFetched++;
      ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
    }
  }
  catch (Exception startError) {
    ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
    ClientCom.LogMessage("Error thrown is: " + startError.ToString());
    KillAppDomain(myJob.Id);
  }
}
[5105]451
/// <summary>
/// Raised by OnExceptionOccured with the exception that occurred.
/// </summary>
public event EventHandler<EventArgs<Exception>> ExceptionOccured;

/// <summary>
/// Logs the exception and raises <see cref="ExceptionOccured"/> if there are subscribers.
/// </summary>
private void OnExceptionOccured(Exception e) {
  ClientCom.LogMessage("Error: " + e.ToString());
  // copy the delegate first so that the null check and the call use the same subscriber list
  EventHandler<EventArgs<Exception>> subscribers = ExceptionOccured;
  if (subscribers == null) {
    return;
  }
  subscribers(this, new EventArgs<Exception>(e));
}
458
/// <summary>
/// Handles an unhandled exception of a job AppDomain: logs it and kills the AppDomain
/// of the job the failing domain belongs to.
/// </summary>
/// <remarks>
/// The job id is resolved from the sender. The former implementation parsed the text of the
/// exception as a Guid, which throws a FormatException for an ordinary exception text.
/// If no job can be resolved, this is logged and no AppDomain is killed.
/// </remarks>
private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject);

  Guid jobId;
  if (TryGetJobIdOfAppDomain(sender as AppDomain, out jobId)) {
    KillAppDomain(jobId);
  } else {
    ClientCom.LogMessage("Could not determine the job of the failing AppDomain");
  }
}

/// <summary>
/// Looks up the id under which an AppDomain is registered in appDomains.
/// </summary>
/// <param name="domain">The AppDomain to look for, may be null</param>
/// <param name="jobId">The job id of the AppDomain, or Guid.Empty if it was not found</param>
/// <returns>true if a job id was found, else false</returns>
private bool TryGetJobIdOfAppDomain(AppDomain domain, out Guid jobId) {
  jobId = Guid.Empty;
  if (domain == null) {
    return false;
  }

  lock (engines) {
    foreach (KeyValuePair<Guid, AppDomain> entry in appDomains) {
      if (ReferenceEquals(entry.Value, domain)) {
        jobId = entry.Key;
        return true;
      }
    }
  }

  // NOTE(review): fallback assumes the sandbox is named after the job id, because the id is
  // passed as first argument to CreateAndInitSandbox - confirm against SandboxManager
  try {
    return Guid.TryParse(domain.FriendlyName, out jobId);
  }
  catch (AppDomainUnloadedException) {
    jobId = Guid.Empty;
    return false;
  }
}
463
/// <summary>
/// Enqueues messages from the executor to the message queue.
/// This is necessary if the core thread has to execute certain actions, e.g.
/// killing of an app domain.
/// </summary>
/// <typeparam name="T">Type of the parameter that is handed to the action</typeparam>
/// <param name="action">The action that is stored as callback of the message</param>
/// <param name="parameter">The value that is stored as callback parameter of the message</param>
public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
  var message = new ExecutorMessageContainer<T> {
    Callback = action,
    CallbackParameter = parameter
  };
  MessageQueue.GetInstance().AddMessage(message);
}
479
/// <summary>
/// Kill an appdomain with a specific id: disposes the engine, unloads the AppDomain and
/// removes the job and its plugins.
/// </summary>
/// <remarks>
/// The work is only done on the core thread (the thread that called Start()). On any other
/// thread the call is enqueued as executor message and returns immediately.
/// Exceptions during the cleanup are logged and not propagated. If the AppDomain cannot be
/// unloaded, its entry stays in appDomains (ShutdownCore unloads all remaining entries) and
/// the job entry is kept as well.
/// </remarks>
/// <param name="id">the GUID of the job</param>
//[MethodImpl(MethodImplOptions.Synchronized)]
public void KillAppDomain(Guid id) {
  if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
    EnqueueExecutorMessage<Guid>(KillAppDomain, id);
    return;
  }

  ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        // deregister first and isolate Dispose(): a failing engine must not prevent the unload below
        engines.Remove(id);
        try {
          engine.Dispose();
        }
        catch (Exception ex) {
          ClientCom.LogMessage("Exception when disposing the engine: " + ex.ToString());
        }
      }

      AppDomain appDomain;
      if (appDomains.TryGetValue(id, out appDomain)) {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
        UnloadAppDomainWithRetries(appDomain);
        appDomains.Remove(id);
      }

      jobs.Remove(id);
      PluginCache.Instance.DeletePluginsForJob(id);
    }
    catch (Exception ex) {
      ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
    }
  }
  GC.Collect();
}

/// <summary>
/// Unloads an AppDomain, trying up to five times with a pause of one second between the attempts.
/// </summary>
/// <param name="appDomain">The AppDomain to unload</param>
/// <exception cref="CannotUnloadAppDomainException">the last attempt failed as well</exception>
private void UnloadAppDomainWithRetries(AppDomain appDomain) {
  const int maxAttempts = 5;
  for (int attempt = 1; ; attempt++) {
    try {
      AppDomain.Unload(appDomain);
      return;
    }
    catch (CannotUnloadAppDomainException) {
      if (attempt >= maxAttempts) {
        throw; // give up; the caller logs the exception
      }
      ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
      Thread.Sleep(1000);
    }
  }
}
530  }
[5105]531}
Note: See TracBrowser for help on using the repository browser.