Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4267

Last change on this file since 4267 was 4267, checked in by cneumuel, 14 years ago

renamed all database entities from "Client" to "Slave" (#1157)
made slave heartbeats synchronous; slaves now also send heartbeats when the timetable disallows them from calculating. They will appear on the server as Idle, but IsAllowedToCalculate will be false (#1159)

File size: 19.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Threading;
26using HeuristicLab.Core;
27using HeuristicLab.Hive.Contracts;
28using HeuristicLab.Hive.Contracts.BusinessObjects;
29using HeuristicLab.Hive.Contracts.ResponseObjects;
30using HeuristicLab.Hive.Slave.Common;
31using HeuristicLab.Hive.Slave.Communication;
32using HeuristicLab.Hive.Slave.Communication.ServerService;
33using HeuristicLab.Hive.Slave.Core.ClientConsoleService;
34using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
35using HeuristicLab.Hive.Slave.Core.JobStorage;
36using HeuristicLab.Hive.Slave.ExecutionEngine;
37using HeuristicLab.Tracing;
38
39namespace HeuristicLab.Hive.Slave.Core {
40  /// <summary>
41  /// The core component of the Hive Client
42  /// </summary>
43  public class Core : MarshalByRefObject {
44    public static bool abortRequested { get; set; }
45
46    private bool _currentlyFetching;
47    private bool CurrentlyFetching {
48      get {
49        return _currentlyFetching;
50      }
51      set {
52        _currentlyFetching = value;
53        Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
54      }
55    }
56
57    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
58    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
59    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
60
61    private WcfService wcfService;
62    private Heartbeat beat;
63
64    /// <summary>
65    /// Main Method for the client
66    /// </summary>
67    public void Start() {
68      abortRequested = false;
69      Logger.Info("Hive Slave started");
70      SlaveConsoleServer server = new SlaveConsoleServer();
71      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/SlaveConsole/"));
72
73      ConfigManager manager = ConfigManager.Instance;
74      manager.Core = this;
75
76      //Register all Wcf Service references
77      wcfService = WcfService.Instance;
78      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
79      wcfService.GetJobCompleted += new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
80      wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
81      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
82      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
83      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
84      wcfService.Connected += new EventHandler(wcfService_Connected);
85
86      //Recover Server IP and Port from the Settings Framework
87      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
88      if (cc.IPAdress != String.Empty && cc.Port != 0)
89        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);
90
91      //Initialize the heartbeat
92      beat = new Heartbeat { Interval = new TimeSpan(0, 0, 10) };
93      beat.StartHeartbeat();
94
95      MessageQueue queue = MessageQueue.GetInstance();
96
97      //Main processing loop     
98      //Todo: own thread for message handling
99      //Rly?!
100      while (!abortRequested) {
101        MessageContainer container = queue.GetMessage();
102        DetermineAction(container);
103      }
104      Logger.Info("Program shutdown");
105    }
106
107    /// <summary>
108    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
109    /// </summary>
110    /// <param name="container">The Container, containing the message</param>
111    private void DetermineAction(MessageContainer container) {
112      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
113      switch (container.Message) {
114        //Server requests to abort a job
115        case MessageContainer.MessageType.AbortJob:
116          if (engines.ContainsKey(container.JobId))
117            engines[container.JobId].Abort();
118          else
119            Logger.Error("AbortJob: Engine doesn't exist");
120          break;
121
122        //Job has been successfully aborted
123        case MessageContainer.MessageType.JobAborted:
124          Guid jobId = new Guid(container.JobId.ToString());
125          KillAppDomain(jobId);
126          break;
127
128        //Request a Snapshot from the Execution Engine
129        case MessageContainer.MessageType.RequestSnapshot:
130          if (engines.ContainsKey(container.JobId))
131            engines[container.JobId].RequestSnapshot();
132          else
133            Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
134          break;
135
136        //Snapshot is ready and can be sent back to the Server
137        case MessageContainer.MessageType.SnapshotReady:
138          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
139          break;
140
141        //Pull a Job from the Server
142        case MessageContainer.MessageType.FetchJob:
143          if (!CurrentlyFetching) {
144            wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
145            CurrentlyFetching = true;
146          } else
147            Logger.Info("Currently fetching, won't fetch this time!");
148          break;
149
150
151        //A Job has finished and can be sent back to the server
152        case MessageContainer.MessageType.FinishedJob:
153          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
154          break;
155
156
157        //When the timeslice is up
158        case MessageContainer.MessageType.UptimeLimitDisconnect:
159          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
160
161          ShutdownRunningJobsAndSubmitSnapshots();
162          break;
163
164        //Fetch or Force Fetch Calendar!
165        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
166          Logger.Info("Fetch Calendar from Server");
167          FetchCalendarFromServer();
168          break;
169
170        //Hard shutdown of the client
171        case MessageContainer.MessageType.Shutdown:
172          Logger.Info("Shutdown Signal received");
173          lock (engines) {
174            Logger.Debug("engines locked");
175            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
176              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
177              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
178              AppDomain.Unload(kvp.Value);
179            }
180          }
181          Logger.Debug("Stopping heartbeat");
182          abortRequested = true;
183          beat.StopHeartBeat();
184          Logger.Debug("Logging out");
185          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
186          break;
187      }
188    }
189
190    private void ShutdownRunningJobsAndSubmitSnapshots() {
191      //check if there are running jobs
192      if (engines.Count > 0) {
193        //make sure there is no more fetching of jobs while the snapshots get processed
194        CurrentlyFetching = true;
195        //request a snapshot of each running job
196        foreach (KeyValuePair<Guid, Executor> kvp in engines) {
197          kvp.Value.RequestSnapshot();
198        }
199
200      }
201    }
202
203    //Asynchronous Threads for interaction with the Execution Engine
204    #region Async Threads for the EE
205
206    /// <summary>
207    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
208    /// once the connection gets reestablished, the job gets submitted
209    /// </summary>
210    /// <param name="jobId"></param>
211    private void GetFinishedJob(object jobId) {
212      Guid jId = (Guid)jobId;
213      Logger.Info("Getting the finished job with id: " + jId);
214      try {
215        if (!engines.ContainsKey(jId)) {
216          Logger.Info("Engine doesn't exist");
217          return;
218        }
219
220        byte[] sJob = engines[jId].GetFinishedJob();
221
222        if (WcfService.Instance.LoggedIn) {
223          Logger.Info("Sending the finished job with id: " + jId);
224          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engines[jId].CurrentException, true);
225        } else {
226          Logger.Info("Storing the finished job with id: " + jId + " to hdd");
227          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
228          KillAppDomain(jId);
229        }
230      }
231      catch (InvalidStateException ise) {
232        Logger.Error("Invalid State while Snapshoting:", ise);
233      }
234    }
235
236    private void GetSnapshot(object jobId) {
237      Logger.Info("Fetching a snapshot for job " + jobId);
238      Guid jId = (Guid)jobId;
239      byte[] obj = engines[jId].GetSnapshot();
240      Logger.Debug("BEGIN: Sending snapshot sync");
241      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
242        jId,
243        obj,
244        engines[jId].Progress,
245        null);
246      Logger.Debug("END: Sended snapshot sync");
247      //Uptime Limit reached, now is a good time to destroy this jobs.
248      Logger.Debug("Checking if uptime limit is reached");
249      if (!UptimeManager.Instance.IsAllowedToCalculate()) {
250        Logger.Debug("Uptime limit reached");
251        Logger.Debug("Killing Appdomain");
252        KillAppDomain(jId);
253        //Still anything running? 
254        if (engines.Count == 0) {
255          Logger.Info("All jobs snapshotted and sent back, disconnecting");
256          WcfService.Instance.Disconnect();
257        } else {
258          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
259        }
260
261      } else {
262        Logger.Debug("Restarting the job" + jobId);
263        engines[jId].StartOnlyJob();
264        Logger.Info("Restarted the job" + jobId);
265      }
266    }
267
268    #endregion
269
270    //Eventhandlers for the communication with the wcf Layer
271    #region wcfService Events
272    /// <summary>
273    /// Login has returned
274    /// </summary>
275    /// <param name="sender"></param>
276    /// <param name="e"></param>
277    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
278      if (e.Result.StatusMessage == ResponseStatus.Ok) {
279        CurrentlyFetching = false;
280        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
281      } else
282        Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
283    }
284
285    /// <summary>
286    /// A new Job from the wcfService has been received and will be started within a AppDomain.
287    /// </summary>
288    /// <param name="sender"></param>
289    /// <param name="e"></param>
290    void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
291      if (e.Result.StatusMessage != ResponseStatus.GetJob_NoJobsAvailable) {
292        Logger.Info("Received new job with id " + e.Result.Obj.Id);
293        bool sandboxed = false;
294        Logger.Debug("Fetching plugins for job " + e.Result.Obj.Id);
295
296        PluginCache.Instance.PreparePlugins(e.Result.Obj.PluginsNeeded);
297
298        PluginCache.Instance.CopyPluginsForJob(e.Result.Obj.PluginsNeeded, e.Result.Obj.Id);
299
300        //        foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
301        //        files.AddRange(plugininfo.PluginFiles);
302        Logger.Debug("Plugins fetched for job " + e.Result.Obj.Id);
303        try {
304          String pluginDir = Path.Combine(PluginCache.PLUGIN_REPO, e.Result.Obj.Id.ToString());
305
306          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
307          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
308          lock (engines) {
309            if (!jobs.ContainsKey(e.Result.Obj.Id)) {
310              jobs.Add(e.Result.Obj.Id, e.Result.Obj);
311              appDomains.Add(e.Result.Obj.Id, appDomain);
312              Logger.Debug("Creating AppDomain");
313              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
314              Logger.Debug("Created AppDomain");
315              engine.JobId = e.Result.Obj.Id;
316              engine.Queue = MessageQueue.GetInstance();
317              Logger.Debug("Starting Engine for job " + e.Result.Obj.Id);
318              engine.Start(e.Data);
319              engines.Add(e.Result.Obj.Id, engine);
320
321              SlaveStatusInfo.JobsFetched++;
322              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
323            }
324          }
325        }
326        catch (Exception exception) {
327          Logger.Error("Creating the Appdomain and loading the job failed for job " + e.Result.Obj.Id);
328          Logger.Error("Error thrown is: ", exception);
329          CurrentlyFetching = false;
330          KillAppDomain(e.Result.Obj.Id);
331          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, 1, exception, true);
332        }
333      } else {
334        Logger.Info("No more jobs left!");
335      }
336      CurrentlyFetching = false;
337    }
338
339    /// <summary>
340    /// A finished job has been stored on the server
341    /// </summary>
342    /// <param name="sender"></param>
343    /// <param name="e"></param>
344    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
345      Logger.Info("Job submitted with id " + e.Result.JobId);
346      KillAppDomain(e.Result.JobId);
347      if (e.Result.StatusMessage == ResponseStatus.Ok) {
348        SlaveStatusInfo.JobsProcessed++;
349        Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
350      } else {
351        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
352      }
353    }
354
355    /// <summary>
356    /// A snapshot has been stored on the server
357    /// </summary>
358    /// <param name="sender"></param>
359    /// <param name="e"></param>
360    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
361      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
362    }
363
364    /// <summary>
365    /// The server has been changed. All Appdomains and Jobs must be aborted!
366    /// </summary>
367    /// <param name="sender"></param>
368    /// <param name="e"></param>
369    void wcfService_ServerChanged(object sender, EventArgs e) {
370      Logger.Info("ServerChanged has been called");
371      lock (engines) {
372        foreach (KeyValuePair<Guid, Executor> entries in engines) {
373          engines[entries.Key].Abort();
374          //appDomains[entries.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
375          //AppDomain.Unload(appDomains[entries.Key]);
376        }
377        //appDomains = new Dictionary<Guid, AppDomain>();
378        //engines = new Dictionary<Guid, Executor>();
379        //jobs = new Dictionary<Guid, Job>();
380      }
381    }
382
383    /// <summary>
384    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
385    /// </summary>
386    /// <param name="sender"></param>
387    /// <param name="e"></param>
388    void wcfService_Connected(object sender, EventArgs e) {
389      Logger.Info("WCF Service got a connection");
390      if (!UptimeManager.Instance.CalendarAvailable) {
391        Logger.Info("No local calendar available, fetch it");
392        FetchCalendarFromServer();
393      }
394      //if the fetching from the server failed - still set the client online... maybe we get
395      //a result within the next few heartbeats     
396      //if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
397        Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
398        Logger.Info("Setting client online");
399        wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
400        JobStorageManager.CheckAndSubmitJobsFromDisc();
401        CurrentlyFetching = false;
402      //}
403    }
404
405    private void FetchCalendarFromServer() {
406      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
407      if (calres.StatusMessage == ResponseStatus.Ok) {
408        if (UptimeManager.Instance.SetAppointments(false, calres)) {
409          Logger.Info("Remote calendar installed");
410          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
411        } else {
412          Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
413          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
414        }
415      } else {
416        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
417        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
418      }
419    }
420
421    //this is a little bit tricky -
422    void wcfService_ConnectionRestored(object sender, EventArgs e) {
423      Logger.Info("Reconnected to old server - checking currently running appdomains");
424
425      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
426        if (execKVP.Value.ExecutionState != ExecutionState.Started && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
427          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
428          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
429          finThread.Start(execKVP.Value.JobId);
430        }
431      }
432    }
433
434    #endregion
435
436    public Dictionary<Guid, Executor> GetExecutionEngines() {
437      return engines;
438    }
439
440    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
441      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
442    }
443
444    internal Dictionary<Guid, JobDto> GetJobs() {
445      return jobs;
446    }
447
448    /// <summary>
449    /// Kill a appdomain with a specific id.
450    /// </summary>
451    /// <param name="id">the GUID of the job</param>
452    private void KillAppDomain(Guid id) {
453      Logger.Debug("Shutting down Appdomain for Job " + id);
454      lock (engines) {
455        try {
456          if (engines.ContainsKey(id))
457            engines[id].Dispose();
458          if (appDomains.ContainsKey(id)) {
459            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
460            AppDomain.Unload(appDomains[id]);
461            appDomains.Remove(id);
462          }
463          engines.Remove(id);
464          jobs.Remove(id);
465          PluginCache.Instance.DeletePluginsForJob(id);
466          GC.Collect();
467        }
468        catch (Exception ex) {
469          Logger.Error("Exception when unloading the appdomain: ", ex);
470        }
471      }
472      GC.Collect();
473    }
474  }
475
476
477}
Note: See TracBrowser for help on using the repository browser.