Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4320

Last change on this file since 4320 was 4320, checked in by cneumuel, 14 years ago

made slave-console service configurable via xml

File size: 19.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Threading;
26using HeuristicLab.Core;
27using HeuristicLab.Hive.Contracts;
28using HeuristicLab.Hive.Contracts.BusinessObjects;
29using HeuristicLab.Hive.Contracts.ResponseObjects;
30using HeuristicLab.Hive.Slave.Common;
31using HeuristicLab.Hive.Slave.Communication;
32using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
33using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
34using HeuristicLab.Hive.Slave.Core.JobStorage;
35using HeuristicLab.Hive.Slave.ExecutionEngine;
36using HeuristicLab.Tracing;
37using HeuristicLab.Hive.Slave.Communication.SlaveService;
38
39namespace HeuristicLab.Hive.Slave.Core {
/// <summary>
/// The core component of the Hive slave. <see cref="Start"/> starts the slave console server and the
/// heartbeat, subscribes to the <see cref="WcfService"/> events and then dispatches messages taken from
/// the <see cref="MessageQueue"/> until <see cref="abortRequested"/> becomes true.
/// </summary>
/// <remarks>
/// Every job owns a sandbox <see cref="AppDomain"/> and an <see cref="Executor"/> created inside it.
/// The <c>engines</c>, <c>appDomains</c> and <c>jobs</c> dictionaries are touched by the message loop,
/// by thread-pool work items, by dedicated threads and by the WCF event handlers, so every access to
/// them happens while holding the lock on <c>engines</c>.
/// </remarks>
public class Core : MarshalByRefObject {
  /// <summary>
  /// When true, the message loop in <see cref="Start"/> exits. The Shutdown message sets it.
  /// </summary>
  public static bool abortRequested { get; set; }

  private bool _currentlyFetching;

  /// <summary>
  /// True while a GetJob request is outstanding (or while fetching is deliberately blocked),
  /// so that FetchJob messages do not issue overlapping requests.
  /// </summary>
  private bool CurrentlyFetching {
    get {
      return _currentlyFetching;
    }
    set {
      _currentlyFetching = value;
      Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
    }
  }

  // job id -> executor running the job inside its sandbox; also the lock object for all three maps
  private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
  // job id -> sandbox AppDomain hosting the executor
  private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
  // job id -> job description received from the server
  private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

  private WcfService wcfService;
  private HeartbeatManager beat;

  /// <summary>
  /// Main method of the slave: wires up the services and then runs the message loop on the
  /// calling thread until <see cref="abortRequested"/> is set.
  /// </summary>
  public void Start() {
    abortRequested = false;
    Logger.Info("Hive Slave started");
    SlaveConsoleServer server = new SlaveConsoleServer();
    server.StartClientConsoleServer();

    ConfigManager manager = ConfigManager.Instance;
    manager.Core = this;

    // Register all Wcf Service callbacks
    wcfService = WcfService.Instance;
    wcfService.GetJobCompleted += new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
    wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
    wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
    wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
    wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
    wcfService.Connected += new EventHandler(wcfService_Connected);

    // Recover the server IP from the settings; a null or empty value leaves the service's address untouched.
    ConnectionContainer cc = ConfigManager.Instance.GetServerIP();
    if (!String.IsNullOrEmpty(cc.IPAdress))
      wcfService.SetIP(cc.IPAdress);

    // Initialize the heartbeat
    beat = new HeartbeatManager { Interval = TimeSpan.FromSeconds(10) };
    beat.StartHeartbeat();

    MessageQueue queue = MessageQueue.GetInstance();

    // Main processing loop.
    // TODO: own thread for message handling
    while (!abortRequested) {
      MessageContainer container = queue.GetMessage();
      DetermineAction(container);
    }
    Logger.Info("Program shutdown");
  }

  /// <summary>
  /// Reads and analyzes one message from the MessageQueue and starts the corresponding action.
  /// </summary>
  /// <param name="container">The container holding the message and the id of the job it refers to.</param>
  private void DetermineAction(MessageContainer container) {
    Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
    switch (container.Message) {
      // Server requests to abort a job
      case MessageContainer.MessageType.AbortJob:
        Executor toAbort = TryGetEngine(container.JobId);
        if (toAbort != null)
          toAbort.Abort();
        else
          Logger.Error("AbortJob: Engine doesn't exist");
        break;

      // Job has been successfully aborted
      case MessageContainer.MessageType.JobAborted:
        KillAppDomain(container.JobId);
        break;

      // Request a Snapshot from the Execution Engine
      case MessageContainer.MessageType.RequestSnapshot:
        Executor toSnapshot = TryGetEngine(container.JobId);
        if (toSnapshot != null)
          toSnapshot.RequestSnapshot();
        else
          Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
        break;

      // Snapshot is ready and can be sent back to the Server
      case MessageContainer.MessageType.SnapshotReady:
        ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
        break;

      // Pull a Job from the Server
      case MessageContainer.MessageType.FetchJob:
        if (!CurrentlyFetching) {
          // Raise the flag before issuing the request: should the completion handler run before
          // GetJobAsync returns, it must not be overwritten afterwards (that would block fetching forever).
          CurrentlyFetching = true;
          wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
        } else
          Logger.Info("Currently fetching, won't fetch this time!");
        break;

      // A Job has finished and can be sent back to the server
      case MessageContainer.MessageType.FinishedJob:
        ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
        break;

      // When the timeslice is up
      case MessageContainer.MessageType.UptimeLimitDisconnect:
        Logger.Info("Uptime Limit reached, storing jobs and sending them back");
        ShutdownRunningJobsAndSubmitSnapshots();
        break;

      // Fetch or Force Fetch Calendar!
      case MessageContainer.MessageType.FetchOrForceFetchCalendar:
        Logger.Info("Fetch Calendar from Server");
        FetchCalendarFromServer();
        break;

      // Hard shutdown of the client
      case MessageContainer.MessageType.Shutdown:
        Logger.Info("Shutdown Signal received");
        lock (engines) {
          Logger.Debug("engines locked");
          foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
            Logger.Debug("Shutting down Appdomain for " + kvp.Key);
            // a failing unload is logged, so the heartbeat is still stopped and the logout still happens
            UnloadAppDomain(kvp.Value);
          }
          // the sandboxes are gone: drop the stale bookkeeping so nothing touches them again
          appDomains.Clear();
          engines.Clear();
          jobs.Clear();
        }
        Logger.Debug("Stopping heartbeat");
        abortRequested = true;
        beat.StopHeartBeat();
        Logger.Debug("Logging out");
        WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
        break;
    }
  }

  /// <summary>
  /// Blocks further job fetching and asks every running executor for a snapshot.
  /// </summary>
  private void ShutdownRunningJobsAndSubmitSnapshots() {
    List<Executor> running = SnapshotEngines();
    if (running.Count > 0) {
      // make sure there is no more fetching of jobs while the snapshots get processed
      CurrentlyFetching = true;
      foreach (Executor engine in running) {
        engine.RequestSnapshot();
      }
    }
  }

  /// <summary>
  /// Looks up the executor of a job while holding the lock.
  /// </summary>
  /// <returns>The executor, or null when no executor is registered for <paramref name="jobId"/>.</returns>
  private Executor TryGetEngine(Guid jobId) {
    lock (engines) {
      Executor engine;
      return engines.TryGetValue(jobId, out engine) ? engine : null;
    }
  }

  /// <summary>
  /// Copies the currently registered executors while holding the lock, so callers can iterate
  /// without racing against concurrent removals.
  /// </summary>
  private List<Executor> SnapshotEngines() {
    lock (engines) {
      return new List<Executor>(engines.Values);
    }
  }

  // Asynchronous threads for interaction with the Execution Engine
  #region Async Threads for the EE

  /// <summary>
  /// Serializes the finished job and submits it to the server. If the slave is not logged in at the
  /// time, the job is stored on disk instead and its AppDomain is unloaded.
  /// Runs on a thread-pool thread or on a dedicated thread.
  /// </summary>
  /// <param name="jobId">The boxed <see cref="Guid"/> of the finished job.</param>
  private void GetFinishedJob(object jobId) {
    Guid jId = (Guid)jobId;
    Logger.Info("Getting the finished job with id: " + jId);
    try {
      // Take the executor reference once: the job may be killed concurrently by another thread.
      Executor engine = TryGetEngine(jId);
      if (engine == null) {
        Logger.Info("Engine doesn't exist");
        return;
      }

      byte[] sJob = engine.GetFinishedJob();

      if (WcfService.Instance.LoggedIn) {
        Logger.Info("Sending the finished job with id: " + jId);
        wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engine.CurrentException, true);
      } else {
        Logger.Info("Storing the finished job with id: " + jId + " to hdd");
        // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming).
        // Also the port is now specified only in app.config; use port 0 for the moment.
        JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, 0, jId, sJob);
        KillAppDomain(jId);
      }
    }
    catch (InvalidStateException ise) {
      Logger.Error("Invalid State while Snapshoting:", ise);
    }
  }

  /// <summary>
  /// Fetches a snapshot from the executor and sends it to the server synchronously. If the uptime
  /// limit has been reached, the job's AppDomain is unloaded afterwards, and the service disconnects
  /// once no job is left. Otherwise the job is restarted. Runs on a thread-pool thread.
  /// </summary>
  /// <param name="jobId">The boxed <see cref="Guid"/> of the job.</param>
  private void GetSnapshot(object jobId) {
    Logger.Info("Fetching a snapshot for job " + jobId);
    Guid jId = (Guid)jobId;
    // The job may have been killed since the SnapshotReady message was queued.
    Executor engine = TryGetEngine(jId);
    if (engine == null) {
      Logger.Error("GetSnapshot: Engine doesn't exist for job " + jId);
      return;
    }
    byte[] obj = engine.GetSnapshot();
    Logger.Debug("BEGIN: Sending snapshot sync");
    wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
      jId,
      obj,
      engine.Progress,
      null);
    Logger.Debug("END: Sended snapshot sync");
    // Uptime Limit reached, now is a good time to destroy this job.
    Logger.Debug("Checking if uptime limit is reached");
    if (!UptimeManager.Instance.IsAllowedToCalculate()) {
      Logger.Debug("Uptime limit reached");
      Logger.Debug("Killing Appdomain");
      KillAppDomain(jId);
      int remaining;
      lock (engines) {
        remaining = engines.Count;
      }
      // Still anything running?
      if (remaining == 0) {
        Logger.Info("All jobs snapshotted and sent back, disconnecting");
        WcfService.Instance.Disconnect();
      } else {
        Logger.Debug("There are still active Jobs in the Field, not disconnecting");
      }
    } else {
      Logger.Debug("Restarting the job" + jobId);
      engine.StartOnlyJob();
      Logger.Info("Restarted the job" + jobId);
    }
  }

  #endregion

  // Eventhandlers for the communication with the wcf Layer
  #region wcfService Events

  /// <summary>
  /// Login has returned. Note: this handler is not subscribed in <see cref="Start"/>.
  /// </summary>
  void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
    if (e.Result.StatusMessage == ResponseStatus.Ok) {
      CurrentlyFetching = false;
      Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
    } else
      Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
  }

  /// <summary>
  /// A GetJob request has completed. A received job gets its plugins prepared and is started inside a
  /// fresh sandbox AppDomain. If that fails, the job is reported back to the server as finished with the
  /// exception. <see cref="CurrentlyFetching"/> is always reset, even when this handler throws.
  /// </summary>
  void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
    try {
      if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
        Logger.Info("No more jobs left!");
      } else if (e.Result.Obj == null) {
        // without this guard a null job used to throw here and leave CurrentlyFetching stuck at true
        Logger.Error("GetJob returned no job, status: " + e.Result.StatusMessage);
      } else {
        JobDto job = e.Result.Obj;
        Logger.Info("Received new job with id " + job.Id);
        Logger.Debug("Fetching plugins for job " + job.Id);
        try {
          PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
          PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, job.Id);
          Logger.Debug("Plugins fetched for job " + job.Id);
          String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, job.Id.ToString());

          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          bool registered = false;
          lock (engines) {
            if (!jobs.ContainsKey(job.Id)) {
              jobs.Add(job.Id, job);
              appDomains.Add(job.Id, appDomain);
              registered = true;
              Logger.Debug("Creating AppDomain");
              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              Logger.Debug("Created AppDomain");
              engine.JobId = job.Id;
              engine.Queue = MessageQueue.GetInstance();
              Logger.Debug("Starting Engine for job " + job.Id);
              engine.Start(e.Data);
              engines.Add(job.Id, engine);

              SlaveStatusInfo.JobsFetched++;
              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
            }
          }
          if (!registered) {
            // The job is already running: the sandbox created above is not tracked anywhere, so
            // unload it here instead of leaking it.
            Logger.Error("Job " + job.Id + " is already running, unloading the duplicate AppDomain");
            UnloadAppDomain(appDomain);
          }
          beat.InterruptHeartBeatThread();
        }
        catch (Exception exception) {
          Logger.Error("Creating the Appdomain and loading the job failed for job " + job.Id);
          Logger.Error("Error thrown is: ", exception);
          KillAppDomain(job.Id);
          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, job.Id, new byte[] { }, 1, exception, true);
        }
      }
    }
    finally {
      CurrentlyFetching = false;
    }
  }

  /// <summary>
  /// A finished job has been stored on the server: unload its AppDomain and update the statistics.
  /// </summary>
  void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
    Logger.Info("Job submitted with id " + e.Result.JobId);
    KillAppDomain(e.Result.JobId);
    if (e.Result.StatusMessage == ResponseStatus.Ok) {
      SlaveStatusInfo.JobsProcessed++;
      Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
      beat.InterruptHeartBeatThread();
    } else {
      Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
    }
  }

  /// <summary>
  /// A snapshot has been stored on the server.
  /// </summary>
  void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
    Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
  }

  /// <summary>
  /// The server has been changed: all running jobs are asked to abort. Their AppDomains are unloaded
  /// when the corresponding JobAborted message is handled (presumably posted by the executor after
  /// Abort - confirm in Executor).
  /// </summary>
  void wcfService_ServerChanged(object sender, EventArgs e) {
    Logger.Info("ServerChanged has been called");
    lock (engines) {
      foreach (Executor engine in engines.Values) {
        engine.Abort();
      }
    }
  }

  /// <summary>
  /// A connection to the server has been established. Fetches the calendar if none is available
  /// locally, logs in and submits the jobs persisted on disk.
  /// </summary>
  void wcfService_Connected(object sender, EventArgs e) {
    Logger.Info("WCF Service got a connection");
    if (!UptimeManager.Instance.CalendarAvailable) {
      Logger.Info("No local calendar available, fetch it");
      FetchCalendarFromServer();
    }
    // Even if fetching the calendar failed the client is set online - maybe we get a result
    // within the next few heartbeats.
    Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
    Logger.Info("Setting client online");
    wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
    JobStorageManager.CheckAndSubmitJobsFromDisc();
    CurrentlyFetching = false;
  }

  /// <summary>
  /// Downloads the calendar, installs it and reports the resulting calendar state to the server.
  /// </summary>
  private void FetchCalendarFromServer() {
    ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
    // SetAppointments is only attempted when the server answered Ok (short-circuit).
    bool installed = calres.StatusMessage == ResponseStatus.Ok && UptimeManager.Instance.SetAppointments(false, calres);
    if (installed) {
      Logger.Info("Remote calendar installed");
      wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
    } else {
      Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
      wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
    }
  }

  /// <summary>
  /// Reconnected to the old server: every executor that is not started and has no pending message
  /// gets its result collected on a dedicated thread.
  /// </summary>
  void wcfService_ConnectionRestored(object sender, EventArgs e) {
    Logger.Info("Reconnected to old server - checking currently running appdomains");

    // iterate over a copy: other threads remove executors while we are looping
    foreach (Executor engine in SnapshotEngines()) {
      if (engine.ExecutionState != ExecutionState.Started && engine.CurrentMessage == MessageContainer.MessageType.NoMessage) {
        Logger.Info("Checking for JobId: " + engine.JobId);
        Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
        finThread.Start(engine.JobId);
      }
    }
  }

  #endregion

  /// <summary>
  /// Returns the live job-id to executor map (not a copy). It is mutated from several threads,
  /// so callers that enumerate it should lock on the returned dictionary.
  /// </summary>
  public Dictionary<Guid, Executor> GetExecutionEngines() {
    return engines;
  }

  /// <summary>
  /// Logs exceptions that escape a job's sandbox AppDomain.
  /// </summary>
  void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
  }

  /// <summary>
  /// Returns the live job-id to job map (not a copy). It is guarded by the lock on the dictionary
  /// returned by <see cref="GetExecutionEngines"/>.
  /// </summary>
  internal Dictionary<Guid, JobDto> GetJobs() {
    return jobs;
  }

  /// <summary>
  /// Detaches the unhandled-exception handler from <paramref name="domain"/> and unloads it. A failure
  /// is logged rather than propagated, because callers are cleanup paths.
  /// </summary>
  private void UnloadAppDomain(AppDomain domain) {
    try {
      domain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
      AppDomain.Unload(domain);
    }
    catch (Exception ex) {
      Logger.Error("Exception when unloading the appdomain: ", ex);
    }
  }

  /// <summary>
  /// Kills the AppDomain of a job: disposes its executor, unloads the sandbox and deletes the job's
  /// plugin copy. The bookkeeping entries are always removed, even when disposing or unloading fails,
  /// so a broken job can no longer keep the slave from disconnecting.
  /// </summary>
  /// <param name="id">The GUID of the job.</param>
  private void KillAppDomain(Guid id) {
    Logger.Debug("Shutting down Appdomain for Job " + id);
    lock (engines) {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        engines.Remove(id);
        try {
          engine.Dispose();
        }
        catch (Exception ex) {
          Logger.Error("Exception when disposing the engine: ", ex);
        }
      }
      AppDomain domain;
      if (appDomains.TryGetValue(id, out domain)) {
        appDomains.Remove(id);
        UnloadAppDomain(domain);
      }
      jobs.Remove(id);
      try {
        PluginCache.Instance.DeletePluginsForJob(id);
      }
      catch (Exception ex) {
        Logger.Error("Exception when deleting the plugins of job " + id + ": ", ex);
      }
    }
    // Kept from the original implementation, presumably to reclaim the unloaded sandbox's memory promptly.
    GC.Collect();
  }
}
474
475
476}
Note: See TracBrowser for help on using the repository browser.