Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4810

Last change on this file since 4810 was 4810, checked in by cneumuel, 13 years ago

#1260

  • changed dependency discovery mechanism: all locally loaded plugins are now dependencies of a job.
  • fixed logging of the slave console by limiting the maximum number of log messages
  • minor bug fixes.
File size: 21.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.ResponseObjects;
31using HeuristicLab.Hive.Slave.Common;
32using HeuristicLab.Hive.Slave.Communication;
33using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
34using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
35using HeuristicLab.Hive.Slave.Core.JobStorage;
36using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
37using HeuristicLab.Hive.Slave.ExecutionEngine;
38
namespace HeuristicLab.Hive.Slave.Core {
  /// <summary>
  /// The core component of the Hive Slave. It runs the message loop, talks to the server
  /// through <see cref="WcfService"/> and hosts one sandbox AppDomain (containing one
  /// <see cref="Executor"/>) per job.
  /// </summary>
  public class Core : MarshalByRefObject {
    /// <summary>
    /// When set to true, the message loop started by <see cref="Start"/> ends after the current message.
    /// </summary>
    public static bool abortRequested { get; set; }

    public static ILog Log { get; set; }

    // All three dictionaries are keyed by job id. Within this class they are only mutated
    // while holding lock (engines); engines is therefore also used as the lock object.
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;

    private bool currentlyFetching;

    /// <summary>
    /// True while a GetJobAsync request is outstanding; prevents issuing a second one.
    /// Every change is written to the debug log.
    /// </summary>
    private bool CurrentlyFetching {
      get { return currentlyFetching; }
      set {
        currentlyFetching = value;
        Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
      }
    }

    /// <summary>The live engine dictionary (not a copy), keyed by job id.</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The live job dictionary (not a copy), keyed by job id.</summary>
    internal Dictionary<Guid, JobDto> Jobs {
      get { return jobs; }
    }

    /// <summary>
    /// Main method of the slave. Starts the console server, wires up the WCF events, starts the
    /// heartbeats and then blocks dispatching messages until <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      abortRequested = false;
      Logger.Info("Hive Slave started");
      SlaveConsoleServer server = new SlaveConsoleServer();
      server.Start();

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();
      try {
        RecoverSettings(); // recover server IP from the settings framework
        StartHeartbeats(); // start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // release the event subscriptions and the console server even if the loop throws
        DeRegisterServiceEvents();
        server.Close();
      }
      Logger.Info("Program shutdown");
    }

    /// <summary>Applies the server IP stored in the settings, if one is stored.</summary>
    private void RecoverSettings() {
      ConnectionContainer cc = ConfigManager.Instance.GetServerIP();
      if (cc != null && !String.IsNullOrEmpty(cc.IPAdress)) {
        wcfService.ServerIp = cc.IPAdress;
      }
    }

