Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5541

Last change on this file since 5541 was 5541, checked in by ascheibe, 14 years ago

#1233

  • added app.configs
  • use PreBuildEvent.cmd in SlaveConsoleClient
  • react to SayHello msg
File size: 19.5 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Services.Hive.Common;
32using HeuristicLab.Services.Hive.Common.DataTransfer;
33
34
namespace HeuristicLab.Clients.Hive.Slave {
  /// <summary>
  /// The core component of the Hive slave. <see cref="Start"/> opens the communication service for the
  /// slave GUI, starts the heartbeat and then dispatches messages from the <see cref="MessageQueue"/>
  /// on the calling thread until a shutdown is requested. Every job is run by an <see cref="Executor"/>
  /// that lives in its own sandboxed <see cref="AppDomain"/>.
  /// </summary>
  public class Core : MarshalByRefObject {

    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    /// <summary>Number of attempts made to unload an AppDomain before giving up.</summary>
    private const int MaxUnloadAttempts = 5;

    /// <summary>Set to true by <see cref="ShutdownCore"/> to end the dispatch loop in <see cref="Start"/>.</summary>
    public static bool abortRequested { get; set; }
    // Released at the end of Start(); Shutdown() waits on it until the dispatch loop has ended.
    private Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // All three dictionaries are keyed by job id. Modifications are guarded by lock (engines).
    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    // Managed thread id of the thread that called Start(); KillAppDomain only does its work on this thread.
    private int coreThreadId;

    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    /// <summary>The executors of the jobs currently known to this core, keyed by job id.</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The jobs currently known to this core, keyed by job id.</summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    /// <summary>
    /// Creates the core and publishes the instance through <see cref="TheCore"/>.
    /// </summary>
    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main method of the slave. Blocks the calling thread, which becomes the core thread, and
    /// dispatches messages until <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      //start the client communication service (pipe between slave and slave gui)
      slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
      slaveComm.Open();

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      try {
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // Runs even when dispatching fails with an exception: otherwise the event handlers would stay
        // registered and every later call to Shutdown() would block forever on the semaphore.
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates a new heartbeat manager with a 10 second interval and starts it.
    /// </summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them one by one until
    /// <see cref="abortRequested"/> is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        // Guard first: every statement below dereferences the container.
        ClientCom.LogMessage("Unknown MessageContainer: " + container);
        return;
      }

      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      //TODO: find a better solution
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        // Callback enqueued through EnqueueExecutorMessage; run it here on the dispatching thread.
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          FetchAndStartJob(container.JobId);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          KillAppDomain(container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          DoStopJob(container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          DoPauseJob(container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
      }
    }

    /// <summary>
    /// Fetches the job and its data from the server and starts it in a new AppDomain.
    /// </summary>
    /// <param name="jobId">id of the job to calculate</param>
    private void FetchAndStartJob(Guid jobId) {
      Job myJob = wcfService.GetJob(jobId);
      //TODO: handle in own thread!!
      JobData jobData = wcfService.GetJobData(myJob.Id);
      StartJobInAppDomain(myJob, jobData);
    }

    /// <summary>
    /// Pauses the job, sends its current data to the server and kills its AppDomain.
    /// Unknown job ids are logged and ignored.
    /// </summary>
    private void DoPauseJob(Guid guid) {
      // can there be an exception if a job is paused? (job.Exception is not transferred)
      HaltJobAndSendResult(guid, engine => engine.Pause(),
        "Sending the paused job with id: ",
        "Transmitting to server failed. Storing the paused job with id: ");
    }

    /// <summary>
    /// Stops the job, sends its current data to the server and kills its AppDomain.
    /// Unknown job ids are logged and ignored.
    /// </summary>
    private void DoStopJob(Guid guid) {
      // can there be an exception if a job is stopped regularly? (job.Exception is not transferred)
      HaltJobAndSendResult(guid, engine => engine.Stop(),
        "Sending the stoppped job with id: ",
        "Transmitting to server failed. Storing the stopped job with id: ");
    }

    /// <summary>
    /// Shared implementation of <see cref="DoPauseJob"/> and <see cref="DoStopJob"/>: halts the executor,
    /// transmits the job data to the server and kills the AppDomain whether or not the transmission worked.
    /// </summary>
    /// <param name="jobId">id of the job to halt</param>
    /// <param name="halt">the executor call that halts the job (pause or stop)</param>
    /// <param name="sendingMessage">log text written before transmitting; the job id is appended</param>
    /// <param name="failureMessage">log text written when transmitting fails; job id and exception are appended</param>
    private void HaltJobAndSendResult(Guid jobId, Action<Executor> halt, string sendingMessage, string failureMessage) {
      Job job;
      if (!Jobs.TryGetValue(jobId, out job) || job == null) {
        // The dictionary indexer would throw here and take the whole dispatch loop down,
        // e.g. when the request arrives after the job was already removed.
        ClientCom.LogMessage("Can't find job with id " + jobId);
        return;
      }

      Executor engine;
      if (!engines.TryGetValue(job.Id, out engine)) {
        ClientCom.LogMessage("Engine doesn't exist");
        return;
      }

      halt(engine);
      JobData sJob = engine.GetFinishedJob();
      job.ExecutionTime = engine.ExecutionTime;

      try {
        ClientCom.LogMessage(sendingMessage + job.Id);
        wcfService.UpdateJobData(job, sJob, ConfigManager.Instance.GetClientInfo().Id);
        SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
      }
      catch (Exception e) {
        ClientCom.LogMessage(failureMessage + job.Id + " to hdd (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      //copy guids because there will be removed items from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        KillAppDomain(jobId);
      }

      ClientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every known job; each one is sent to the server and killed locally
    /// </summary>
    private void DoPauseAll() {
      ClientCom.LogMessage("Pause all received");

      //copy guids because there will be removed items from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoPauseJob(jobId);
      }
    }

    /// <summary>
    /// stops every known job; each one is sent to the server and killed locally
    /// </summary>
    private void DoStopAll() {
      ClientCom.LogMessage("Stop all received");

      //copy guids because there will be removed items from 'Jobs'
      foreach (Guid jobId in new List<Guid>(Jobs.Keys)) {
        DoStopJob(jobId);
      }
    }

    /// <summary>
    /// Completely shuts down the slave: enqueues a shutdown message and blocks until
    /// <see cref="Start"/> has left its dispatch loop.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the application is exited
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      heartbeatManager.StopHeartBeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          AppDomain.Unload(kvp.Value);
        }
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// restarts the heartbeat and continues operation,
    /// can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      ClientCom.LogMessage("Restart received");
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      ClientCom.LogMessage("Sleep received");
      heartbeatManager.StopHeartBeat();
      DoStopAll();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">the job data to transmit; its JobId selects the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      Job job;
      if (Jobs.TryGetValue(data.JobId, out job)) {
        job.SetState(JobState.Transferring);
        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id);
        job.SetState(JobState.Waiting); // todo: what if it was a ResumeOnChildJobsFinished job before? maybe look into StateLog
        wcfService.UpdateJob(job);
      } else {
        ClientCom.LogMessage("Can't find job with id " + data.JobId);
      }
      // The AppDomain is killed whether or not the job was found.
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Fetches the finished job from its executor and submits it to the server. The AppDomain of the job
    /// is killed afterwards even if the transmission fails. Exceptions raised outside the transmission
    /// are reported through <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">id of the finished job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(Guid jobId) {
      try {
        ClientCom.LogMessage("Getting the finished job with id: " + jobId);

        Executor engine;
        if (!engines.TryGetValue(jobId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jobId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }

        cJob.State = JobState.Finished; // TODO: what if failed?
        JobData sJob = engine.GetFinishedJob();
        // cJob.Exception is not set here: can there be an exception if the job is sent normally?
        // the exception should be entered in the statelog with the corresponding state (Failed)
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jobId);
          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jobId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// Failures while preparing plugins or creating the sandbox are logged, not thrown.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the serialized job data handed to the executor</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);
      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        return;
      }

