Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 5707

Last change on this file since 5707 was 5707, checked in by cneumuel, 13 years ago

#1260

  • some changes due to the removal of Disposable (r5706)
  • copy PluginInfrastructure files into PluginCache folder in slaves (needed due to r5703)
File size: 22.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Threading;
27using HeuristicLab.Common;
28using HeuristicLab.Core;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.ResponseObjects;
32using HeuristicLab.Hive.Slave.Common;
33using HeuristicLab.Hive.Slave.Communication;
34using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
35using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
36using HeuristicLab.Hive.Slave.Core.JobStorage;
37using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
38using HeuristicLab.Hive.Slave.ExecutionEngine;
39
40namespace HeuristicLab.Hive.Slave.Core {
41  /// <summary>
42  /// The core component of the Hive Client
43  /// </summary>
44  public class Core : MarshalByRefObject {
45    public static bool abortRequested { get; set; }
46
47    public static ILog Log { get; set; }
48
49    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
50    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
51    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
52
53    private WcfService wcfService;
54    private HeartbeatManager heartbeatManager;
55
56    private bool currentlyFetching;
57    private bool CurrentlyFetching {
58      get {
59        return currentlyFetching;
60      }
61      set {
62        currentlyFetching = value;
63        Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
64      }
65    }
66
67    public Dictionary<Guid, Executor> ExecutionEngines {
68      get { return engines; }
69    }
70
71    internal Dictionary<Guid, JobDto> Jobs {
72      get { return jobs; }
73    }
74
75    /// <summary>
76    /// Main Method for the client
77    /// </summary>
78    public void Start() {
79      abortRequested = false;
80      Logger.Info("Hive Slave started");
81      SlaveConsoleServer server = new SlaveConsoleServer();
82      server.Start();
83
84      ConfigManager manager = ConfigManager.Instance;
85      manager.Core = this;
86
87      wcfService = WcfService.Instance;
88      RegisterServiceEvents();
89
90      StartHeartbeats(); // Start heartbeats thread
91      DispatchMessageQueue(); // dispatch messages until abortRequested
92
93      DeRegisterServiceEvents();
94      server.Close();
95      Logger.Info("Program shutdown");
96    }
97
98    private void StartHeartbeats() {
99      //Initialize the heartbeat
100      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
101      heartbeatManager.StartHeartbeat();
102    }
103
104    private void DispatchMessageQueue() {
105      MessageQueue queue = MessageQueue.GetInstance();
106      while (!abortRequested) {
107        MessageContainer container = queue.GetMessage();
108        DetermineAction(container);
109      }
110    }
111
112    private void RegisterServiceEvents() {
113      wcfService.GetJobCompleted += new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
114      wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
115      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
116      wcfService.Connected += new EventHandler(wcfService_Connected);
117      wcfService.GetJobFailed += new EventHandler<EventArgs<Exception>>(wcfService_GetJobFailed);
118    }
119
120    private void DeRegisterServiceEvents() {
121      wcfService.GetJobCompleted -= new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
122      wcfService.GetFinishedJobResultCompleted -= new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
123      wcfService.ProcessSnapshotCompleted -= new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
124      wcfService.Connected -= new EventHandler(wcfService_Connected);
125      wcfService.GetJobFailed -= new EventHandler<EventArgs<Exception>>(wcfService_GetJobFailed);
126    }
127
128    /// <summary>
129    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
130    /// </summary>
131    /// <param name="container">The Container, containing the message</param>
132    private void DetermineAction(MessageContainer container) {
133      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
134      switch (container.Message) {
135        //Server requests to abort a job
136        case MessageContainer.MessageType.AbortJob:
137          if (engines.ContainsKey(container.JobId))
138            try {
139              engines[container.JobId].Abort();
140            }
141            catch (AppDomainUnloadedException) {
142              // appdomain already unloaded. Finishing job probably ongoing
143            } else
144            Logger.Error("AbortJob: Engine doesn't exist");
145          break;
146
147        //Job has been successfully aborted
148        case MessageContainer.MessageType.JobAborted:
149          Guid jobId = new Guid(container.JobId.ToString());
150          KillAppDomain(jobId);
151          break;
152
153        //Request a Snapshot from the Execution Engine
154        case MessageContainer.MessageType.RequestSnapshot:
155          if (engines.ContainsKey(container.JobId))
156            engines[container.JobId].RequestSnapshot();
157          else
158            Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
159          break;
160
161        //Snapshot is ready and can be sent back to the Server
162        case MessageContainer.MessageType.SnapshotReady:
163          GetSnapshot(container.JobId);
164          break;
165
166        //Pull a Job from the Server
167        case MessageContainer.MessageType.FetchJob:
168          if (!CurrentlyFetching) {
169            wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
170            CurrentlyFetching = true;
171          } else
172            Logger.Info("Currently fetching, won't fetch this time!");
173          break;
174
175        //A Job has finished and can be sent back to the server
176        case MessageContainer.MessageType.FinishedJob:
177          SendFinishedJob(container.JobId);
178          break;
179
180        case MessageContainer.MessageType.JobFailed:
181          SendFinishedJob(container.JobId);
182          break;
183
184        //When the timeslice is up
185        case MessageContainer.MessageType.UptimeLimitDisconnect:
186          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
187          ShutdownRunningJobsAndSubmitSnapshots();
188          break;
189
190        //Fetch or Force Fetch Calendar!
191        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
192          Logger.Info("Fetch Calendar from Server");
193          FetchCalendarFromServer();
194          break;
195
196        //Hard shutdown of the client
197        case MessageContainer.MessageType.Shutdown:
198          Logger.Info("Shutdown Signal received");
199          lock (engines) {
200            Logger.Debug("engines locked");
201            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
202              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
203              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
204              AppDomain.Unload(kvp.Value);
205            }
206          }
207          Logger.Debug("Stopping heartbeat");
208          abortRequested = true;
209          heartbeatManager.StopHeartBeat();
210          Logger.Debug("Logging out");
211          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
212          break;
213
214        case MessageContainer.MessageType.AddChildJob:
215          AddChildJob((MessageContainerWithJob)container);
216          break;
217
218        case MessageContainer.MessageType.PauseJob:
219          // send the job back to hive
220          PauseJob((MessageContainerWithJob)container);
221          break;
222
223        case MessageContainer.MessageType.GetChildJobs:
224          GetChildJobs((MessageContainerWithCallback<SerializedJobList>)container);
225          break;
226
227        case MessageContainer.MessageType.DeleteChildJobs:
228          wcfService.DeleteChildJobs(container.JobId);
229          break;
230      }
231    }
232
233    private void GetChildJobs(MessageContainerWithCallback<SerializedJobList> mc) {
234      ResponseObject<SerializedJobList> response = wcfService.GetChildJobs(mc.JobId);
235      if (response != null && response.StatusMessage == ResponseStatus.Ok) {
236        mc.Callback(response.Obj);
237      } else {
238        if (response != null) {
239          Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage));
240        } else {
241          Logger.Error("GetChildJobs failed.");
242        }
243      }
244    }
245
246    private void PauseJob(MessageContainerWithJob mc) {
247      ResponseObject<JobDto> response = wcfService.PauseJob(mc.SerializedJob);
248      KillAppDomain(mc.JobId);
249      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
250        Logger.Error("PauseJob failed: " + response.StatusMessage);
251      }
252    }
253
254    private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) {
255      ResponseObject<JobDto> response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
256      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
257        Logger.Error("AddChildJob failed: " + response.StatusMessage);
258      }
259      return response;
260    }
261
262    private void ShutdownRunningJobsAndSubmitSnapshots() {
263      //check if there are running jobs
264      if (engines.Count > 0) {
265        //make sure there is no more fetching of jobs while the snapshots get processed
266        CurrentlyFetching = true;
267        //request a snapshot of each running job
268        foreach (KeyValuePair<Guid, Executor> kvp in engines) {
269          kvp.Value.RequestSnapshot();
270        }
271      }
272    }
273
274    //Asynchronous Threads for interaction with the Execution Engine
275    #region Async Threads for the EE
276
277    /// <summary>
278    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
279    /// once the connection gets reestablished, the job gets submitted
280    /// </summary>
281    /// <param name="jobId"></param>
282    private void SendFinishedJob(object jobId) {
283      try {
284        Guid jId = (Guid)jobId;
285        Logger.Info("Getting the finished job with id: " + jId);
286        if (!engines.ContainsKey(jId)) {
287          Logger.Info("Engine doesn't exist");
288          return;
289        }
290
291        byte[] sJob = engines[jId].GetFinishedJob();
292
293        try {
294          Logger.Info("Sending the finished job with id: " + jId);
295          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, engines[jId].ExecutionTime, engines[jId].CurrentException, true);
296        }
297        catch (Exception e) {
298          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
299          JobStorageManager.PersistObjectToDisc("0", 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment. also serverIp is not used anymore
300        }
301        finally {
302          KillAppDomain(jId); // kill app-domain in every case
303        }
304      }
305      catch (Exception e) {
306        OnExceptionOccured(e);
307      }
308    }
309
310    private void GetSnapshot(object jobId) {
311      try {
312        Logger.Info("Fetching a snapshot for job " + jobId);
313        Guid jId = (Guid)jobId;
314        byte[] obj = engines[jId].GetSnapshot();
315        wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, jId, obj, engines[jId].ExecutionTime, null);
316
317        //Uptime Limit reached, now is a good time to destroy this jobs.
318        Logger.Debug("Checking if uptime limit is reached");
319        if (!UptimeManager.Instance.IsAllowedToCalculate()) {
320          Logger.Debug("Uptime limit reached");
321          Logger.Debug("Killing Appdomain");
322          KillAppDomain(jId);
323          //Still anything running? 
324          if (engines.Count == 0) {
325            Logger.Info("All jobs snapshotted and sent back, disconnecting");
326            WcfService.Instance.Disconnect();
327          } else {
328            Logger.Debug("There are still active Jobs in the Field, not disconnecting");
329          }
330        } else {
331          Logger.Debug("Restarting the job" + jobId);
332          engines[jId].StartOnlyJob();
333          Logger.Info("Restarted the job" + jobId);
334        }
335      }
336      catch (Exception e) {
337        OnExceptionOccured(e);
338      }
339    }
340
341    #endregion
342
343    //Eventhandlers for the communication with the wcf Layer
344    #region wcfService Events
345    /// <summary>
346    /// Login has returned
347    /// </summary>
348    /// <param name="sender"></param>
349    /// <param name="e"></param>
350    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
351      if (e.Result.StatusMessage == ResponseStatus.Ok) {
352        CurrentlyFetching = false;
353        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
354      } else
355        Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
356    }
357
358    /// <summary>
359    /// A new Job from the wcfService has been received and will be started within a AppDomain.
360    /// </summary>
361    /// <param name="sender"></param>
362    /// <param name="e"></param>
363    void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
364      if (e.Result.StatusMessage != ResponseStatus.GetJob_NoJobsAvailable) {
365        Logger.Info("Received new job with id " + e.Result.Obj.Id);
366        Logger.Debug("Fetching plugins for job " + e.Result.Obj.Id);
367       
368        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, e.Result.Obj.Id.ToString());
369        bool pluginsPrepared = false;
370        try {
371          PluginCache.Instance.PreparePlugins(e.Result.Obj.PluginsNeeded.OrderBy(x => x.Name).ToList());
372          PluginCache.Instance.CopyPluginsForJob(e.Result.Obj.PluginsNeeded.OrderBy(x => x.Name).ToList(), e.Result.Obj.Id);
373
374          Logger.Debug("Plugins fetched for job " + e.Result.Obj.Id);
375          pluginsPrepared = true;
376        }
377        catch (Exception exception) {
378          Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", e.Result.Obj.Id, exception));
379        }
380
381        if (pluginsPrepared) {
382          try {
383            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(e.Result.Obj.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
384            appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
385
386            if (!jobs.ContainsKey(e.Result.Obj.Id)) {
387              jobs.Add(e.Result.Obj.Id, e.Result.Obj);
388              appDomains.Add(e.Result.Obj.Id, appDomain);
389              Logger.Debug("Creating AppDomain");
390
391              Executor engine;
392              lock (engines) {
393                engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
394                Logger.Debug("Created AppDomain");
395                engine.JobId = e.Result.Obj.Id;
396                engine.Queue = MessageQueue.GetInstance();
397                Logger.Debug("Starting Engine for job " + e.Result.Obj.Id);
398                engines.Add(e.Result.Obj.Id, engine);
399              }
400              SlaveStatusInfo.JobsFetched++;
401              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
402              engine.Start(e.Data);
403            }
404            heartbeatManager.AwakeHeartBeatThread();
405          }
406          catch (Exception exception) {
407            Logger.Error("Creating the Appdomain and loading the job failed for job " + e.Result.Obj.Id);
408            Logger.Error("Error thrown is: ", exception);
409            CurrentlyFetching = false;
410            KillAppDomain(e.Result.Obj.Id);
411            wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, e.Result.Obj.ExecutionTime, exception.ToString(), false);
412          }
413        }
414      } else {
415        Logger.Info("No more jobs left!");
416      }
417      CurrentlyFetching = false;
418    }
419
420    /// <summary>
421    /// A finished job has been stored on the server
422    /// </summary>
423    /// <param name="sender"></param>
424    /// <param name="e"></param>
425    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
426      Logger.Info("Job submitted with id " + e.Result.JobId);
427      if (e.Result.StatusMessage == ResponseStatus.Ok) {
428        SlaveStatusInfo.JobsProcessed++;
429        Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
430        heartbeatManager.AwakeHeartBeatThread();
431      } else {
432        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
433      }
434    }
435
436    /// <summary>
437    /// A snapshot has been stored on the server
438    /// </summary>
439    /// <param name="sender"></param>
440    /// <param name="e"></param>
441    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
442      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
443    }
444
445
446    /// <summary>
447    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
448    /// </summary>
449    /// <param name="sender"></param>
450    /// <param name="e"></param>
451    void wcfService_Connected(object sender, EventArgs e) {
452      Logger.Info("WCF Service got a connection");
453      if (!UptimeManager.Instance.CalendarAvailable) {
454        Logger.Info("No local calendar available, fetch it");
455        FetchCalendarFromServer();
456      }
457      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
458      CurrentlyFetching = false;
459      CheckRunningAppDomains();
460      JobStorageManager.CheckAndSubmitJobsFromDisc();
461    }
462
463    void wcfService_GetJobFailed(object sender, EventArgs<Exception> e) {
464      Logger.Info("GetJobFailed: " + e.Value.ToString());
465      CurrentlyFetching = false;
466    }
467
468    private void FetchCalendarFromServer() {
469      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
470      if (calres.StatusMessage == ResponseStatus.Ok) {
471        if (UptimeManager.Instance.SetAppointments(false, calres)) {
472          Logger.Info("Remote calendar installed");
473          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
474        } else {
475          Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
476          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
477        }
478      } else {
479        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
480        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
481      }
482    }
483
484    private void CheckRunningAppDomains() {
485      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
486        if (execKVP.Value.ExecutionState != ExecutionState.Started && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
487          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
488          Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob));
489          finThread.Start(execKVP.Value.JobId);
490        }
491      }
492    }
493
494    #endregion
495
496    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
497    private void OnExceptionOccured(Exception e) {
498      Logger.Error("Error: " + e.ToString());
499      var handler = ExceptionOccured;
500      if (handler != null) handler(this, new EventArgs<Exception>(e));
501    }
502
503    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
504      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
505      KillAppDomain(new Guid(e.ExceptionObject.ToString()));
506    }
507
508    /// <summary>
509    /// Kill a appdomain with a specific id.
510    /// </summary>
511    /// <param name="id">the GUID of the job</param>
512    private void KillAppDomain(Guid id) {
513      Logger.Debug("Shutting down Appdomain for Job " + id);
514      lock (engines) {
515        try {
516          if (engines.ContainsKey(id))
517            engines[id].Dispose();
518          if (appDomains.ContainsKey(id)) {
519            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
520
521            int repeat = 5;
522            while (repeat > 0) {
523              try {
524                AppDomain.Unload(appDomains[id]);
525                repeat = 0;
526              }
527              catch (CannotUnloadAppDomainException) {
528                Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
529                Thread.Sleep(1000);
530                repeat--;
531                if (repeat == 0) {
532                  throw; // rethrow and let app crash
533                }
534              }
535            }
536            appDomains.Remove(id);
537          }
538
539          engines.Remove(id);
540          jobs.Remove(id);
541          PluginCache.Instance.DeletePluginsForJob(id);
542          GC.Collect();
543        }
544        catch (Exception ex) {
545          Logger.Error("Exception when unloading the appdomain: ", ex);
546        }
547      }
548      GC.Collect();
549    }
550  }
551
552
553}
Note: See TracBrowser for help on using the repository browser.