    /// <summary>Creates the heartbeat manager with a 10 second interval and starts it.</summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = TimeSpan.FromSeconds(10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>Takes messages from the message queue and handles them until abort is requested.</summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      wcfService.GetJobCompleted += wcfService_GetJobCompleted;
      wcfService.GetFinishedJobResultCompleted += wcfService_StoreFinishedJobResultCompleted;
      wcfService.ProcessSnapshotCompleted += wcfService_ProcessSnapshotCompleted;
      wcfService.Connected += wcfService_Connected;
    }

    private void DeRegisterServiceEvents() {
      wcfService.GetJobCompleted -= wcfService_GetJobCompleted;
      wcfService.GetFinishedJobResultCompleted -= wcfService_StoreFinishedJobResultCompleted;
      wcfService.ProcessSnapshotCompleted -= wcfService_ProcessSnapshotCompleted;
      wcfService.Connected -= wcfService_Connected;
    }

    /// <summary>
    /// Analyzes a message from the MessageQueue and starts the corresponding action.
    /// </summary>
    /// <param name="container">The container holding the message.</param>
    private void DetermineAction(MessageContainer container) {
      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
      switch (container.Message) {
        // Server requests to abort a job
        case MessageContainer.MessageType.AbortJob: {
          // TryGetValue instead of ContainsKey + indexer: another thread may remove the engine in between
          Executor engine;
          if (engines.TryGetValue(container.JobId, out engine)) {
            try {
              engine.Abort();
            }
            catch (AppDomainUnloadedException) {
              // appdomain already unloaded. Finishing job probably ongoing
            }
          } else {
            Logger.Error("AbortJob: Engine doesn't exist");
          }
          break;
        }

        // Job has been successfully aborted
        case MessageContainer.MessageType.JobAborted:
          KillAppDomain(container.JobId);
          break;

        // Request a snapshot from the execution engine
        case MessageContainer.MessageType.RequestSnapshot: {
          Executor engine;
          if (engines.TryGetValue(container.JobId, out engine)) {
            engine.RequestSnapshot();
          } else {
            Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
          }
          break;
        }

        // Snapshot is ready and can be sent back to the server
        case MessageContainer.MessageType.SnapshotReady:
          GetSnapshot(container.JobId);
          break;

        // Pull a job from the server
        case MessageContainer.MessageType.FetchJob:
          if (!CurrentlyFetching) {
            wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
            CurrentlyFetching = true;
          } else {
            Logger.Info("Currently fetching, won't fetch this time!");
          }
          break;

        // A job has finished (successfully or not) and is sent back to the server
        case MessageContainer.MessageType.FinishedJob:
        case MessageContainer.MessageType.JobFailed:
          SendFinishedJob(container.JobId);
          break;

        // When the timeslice is up
        case MessageContainer.MessageType.UptimeLimitDisconnect:
          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
          ShutdownRunningJobsAndSubmitSnapshots();
          break;

        // Fetch or force fetch the calendar
        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
          Logger.Info("Fetch Calendar from Server");
          FetchCalendarFromServer();
          break;

        // Hard shutdown of the client
        case MessageContainer.MessageType.Shutdown:
          Logger.Info("Shutdown Signal received");
          lock (engines) {
            Logger.Debug("engines locked");
            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
              kvp.Value.UnhandledException -= appDomain_UnhandledException;
              AppDomain.Unload(kvp.Value);
            }
          }
          Logger.Debug("Stopping heartbeat");
          abortRequested = true;
          heartbeatManager.StopHeartBeat();
          Logger.Debug("Logging out");
          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
          break;

        case MessageContainer.MessageType.AddChildJob:
          AddChildJob((MessageContainerWithJob)container);
          break;

        case MessageContainer.MessageType.PauseJob:
          // send the job back to hive
          PauseJob((MessageContainerWithJob)container);
          break;

        case MessageContainer.MessageType.GetChildJobs:
          GetChildJobs((MessageContainerWithCallback<SerializedJobList>)container);
          break;

        case MessageContainer.MessageType.DeleteChildJobs:
          wcfService.DeleteChildJobs(container.JobId);
          break;
      }
    }

    /// <summary>
    /// Fetches the child jobs of <c>mc.JobId</c> from the server and passes them to the
    /// message's callback. On failure, only logs; the callback is not invoked.
    /// </summary>
    private void GetChildJobs(MessageContainerWithCallback<SerializedJobList> mc) {
      ResponseObject<SerializedJobList> response = wcfService.GetChildJobs(mc.JobId);
      if (response == null) {
        Logger.Error("GetChildJobs failed.");
      } else if (response.StatusMessage != ResponseStatus.Ok) {
        Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage));
      } else {
        mc.Callback(response.Obj);
      }
    }

    /// <summary>
    /// Sends the serialized job back to the server as paused and tears down its local AppDomain.
    /// </summary>
    private void PauseJob(MessageContainerWithJob mc) {
      ResponseObject<JobDto> response = wcfService.PauseJob(mc.SerializedJob);
      KillAppDomain(mc.JobId);
      // check for null first: dereferencing a missing response would throw here
      if (response == null) {
        Logger.Error("PauseJob failed.");
      } else if (response.StatusMessage != ResponseStatus.Ok) {
        Logger.Error("PauseJob failed: " + response.StatusMessage);
      }
    }

    /// <summary>
    /// Submits a child job of <c>mc.JobId</c> to the server.
    /// </summary>
    /// <returns>The server response; may be null if the service returned none.</returns>
    private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) {
      ResponseObject<JobDto> response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
      // check for null first: dereferencing a missing response would throw here
      if (response == null) {
        Logger.Error("AddChildJob failed.");
      } else if (response.StatusMessage != ResponseStatus.Ok) {
        Logger.Error("AddChildJob failed: " + response.StatusMessage);
      }
      return response;
    }

    /// <summary>
    /// Requests a snapshot from every running engine and blocks further job fetching
    /// while the snapshots are processed. Does nothing when no engine is running.
    /// </summary>
    private void ShutdownRunningJobsAndSubmitSnapshots() {
      // iterate over a copy: other threads may remove engines (KillAppDomain) meanwhile
      List<Executor> running;
      lock (engines) {
        running = new List<Executor>(engines.Values);
      }
      if (running.Count == 0) {
        return;
      }
      // make sure there is no more fetching of jobs while the snapshots get processed
      CurrentlyFetching = true;
      foreach (Executor engine in running) {
        engine.RequestSnapshot();
      }
    }

    // Asynchronous threads for interaction with the execution engine
    #region Async Threads for the EE

    /// <summary>
    /// Serializes the finished job and submits it to the server. If the submission throws
    /// (e.g. no network connection), the job is stored on disk so it can be submitted once
    /// the connection is reestablished. The job's AppDomain is killed in every case.
    /// </summary>
    /// <param name="jobId">The job id, boxed as <see cref="Guid"/> (signature suits ParameterizedThreadStart).</param>
    private void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        Logger.Info("Getting the finished job with id: " + jId);
        // read the engine once; a second dictionary lookup could fail if the job is removed concurrently
        Executor engine;
        if (!engines.TryGetValue(jId, out engine)) {
          Logger.Info("Engine doesn't exist");
          return;
        }

        byte[] sJob = engine.GetFinishedJob();

        try {
          Logger.Info("Sending the finished job with id: " + jId);
          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, engine.ExecutionTime, engine.CurrentException, true);
        }
        catch (Exception e) {
          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
          JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Sends the snapshot of a job to the server. If the uptime limit is reached, the job's
    /// AppDomain is killed (and the slave disconnects once no engine is left); otherwise
    /// the job is restarted.
    /// </summary>
    /// <param name="jobId">The job id, boxed as <see cref="Guid"/>.</param>
    private void GetSnapshot(object jobId) {
      try {
        Logger.Info("Fetching a snapshot for job " + jobId);
        Guid jId = (Guid)jobId;
        // throws KeyNotFoundException (reported via OnExceptionOccured) if the engine is gone
        Executor engine = engines[jId];
        byte[] obj = engine.GetSnapshot();
        wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, jId, obj, engine.ExecutionTime, null);

        // Uptime limit reached, now is a good time to destroy this job.
        Logger.Debug("Checking if uptime limit is reached");
        if (!UptimeManager.Instance.IsAllowedToCalculate()) {
          Logger.Debug("Uptime limit reached");
          Logger.Debug("Killing Appdomain");
          KillAppDomain(jId);
          // Still anything running?
          if (engines.Count == 0) {
            Logger.Info("All jobs snapshotted and sent back, disconnecting");
            WcfService.Instance.Disconnect();
          } else {
            Logger.Debug("There are still active Jobs in the Field, not disconnecting");
          }
        } else {
          Logger.Debug("Restarting the job" + jobId);
          engine.StartOnlyJob();
          Logger.Info("Restarted the job" + jobId);
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    #endregion

    // Event handlers for the communication with the WCF layer
    #region wcfService Events
    /// <summary>
    /// Login has returned. Note: this handler is not subscribed in RegisterServiceEvents.
    /// </summary>
    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
      if (e.Result.StatusMessage == ResponseStatus.Ok) {
        CurrentlyFetching = false;
        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
      } else {
        Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
      }
    }

    /// <summary>
    /// A new job has been received from the server. Its plugins are prepared, a sandbox
    /// AppDomain is created and an <see cref="Executor"/> is started inside it. If this fails,
    /// the job is reported back to the server as finished with the exception text.
    /// <see cref="CurrentlyFetching"/> is reset in every case, so fetching never gets stuck.
    /// </summary>
    void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
      try {
        if (e.Result == null) {
          Logger.Error("GetJob failed: no response received.");
          return;
        }
        if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
          Logger.Info("No more jobs left!");
          return;
        }
        var job = e.Result.Obj;
        if (job == null) {
          // an error status without a job; previously this threw outside any try block
          Logger.Error("GetJob failed: " + e.Result.StatusMessage);
          return;
        }

        Logger.Info("Received new job with id " + job.Id);
        Logger.Debug("Fetching plugins for job " + job.Id);
        try {
          PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
          PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, job.Id);

          Logger.Debug("Plugins fetched for job " + job.Id);
          String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, job.Id.ToString());

          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
          appDomain.UnhandledException += appDomain_UnhandledException;
          lock (engines) {
            if (!jobs.ContainsKey(job.Id)) {
              jobs.Add(job.Id, job);
              appDomains.Add(job.Id, appDomain);
              Logger.Debug("Creating AppDomain");
              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              Logger.Debug("Created AppDomain");
              engine.JobId = job.Id;
              engine.Queue = MessageQueue.GetInstance();
              Logger.Debug("Starting Engine for job " + job.Id);
              engine.Start(e.Data);
              engines.Add(job.Id, engine);
              SlaveStatusInfo.JobsFetched++;
              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
            } else {
              // the sandbox was not registered anywhere, so it would otherwise never be unloaded
              DiscardSandbox(appDomain, job.Id);
            }
          }
          heartbeatManager.AwakeHeartBeatThread();
        }
        catch (Exception exception) {
          Logger.Error("Creating the Appdomain and loading the job failed for job " + job.Id);
          Logger.Error("Error thrown is: ", exception);
          CurrentlyFetching = false;
          KillAppDomain(job.Id);
          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, job.Id, new byte[] { }, job.ExecutionTime, exception.ToString(), true);
        }
      }
      finally {
        CurrentlyFetching = false;
      }
    }

    /// <summary>
    /// Unloads a sandbox AppDomain that was created for a job id already present in
    /// <see cref="jobs"/>. Unload failures are only logged so the already running job
    /// with the same id is not affected.
    /// </summary>
    private void DiscardSandbox(AppDomain appDomain, Guid jobId) {
      Logger.Info("Job " + jobId + " is already present, unloading the redundant AppDomain");
      appDomain.UnhandledException -= appDomain_UnhandledException;
      try {
        AppDomain.Unload(appDomain);
      }
      catch (CannotUnloadAppDomainException ex) {
        Logger.Error("Could not unload the redundant AppDomain: ", ex);
      }
    }

    /// <summary>
    /// A finished job has been stored on the server (subscribed to GetFinishedJobResultCompleted).
    /// Kills the job's AppDomain and, on success, increments the processed-jobs counter.
    /// </summary>
    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
      Logger.Info("Job submitted with id " + e.Result.JobId);
      KillAppDomain(e.Result.JobId);
      if (e.Result.StatusMessage == ResponseStatus.Ok) {
        SlaveStatusInfo.JobsProcessed++;
        Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
        heartbeatManager.AwakeHeartBeatThread();
      } else {
        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
      }
    }

    /// <summary>
    /// A snapshot has been stored on the server; only logged.
    /// </summary>
    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
    }

    /// <summary>
    /// A connection to the server has been established. Fetches the calendar if none is
    /// available locally, re-enables job fetching, submits jobs that finished while offline
    /// and the jobs persisted on the hard disk.
    /// </summary>
    void wcfService_Connected(object sender, EventArgs e) {
      Logger.Info("WCF Service got a connection");
      if (!UptimeManager.Instance.CalendarAvailable) {
        Logger.Info("No local calendar available, fetch it");
        FetchCalendarFromServer();
      }
      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
      CurrentlyFetching = false;
      CheckRunningAppDomains();
      JobStorageManager.CheckAndSubmitJobsFromDisc();
    }

    /// <summary>
    /// Fetches the calendar from the server and installs it. Reports
    /// <see cref="CalendarState.Fetched"/> on success, otherwise
    /// <see cref="CalendarState.NotAllowedToFetch"/> (also when no response is received).
    /// </summary>
    private void FetchCalendarFromServer() {
      var clientId = ConfigManager.Instance.GetClientInfo().Id;
      ResponseCalendar calres = wcfService.GetCalendarSync(clientId);
      // SetAppointments is only attempted for a successful response (short-circuit)
      if (calres != null && calres.StatusMessage == ResponseStatus.Ok && UptimeManager.Instance.SetAppointments(false, calres)) {
        Logger.Info("Remote calendar installed");
        wcfService.SetCalendarStatus(clientId, CalendarState.Fetched);
      } else {
        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
        wcfService.SetCalendarStatus(clientId, CalendarState.NotAllowedToFetch);
      }
    }

    /// <summary>
    /// For every engine that is not in the Started state and has no pending message, starts
    /// a thread that sends its finished job to the server.
    /// </summary>
    private void CheckRunningAppDomains() {
      // iterate over a copy: the spawned SendFinishedJob threads remove entries from engines
      List<Executor> snapshot;
      lock (engines) {
        snapshot = new List<Executor>(engines.Values);
      }
      foreach (Executor executor in snapshot) {
        if (executor.ExecutionState != ExecutionState.Started && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
          Logger.Info("Checking for JobId: " + executor.JobId);
          Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob));
          finThread.Start(executor.JobId);
        }
      }
    }

    #endregion

    /// <summary>Raised (after logging) when an asynchronous job operation fails.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    private void OnExceptionOccured(Exception e) {
      Logger.Error("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) {
        handler(this, new EventArgs<Exception>(e));
      }
    }

    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
    }

    /// <summary>
    /// Disposes the engine of a job, unloads its AppDomain (up to 5 attempts, 1 s apart),
    /// removes all bookkeeping for the job and deletes its plugin copy. Exceptions are logged,
    /// not thrown.
    /// </summary>
    /// <param name="id">The GUID of the job.</param>
    private void KillAppDomain(Guid id) {
      Logger.Debug("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            try {
              engine.Dispose();
            }
            catch (Exception ex) {
              // e.g. the engine's AppDomain is already unloaded; continue so the entries below are
              // still cleaned up (otherwise engines.Count could never drop to 0)
              Logger.Error("Exception when disposing the engine: ", ex);
            }
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= appDomain_UnhandledException;

            const int maxAttempts = 5;
            for (int attempt = 1; ; attempt++) {
              try {
                AppDomain.Unload(appDomain);
                break;
              }
              catch (CannotUnloadAppDomainException) {
                if (attempt >= maxAttempts) {
                  // give up: handled by the catch below, which logs it and skips the removal of the entries
                  throw;
                }
                Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
              }
            }
            appDomains.Remove(id);
          }

          engines.Remove(id);
          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          Logger.Error("Exception when unloading the appdomain: ", ex);
        }
      }
      // force a collection after tearing down the sandbox
      GC.Collect();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.