      try {
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

        bool alreadyRegistered;
        lock (engines) {
          alreadyRegistered = jobs.ContainsKey(myJob.Id);
          if (!alreadyRegistered) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }

        if (alreadyRegistered) {
          // The sandbox created above was never stored in appDomains, so nothing else would unload it.
          DiscardUnusedSandbox(appDomain, myJob.Id);
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        ClientCom.LogMessage("Error thrown is: " + exception.ToString());
        KillAppDomain(myJob.Id);
      }
    }

    /// <summary>
    /// Unloads a sandbox that was created for a job id which is already registered. Errors are only
    /// logged so that the job registered under this id is left untouched.
    /// </summary>
    /// <param name="appDomain">the unused sandbox</param>
    /// <param name="jobId">id of the job the sandbox was created for</param>
    private void DiscardUnusedSandbox(AppDomain appDomain, Guid jobId) {
      ClientCom.LogMessage("Job " + jobId + " is already registered, unloading the unused AppDomain");
      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
        AppDomain.Unload(appDomain);
      }
      catch (Exception ex) {
        ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
      }
    }

    /// <summary>Raised by <see cref="SendFinishedJob"/> when an unexpected exception occurs.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles unhandled exceptions of the job AppDomains: logs the exception and kills the AppDomain
    /// of the job that owns the failing domain.
    /// </summary>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

      // The exception text is not a job id (parsing it as a Guid throws a FormatException),
      // so the job is looked up through the AppDomain that raised the event.
      Guid jobId;
      if (TryGetJobIdForAppDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        ClientCom.LogMessage("Can't find job for the AppDomain that raised the exception");
      }
    }

    /// <summary>
    /// Looks up the id of the job whose AppDomain is the given one.
    /// </summary>
    /// <param name="appDomain">the AppDomain to look for, may be null</param>
    /// <param name="jobId">receives the job id, or Guid.Empty when no job matches</param>
    /// <returns>true if a job with this AppDomain is registered</returns>
    private bool TryGetJobIdForAppDomain(AppDomain appDomain, out Guid jobId) {
      jobId = Guid.Empty;
      if (appDomain == null) return false;

      lock (engines) {
        // NOTE(review): assumes the sender is the same reference that was stored when the job started - confirm.
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          if (ReferenceEquals(kvp.Value, appDomain)) {
            jobId = kvp.Key;
            return true;
          }
        }
      }
      return false;
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the callback parameter</typeparam>
    /// <param name="action">the callback stored in the message</param>
    /// <param name="parameter">the argument stored alongside the callback</param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T> {
        Callback = action,
        CallbackParameter = parameter
      };
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kill a appdomain with a specific id: disposes the executor, unloads the AppDomain, forgets the job
    /// and deletes its plugins. When called from a thread other than the core thread the request is only
    /// enqueued for the core thread. Errors during the cleanup are logged, not thrown.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    //[MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
            UnloadAppDomainWithRetry(appDomain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          // Also receives the exception of the last failed unload attempt.
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }

    /// <summary>
    /// Unloads the AppDomain, retrying after one second when it cannot be unloaded. The exception of the
    /// last of <see cref="MaxUnloadAttempts"/> attempts is passed on to the caller.
    /// </summary>
    /// <param name="appDomain">the AppDomain to unload</param>
    private void UnloadAppDomainWithRetry(AppDomain appDomain) {
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= MaxUnloadAttempts) {
            throw; // out of attempts, no point in waiting any longer
          }
          ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(1000);
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.