Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5606

Last change on this file since 5606 was 5599, checked in by ascheibe, 14 years ago

#1233

  • rename 'Slave' namespace to 'SlaveCore' (and assemblies etc) to avoid problems with 'Slave' class
  • use svcutil (OKB-style)
File size: 19.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31
32
33
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Client.
  /// </summary>
  /// <remarks>
  /// <see cref="Start"/> runs a message loop on the calling thread (the "core thread") that processes
  /// the messages of the <see cref="MessageQueue"/>. Every job runs inside its own sandbox AppDomain
  /// with its own <see cref="Executor"/>; job, executor and AppDomain are tracked by job id.
  /// </remarks>
  public class Core : MarshalByRefObject {

    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    public static bool abortRequested { get; set; }
    // Released once Start() has left its message loop; Shutdown() waits on it.
    private Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // All three dictionaries are keyed by job id. Entries are only added/removed while holding lock (engines).
    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    // Managed thread id of the thread running Start(); KillAppDomain re-dispatches itself to this thread.
    private int coreThreadId;

    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main Method for the client: opens the slave/GUI communication service, registers the WCF
    /// events, starts the heartbeats and then dispatches messages until <see cref="abortRequested"/>
    /// is set. Blocks the calling thread, which becomes the core thread.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      //start the client communication service (pipe between slave and slave gui)
      slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
      slaveComm.Open();

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      try {
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // Always unblock Shutdown() and detach the handlers, even if the message loop failed;
        // otherwise a caller waiting in Shutdown() would block forever.
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates a new heartbeat manager with a 10 second interval and starts it.
    /// </summary>
    private void StartHeartbeats() {
      //Initialize the heartbeat
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until an abort is requested.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += WcfService_Connected;
      WcfService.Instance.ExceptionOccured += WcfService_ExceptionOccured;
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      //TODO: find a better solution
      // NOTE(review): only ExecutorMessageContainer<Guid> is recognized here although EnqueueExecutorMessage<T>
      // is generic; containers for any other T fall through to the switch below.
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          Job myJob = wcfService.GetJob(container.JobId);
          //TODO: handle in own thread!!
          JobData jobData = wcfService.GetJobData(myJob.Id);
          StartJobInAppDomain(myJob, jobData);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
        default:
          ClientCom.LogMessage("Unknown MessageContainer: " + container);
          break;
      }
    }

    /// <summary>
    /// Looks up the job and its executor. Logs and returns false if either is missing,
    /// so that messages for unknown jobs do not crash the message loop.
    /// </summary>
    private bool TryGetJobAndEngine(Guid jobId, out Job job, out Executor engine) {
      engine = null;
      if (!jobs.TryGetValue(jobId, out job) || job == null) {
        ClientCom.LogMessage("Can't find job with id " + jobId);
        return false;
      }
      if (!engines.TryGetValue(job.Id, out engine)) {
        ClientCom.LogMessage("Can't find engine for job with id " + jobId);
        return false;
      }
      return true;
    }

    /// <summary>
    /// Uploads the job and its data to the server and kills the job's AppDomain afterwards,
    /// regardless of whether the upload succeeded.
    /// </summary>
    /// <param name="jobDescription">word used in the log messages, e.g. "paused"</param>
    private void UploadJobAndKillAppDomain(Job job, JobData jobData, string jobDescription) {
      try {
        ClientCom.LogMessage("Sending the " + jobDescription + " job with id: " + job.Id);
        wcfService.UpdateJobData(job, jobData, ConfigManager.Instance.GetClientInfo().Id);
        SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
      }
      catch (Exception e) {
        // NOTE(review): despite the message, nothing is written to disk in this method.
        ClientCom.LogMessage("Transmitting to server failed. Storing the " + jobDescription + " job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Pauses the job, sends its current state to the server and kills its AppDomain.
    /// </summary>
    private void DoPauseJob(Guid guid) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(guid, out job, out engine)) return;

      engine.Pause();
      JobData sJob = engine.GetFinishedJob();
      // job.Exception = engine.CurrentException; // can there be an exception if a job is paused
      job.ExecutionTime = engine.ExecutionTime;
      UploadJobAndKillAppDomain(job, sJob, "paused");
    }

    /// <summary>
    /// Stops the job, sends its current state to the server and kills its AppDomain.
    /// </summary>
    private void DoStopJob(Guid guid) {
      Job job;
      Executor engine;
      if (!TryGetJobAndEngine(guid, out job, out engine)) return;

      engine.Stop();
      JobData sJob = engine.GetFinishedJob();
      // job.Exception = engine.CurrentException; // can there be an exception if a job is stopped regularly
      job.ExecutionTime = engine.ExecutionTime;
      UploadJobAndKillAppDomain(job, sJob, "stopped");
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      // copy the ids because KillAppDomain removes entries from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        KillAppDomain(jobId);
      }

      ClientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every job, sends it to the server and kills its AppDomain
    /// </summary>
    private void DoPauseAll() {
      ClientCom.LogMessage("Pause all received");

      // copy the ids because there will be removed items from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// stops every job immediately, sends it to the server and kills its AppDomain
    /// </summary>
    private void DoStopAll() {
      ClientCom.LogMessage("Stop all received");

      // copy the ids because there will be removed items from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// completely shut down the slave: enqueues a shutdown message and blocks until
    /// the message loop in <see cref="Start"/> has ended
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the application is exited
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      heartbeatManager.StopHeartBeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          // Best effort: one domain that cannot be unloaded must not prevent the rest of the shutdown.
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            ClientCom.LogMessage("Exception when unloading the appdomain for " + kvp.Key + ": " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after Sleep()
    /// </summary>
    // NOTE(review): a new HeartbeatManager is started without stopping the current one; if Restart
    // arrives without a preceding Sleep, two heartbeat managers may run - confirm.
    private void DoStartSlave() {
      ClientCom.LogMessage("Restart received");
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      ClientCom.LogMessage("Sleep received");
      heartbeatManager.StopHeartBeat();
      DoStopAll();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs.
    /// The AppDomain of the job is killed even if the upload fails.
    /// </summary>
    /// <param name="data">the serialized job data to upload</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      try {
        Job job;
        if (!Jobs.TryGetValue(data.JobId, out job)) {
          ClientCom.LogMessage("Can't find job with id " + data.JobId);
        } else {
          job.SetState(JobState.Transferring);
          wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id);
          job.SetState(JobState.Waiting); // todo: what if it was a ResumeOnChildJobsFinished job before? maybe look into StateLog
          wcfService.UpdateJob(job);
        }
      }
      finally {
        KillAppDomain(data.JobId);
      }
    }

    /// <summary>
    /// Retrieves the finished job from its executor and submits it to the server, then kills the
    /// job's AppDomain and wakes the heartbeat thread. Upload failures are only logged.
    /// Any other exception is reported through <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">id of the finished job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(Guid jobId) {
      try {
        ClientCom.LogMessage("Getting the finished job with id: " + jobId);
        Executor engine;
        if (!engines.TryGetValue(jobId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jobId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }
        cJob.State = JobState.Finished; // TODO: what if failed?

        JobData sJob = engine.GetFinishedJob();
        // cJob.Exception = engine.CurrentException; // can there be an exception if the job is sent normally. the exception should be entered in the statelog with the corresponding state (Failed)
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          UploadJobAndKillAppDomain(cJob, sJob, "finished");
        }
        finally {
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// Jobs that are already running are ignored.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the serialized job data passed to the executor</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);

      // A second sandbox for a running job would be orphaned (never unloaded) and re-preparing the
      // plugins would touch the directory of the running job, so bail out early.
      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          ClientCom.LogMessage("Job with id " + myJob.Id + " is already running, ignoring it");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        return;
      }

      try {
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
        lock (engines) {
          // only the core thread adds jobs, so this re-check is purely defensive
          if (!jobs.ContainsKey(myJob.Id)) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        ClientCom.LogMessage("Error thrown is: " + exception.ToString());
        KillAppDomain(myJob.Id);
      }
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception inside a job sandbox by killing that sandbox.
    /// </summary>
    /// <remarks>
    /// e.ExceptionObject is the exception itself, not a job id. The job id is taken from the
    /// friendly name of the faulted domain instead. NOTE(review): this assumes sender is the faulted
    /// AppDomain and that CreateAndInitSandbox uses its first argument (the job id) as the
    /// friendly name - confirm.
    /// </remarks>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

      AppDomain faultedDomain = sender as AppDomain;
      Guid jobId;
      if (faultedDomain != null && TryParseJobId(faultedDomain.FriendlyName, out jobId)) {
        KillAppDomain(jobId);
      } else {
        ClientCom.LogMessage("Could not determine the job of the faulted AppDomain");
      }
    }

    /// <summary>
    /// Parses a job id from text without throwing; returns false if the text is not a Guid.
    /// </summary>
    private static bool TryParseJobId(string text, out Guid jobId) {
      jobId = Guid.Empty;
      if (string.IsNullOrEmpty(text)) return false;
      try {
        jobId = new Guid(text);
        return true;
      }
      catch (FormatException) {
        return false;
      }
      catch (OverflowException) {
        return false;
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the callback parameter</typeparam>
    /// <param name="action">callback executed later by the core thread</param>
    /// <param name="parameter">argument passed to <paramref name="action"/></param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kill a appdomain with a specific id: disposes the executor, unloads the AppDomain,
    /// forgets the job and deletes its plugins. When called from another thread the call
    /// is enqueued and executed later by the core thread. Failures are logged, not thrown.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            // a failing Dispose must not keep the AppDomain alive
            try {
              engine.Dispose();
            }
            catch (Exception ex) {
              ClientCom.LogMessage("Exception when disposing the engine: " + ex.ToString());
            }
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

            const int maxUnloadAttempts = 5;
            int remainingAttempts = maxUnloadAttempts;
            while (remainingAttempts > 0) {
              try {
                AppDomain.Unload(appDomain);
                remainingAttempts = 0;
              }
              catch (CannotUnloadAppDomainException) {
                ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
                remainingAttempts--;
                if (remainingAttempts == 0) {
                  // give up; caught and logged by the handler below, so the domain stays registered
                  throw;
                }
              }
            }
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.