Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5721

Last change on this file since 5721 was 5721, checked in by ascheibe, 14 years ago

#1233 worked on slave and slave service installer

File size: 19.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using System.Threading.Tasks;
29using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32
33
34
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive slave. It runs the message loop that dispatches
  /// commands taken from the <see cref="MessageQueue"/> and manages, per job, one
  /// sandboxed <see cref="AppDomain"/> together with the <see cref="Executor"/> created in it.
  /// </summary>
  /// <remarks>
  /// The thread that calls <see cref="Start"/> becomes the "core thread". Calls to
  /// <see cref="KillAppDomain"/> from any other thread are not executed directly but are
  /// enqueued so the core thread performs them. Code that mutates <c>engines</c>,
  /// <c>appDomains</c> or <c>jobs</c> from potentially concurrent threads locks on <c>engines</c>.
  /// </remarks>
  public class Core : MarshalByRefObject {

    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    /// <summary>When set to true, the message loop started by <see cref="Start"/> exits after the current message.</summary>
    public static bool abortRequested { get; set; }

    // Released by Start() once the message loop has ended; Shutdown() blocks on it.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    /// <summary>How many times unloading a job's AppDomain is attempted before giving up.</summary>
    private const int MaxUnloadAttempts = 5;
    /// <summary>Pause between two unload attempts, in milliseconds.</summary>
    private const int UnloadRetryDelayMs = 1000;

    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    private int coreThreadId;

    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    /// <summary>The executors of the currently running jobs, keyed by job id.</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The jobs currently known to this slave, keyed by job id.</summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    /// <summary>Creates the core and publishes it through <see cref="TheCore"/>.</summary>
    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main method of the slave: opens the slave/GUI communication service, connects the
    /// server events, starts the heartbeats and then dispatches queued messages until
    /// <see cref="abortRequested"/> is set. Blocks the calling thread, which becomes the core thread.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      //start the client communication service (pipe between slave and slave gui)
      slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
      slaveComm.Open();

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      try {
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        DeRegisterServiceEvents();
        // Release in every case; otherwise a thread blocked in Shutdown() would wait forever.
        waitShutdownSem.Release();
      }
    }

    /// <summary>Creates a new heartbeat manager with a 10 second interval and starts it.</summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = TimeSpan.FromSeconds(10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until <see cref="abortRequested"/> is set.
    /// An exception raised while handling one message is reported and the loop continues.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        try {
          DetermineAction(container);
        }
        catch (Exception e) {
          // This is the slave's only dispatch loop; one failing message must not terminate it.
          OnExceptionOccured(e);
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += WcfService_Connected;
      WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// Executor callbacks (<see cref="ExecutorMessageContainer{T}"/> with a <see cref="Guid"/> parameter)
    /// are executed directly; all other messages are dispatched on their message type.
    /// </summary>
    /// <param name="container">The container holding the message.</param>
    private void DetermineAction(MessageContainer container) {
      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          // Fetching and starting a job involves several server round trips; keep the core thread free.
          Task.Factory.StartNew(() => FetchAndStartJob(container.JobId));
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
        default:
          ClientCom.LogMessage("Unknown MessageContainer: " + container);
          break;
      }
    }

    /// <summary>
    /// Downloads a job and its data from the server, registers it locally, marks it as
    /// calculating on the server and starts it in its own AppDomain.
    /// Runs on a thread-pool task, not on the core thread.
    /// </summary>
    /// <param name="jobId">Id of the job to calculate.</param>
    private void FetchAndStartJob(Guid jobId) {
      try {
        Job job = wcfService.GetJob(jobId);
        lock (engines) {
          if (!jobs.ContainsKey(job.Id)) {
            jobs.Add(job.Id, job);
          }
        }
        JobData jobData = wcfService.GetJobData(job.Id);
        job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
        StartJobInAppDomain(job, jobData);
      }
      catch (Exception e) {
        // Nobody observes the Task this runs in, so an escaping exception would otherwise be lost.
        OnExceptionOccured(e);
      }
    }

    /// <summary>Pauses a job, uploads its state to the server as paused and removes it locally.</summary>
    private void DoPauseJob(Guid jobId) {
      HaltAndUploadJob(jobId, executor => executor.Pause(), "Sending the paused job with id: ");
    }

    /// <summary>Stops a job, uploads its state to the server and removes it locally.</summary>
    private void DoStopJob(Guid jobId) {
      // NOTE(review): a stopped job is reported to the server with JobState.Paused, exactly like
      // a paused one; confirm this is intended.
      HaltAndUploadJob(jobId, executor => executor.Stop(), "Sending the stoppped job with id: ");
    }

    /// <summary>
    /// Shared implementation of pausing/stopping a job: halts its executor, uploads the job
    /// data with state <see cref="JobState.Paused"/> and kills the job's AppDomain, also when
    /// the upload fails. Unknown jobs or jobs without an executor are only logged.
    /// </summary>
    /// <param name="jobId">Id of the job to halt.</param>
    /// <param name="halt">Action that halts the executor (pause or stop).</param>
    /// <param name="sendingMessage">Log prefix written before the upload; the job id is appended.</param>
    private void HaltAndUploadJob(Guid jobId, Action<Executor> halt, string sendingMessage) {
      Job job;
      Executor engine;
      lock (engines) {
        jobs.TryGetValue(jobId, out job);
        engines.TryGetValue(jobId, out engine);
      }
      if (job == null) {
        ClientCom.LogMessage("Can't find job with id " + jobId);
        return;
      }
      if (engine == null) {
        // e.g. the job is registered but its AppDomain has not been created (yet)
        ClientCom.LogMessage("Engine doesn't exist");
        return;
      }

      halt(engine);
      JobData sJob = engine.GetFinishedJob();
      // job.Exception = engine.CurrentException; // can there be an exception if a job is paused/stopped regularly
      job.ExecutionTime = engine.ExecutionTime;

      try {
        ClientCom.LogMessage(sendingMessage + job.Id);
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
      }
      catch (Exception e) {
        // NOTE(review): despite the message, nothing is persisted to disk here; the job is only logged.
        ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Aborts all running jobs by killing their AppDomains; no results are sent back.
    /// </summary>
    private void DoAbortAll() {
      // copy the ids, because KillAppDomain removes entries from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        KillAppDomain(jobId);
      }

      ClientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// Pauses every job and sends its current state to the server.
    /// </summary>
    private void DoPauseAll() {
      ClientCom.LogMessage("Pause all received");

      // copy the ids, because there will be removed items from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// Stops every job and sends its current state to the server.
    /// </summary>
    private void DoStopAll() {
      ClientCom.LogMessage("Stop all received");

      // copy the ids, because there will be removed items from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// Completely shuts down the slave: enqueues a shutdown message and blocks until the
    /// message loop in <see cref="Start"/> has ended.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown, executed on the core thread: stops the heartbeats, ends the message
    /// loop, unloads all job AppDomains and closes the server and GUI connections.
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      heartbeatManager.StopHeartBeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= AppDomain_UnhandledException;
          try {
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // Keep going: the remaining domains and the connections below still have to be shut down.
            ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// Restarts the heartbeats so the slave continues operation; can be called after Sleep().
    /// </summary>
    private void DoStartSlave() {
      ClientCom.LogMessage("Restart received");
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// Stops the slave except for the client GUI communication: stops the heartbeats, stops
    /// all jobs and disconnects from the server. Primarily used by the GUI if the core is
    /// running as a Windows service.
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      ClientCom.LogMessage("Sleep received");
      heartbeatManager.StopHeartBeat();
      DoStopAll();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// at the moment only used when the executor is waiting for child jobs.
    /// The job's AppDomain is killed even if the server calls fail.
    /// </summary>
    /// <param name="data">The serialized job; its <c>JobId</c> identifies the local job.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      try {
        Job job;
        if (!jobs.TryGetValue(data.JobId, out job)) {
          ClientCom.LogMessage("Can't find job with id " + data.JobId);
        } else {
          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
          wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
        }
      }
      finally {
        KillAppDomain(data.JobId); // kill app-domain in every case, as SendFinishedJob does
      }
    }

    /// <summary>
    /// Collects the finished job from its executor and submits it to the server as finished.
    /// If the upload fails, the failure is only logged. The job's AppDomain is killed and the
    /// heartbeat thread is woken in every case once the job has been collected.
    /// </summary>
    /// <param name="jobId">Id of the finished job.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(Guid jobId) {
      try {
        ClientCom.LogMessage("Getting the finished job with id: " + jobId);
        Executor engine;
        if (!engines.TryGetValue(jobId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jobId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }

        JobData sJob = engine.GetFinishedJob();
        // cJob.Exception = engine.CurrentException; // can there be an exception if the job is sent normally. the exception should be entered in the statelog with the corresponding state (Failed)
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jobId);
          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          // NOTE(review): despite the message, nothing is persisted to disk here; the job is only logged.
          ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jobId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Prepares the plugins of a newly received job, creates a sandboxed AppDomain for it,
    /// creates an <see cref="Executor"/> inside that domain and starts it with the job data.
    /// On failure the job's local state is cleaned up through <see cref="KillAppDomain"/>.
    /// Wakes the heartbeat thread afterwards.
    /// </summary>
    /// <param name="myJob">The job to start.</param>
    /// <param name="jobData">The job's data passed to the executor.</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);
      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      bool pluginsPrepared = false;
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
        pluginsPrepared = true;
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        // Same cleanup as a failed sandbox creation: drop the job entry and any copied plugins.
        KillAppDomain(myJob.Id);
      }

      if (pluginsPrepared) {
        try {
          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
          appDomain.UnhandledException += AppDomain_UnhandledException;
          lock (engines) {
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        catch (Exception exception) {
          ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
          ClientCom.LogMessage("Error thrown is: " + exception.ToString());
          KillAppDomain(myJob.Id);
        }
      }
      heartbeatManager.AwakeHeartBeatThread();
    }

    /// <summary>Raised (after logging) when the core catches an exception it cannot handle itself.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception in one of the job AppDomains: logs it and kills the
    /// domain of the affected job. The job is identified by matching the raising AppDomain
    /// (the event's sender) against the registered job domains.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

      // The exception text is not a job id; resolve the job through the domain that raised the event.
      Guid? faultedJobId = null;
      AppDomain faultedDomain = sender as AppDomain;
      if (faultedDomain != null) {
        try {
          int faultedDomainId = faultedDomain.Id;
          lock (engines) {
            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
              if (kvp.Value.Id == faultedDomainId) {
                faultedJobId = kvp.Key;
                break;
              }
            }
          }
        }
        catch (AppDomainUnloadedException) {
          // a domain was unloaded concurrently; treat the job as not found
        }
      }

      if (faultedJobId.HasValue) {
        KillAppDomain(faultedJobId.Value);
      } else {
        ClientCom.LogMessage("Could not determine the job of the faulted AppDomain");
      }
    }

    /// <summary>
    /// Enqueues a message from the executor into the message queue, so that the core
    /// thread performs the action, e.g. killing an app domain.
    /// </summary>
    /// <typeparam name="T">Type of the callback parameter.</typeparam>
    /// <param name="action">The callback the core thread should invoke.</param>
    /// <param name="parameter">The argument passed to <paramref name="action"/>.</param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kills the AppDomain of a specific job: disposes its executor, unloads the domain
    /// (retrying up to <see cref="MaxUnloadAttempts"/> times), forgets the job and deletes its
    /// plugins. When called from a thread other than the core thread, the call is enqueued
    /// and executed later by the core thread. Failures are logged, not thrown.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= AppDomain_UnhandledException;
            UnloadAppDomainWithRetry(appDomain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          // This also catches the CannotUnloadAppDomainException rethrown after the last unload
          // attempt, so an AppDomain that cannot be unloaded is logged instead of crashing the slave.
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }

    /// <summary>
    /// Unloads <paramref name="appDomain"/>, retrying after a short pause when the runtime
    /// refuses with <see cref="CannotUnloadAppDomainException"/>. The exception is rethrown
    /// after <see cref="MaxUnloadAttempts"/> failed attempts.
    /// </summary>
    private void UnloadAppDomainWithRetry(AppDomain appDomain) {
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= MaxUnloadAttempts) {
            throw;
          }
          ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(UnloadRetryDelayMs);
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.