Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4254

Last change on this file since 4254 was 4254, checked in by cneumuel, 14 years ago

some small refactorings (#1159)

File size: 19.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Slave.ExecutionEngine;
27using HeuristicLab.Hive.Slave.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Slave.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Slave.Core.ClientConsoleService;
42using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
43using HeuristicLab.Hive.Slave.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Slave.Core.JobStorage;
46using HeuristicLab.Tracing;
47using HeuristicLab.Core;
48using System.IO;
49
50namespace HeuristicLab.Hive.Slave.Core {
51  /// <summary>
52  /// The core component of the Hive Client
53  /// </summary>
54  public class Core : MarshalByRefObject {
/// <summary>
/// When true, the message loop in <see cref="Start"/> stops before taking the next message.
/// Reset to false at the beginning of <see cref="Start"/>.
/// </summary>
public static bool abortRequested { get; set; }

// Backing field of CurrentlyFetching; only touched through the property so every change is logged.
private bool _currentlyFetching;

/// <summary>
/// Guards against requesting more than one job at a time. It is also set to true
/// to block fetching while snapshots are taken at the uptime limit.
/// Every assignment is written to the debug log.
/// </summary>
private bool CurrentlyFetching {
  get { return _currentlyFetching; }
  set {
    _currentlyFetching = value;
    Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
  }
}

// Per-job state, all three maps keyed by the job id.
// 'engines' is also the object this class locks on, so the references must never be
// swapped out: they are readonly to make that a compile-time guarantee.
private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

// Both are assigned in Start().
private WcfService wcfService;
private Heartbeat beat;
74
/// <summary>
/// Main method of the slave. Starts the local console service, wires up the WCF
/// service events, restores the stored server address, starts the heartbeat and then
/// processes messages from the <see cref="MessageQueue"/> until
/// <see cref="abortRequested"/> becomes true.
/// </summary>
public void Start() {
  abortRequested = false;
  Logger.Info("Hive Slave started");
  SlaveConsoleServer server = new SlaveConsoleServer();
  server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/SlaveConsole/"));

  ConfigManager manager = ConfigManager.Instance;
  manager.Core = this;

  //Register all Wcf Service references
  wcfService = WcfService.Instance;
  wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
  wcfService.SendJobCompleted += new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
  wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
  wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
  wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
  wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
  wcfService.Connected += new EventHandler(wcfService_Connected);

  //Recover Server IP and Port from the Settings Framework.
  //IsNullOrEmpty (instead of != String.Empty) keeps a null address from being
  //handed to SetIPAndPort when no server has been configured.
  ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
  if (!String.IsNullOrEmpty(cc.IPAdress) && cc.Port != 0)
    wcfService.SetIPAndPort(cc.IPAdress, cc.Port);

  //Initialize the heartbeat
  beat = new Heartbeat { Interval = 10000 };
  beat.StartHeartbeat();

  MessageQueue queue = MessageQueue.GetInstance();

  //Main processing loop: messages are handled one at a time on the thread that called Start().
  //Todo: own thread for message handling
  while (!abortRequested) {
    MessageContainer container = queue.GetMessage();
    DetermineAction(container);
  }
  Logger.Info("Program shutdown");
}
117
/// <summary>
/// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
/// </summary>
/// <param name="container">The Container, containing the message</param>
private void DetermineAction(MessageContainer container) {
  Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
  switch (container.Message) {
    //Server requests to abort a job
    case MessageContainer.MessageType.AbortJob: {
      Executor engine = LookupEngine(container.JobId);
      if (engine != null)
        engine.Abort();
      else
        Logger.Error("AbortJob: Engine doesn't exist");
      break;
    }

    //Job has been successfully aborted
    case MessageContainer.MessageType.JobAborted:
      KillAppDomain(container.JobId);
      break;

    //Request a Snapshot from the Execution Engine
    case MessageContainer.MessageType.RequestSnapshot: {
      Executor engine = LookupEngine(container.JobId);
      if (engine != null)
        engine.RequestSnapshot();
      else
        Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
      break;
    }

    //Snapshot is ready and can be sent back to the Server
    case MessageContainer.MessageType.SnapshotReady:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
      break;

    //Pull a Job from the Server
    case MessageContainer.MessageType.FetchJob:
      if (!CurrentlyFetching) {
        //Raise the flag BEFORE starting the async call: the completion handler resets it,
        //and if it ran first the flag would be left stuck at true.
        CurrentlyFetching = true;
        try {
          wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
        }
        catch {
          CurrentlyFetching = false;
          throw;
        }
      } else
        Logger.Info("Currently fetching, won't fetch this time!");
      break;

    //A Job has finished and can be sent back to the server
    case MessageContainer.MessageType.FinishedJob:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
      break;

    //When the timeslice is up
    case MessageContainer.MessageType.UptimeLimitDisconnect: {
      Logger.Info("Uptime Limit reached, storing jobs and sending them back");

      //Copy the running engines under the lock; worker threads remove entries
      //concurrently and would otherwise invalidate the enumeration.
      List<Executor> running;
      lock (engines) {
        running = new List<Executor>(engines.Values);
      }

      if (running.Count > 0) {
        //make sure there is no more fetching of jobs while the snapshots get processed
        CurrentlyFetching = true;
        //request a snapshot of each running job
        foreach (Executor engine in running) {
          engine.RequestSnapshot();
        }
      } else {
        //Disconnect afterwards
        WcfService.Instance.Disconnect();
      }
      break;
    }

    //Fetch or Force Fetch Calendar!
    case MessageContainer.MessageType.FetchOrForceFetchCalendar:
      Logger.Info("Fetch Calendar from Server");
      FetchCalendarFromServer();
      break;

    //Hard shutdown of the client
    case MessageContainer.MessageType.Shutdown:
      Logger.Info("Shutdown Signal received");
      lock (engines) {
        Logger.Debug("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            //One domain that refuses to unload must not keep the slave from shutting down.
            Logger.Error("Exception when unloading the appdomain: ", ex);
          }
        }
        //Drop the now useless proxies so nobody calls into an unloaded domain.
        appDomains.Clear();
        engines.Clear();
        jobs.Clear();
      }
      Logger.Debug("Stopping heartbeat");
      abortRequested = true;
      beat.StopHeartBeat();
      Logger.Debug("Logging out");
      WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
      break;
  }
}

/// <summary>
/// Looks up the executor of a job while holding the engines lock.
/// </summary>
/// <param name="jobId">id of the job</param>
/// <returns>the executor, or null if no engine is registered for the job</returns>
private Executor LookupEngine(Guid jobId) {
  lock (engines) {
    Executor engine;
    return engines.TryGetValue(jobId, out engine) ? engine : null;
  }
}
212
213    //Asynchronous Threads for interaction with the Execution Engine
214    #region Async Threads for the EE
215
/// <summary>
/// Serializes the finished job and submits it to the server. If the slave is not logged in
/// at that time, the job is stored on disk and its AppDomain is shut down instead.
/// </summary>
/// <param name="jobId">the boxed Guid of the finished job</param>
private void GetFinishedJob(object jobId) {
  Guid jId = (Guid)jobId;
  Logger.Info("Getting the finished job with id: " + jId);
  try {
    //Single lookup under the lock: KillAppDomain removes entries on other threads, so a
    //ContainsKey followed by the indexer could still fail with a KeyNotFoundException.
    Executor engine;
    bool engineKnown;
    lock (engines) {
      engineKnown = engines.TryGetValue(jId, out engine);
    }
    if (!engineKnown) {
      Logger.Info("Engine doesn't exist");
      return;
    }

    byte[] sJob = engine.GetFinishedJob();

    if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
      Logger.Info("Sending the finished job with id: " + jId);
      wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engine.CurrentException, true);
    } else {
      Logger.Info("Storing the finished job with id: " + jId + " to hdd");
      JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
      KillAppDomain(jId);
    }
  }
  catch (InvalidStateException ise) {
    //Message used to say "Snapshoting", copied from the snapshot path.
    Logger.Error("Invalid State while getting the finished job:", ise);
  }
}
245
/// <summary>
/// Takes a snapshot of a running job and sends it to the server synchronously.
/// If the slave is no longer online afterwards, the job's AppDomain is shut down and the
/// slave disconnects once no engine is left. Otherwise the job is restarted.
/// </summary>
/// <param name="jobId">the boxed Guid of the job</param>
private void GetSnapshot(object jobId) {
  Logger.Info("Fetching a snapshot for job " + jobId);
  Guid jId = (Guid)jobId;

  //The job may have finished and been removed between the SnapshotReady message and this
  //call; an unchecked engines[jId] would then throw on this worker thread.
  Executor engine;
  bool engineKnown;
  lock (engines) {
    engineKnown = engines.TryGetValue(jId, out engine);
  }
  if (!engineKnown) {
    Logger.Error("GetSnapshot: Engine with Job doesn't exist");
    return;
  }

  byte[] obj = engine.GetSnapshot();
  Logger.Debug("BEGIN: Sending snapshot sync");
  wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
    jId,
    obj,
    engine.Progress,
    null);
  Logger.Debug("END: Sended snapshot sync");
  //Uptime Limit reached, now is a good time to destroy this jobs.
  Logger.Debug("Checking if uptime limit is reached");
  if (!UptimeManager.Instance.IsOnline()) {
    Logger.Debug("Uptime limit reached");
    Logger.Debug("Killing Appdomain");
    KillAppDomain(jId);
    //Still anything running?
    bool nothingRunning;
    lock (engines) {
      nothingRunning = engines.Count == 0;
    }
    if (nothingRunning) {
      Logger.Info("All jobs snapshotted and sent back, disconnecting");
      WcfService.Instance.Disconnect();
    } else {
      Logger.Debug("There are still active Jobs in the Field, not disconnecting");
    }
  } else {
    Logger.Debug("Restarting the job" + jobId);
    engine.StartOnlyJob();
    Logger.Info("Restarted the job" + jobId);
  }
}
277
278    #endregion
279
280    //Eventhandlers for the communication with the wcf Layer
281    #region wcfService Events
/// <summary>
/// Handles the answer to a login request: a successful login re-enables job fetching,
/// a failed one is only logged.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">carries the login result</param>
void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
  if (!e.Result.Success) {
    Logger.Error("Error during login: " + e.Result.StatusMessage);
    return;
  }

  CurrentlyFetching = false;
  Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
}
294
/// <summary>
/// A new Job from the wcfService has been received and will be started within a AppDomain.
/// If preparing or starting the job fails, the job is cleaned up locally and reported to
/// the server together with the exception. CurrentlyFetching is reset on every path.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">carries the server response and the serialized job data</param>
void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
  try {
    if (e.Result.StatusMessage == ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
      Logger.Info("No more jobs left!");
      return;
    }

    var job = e.Result.Obj;
    if (job == null) {
      //A response without a job must not end in a NullReferenceException.
      Logger.Error("Job response did not contain a job. Message: " + e.Result.StatusMessage);
      return;
    }
    Logger.Info("Received new job with id " + job.Id);

    //Bail out before touching plugin files of a job that is already registered here;
    //the failure path below would otherwise tear down the registered job.
    bool alreadyRegistered;
    lock (engines) {
      alreadyRegistered = jobs.ContainsKey(job.Id);
    }
    if (alreadyRegistered) {
      Logger.Info("Job " + job.Id + " is already registered on this slave, ignoring it");
      return;
    }

    try {
      //Plugin preparation is inside the try so that a failure here is reported as well.
      Logger.Debug("Fetching plugins for job " + job.Id);
      PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
      PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, job.Id);
      Logger.Debug("Plugins fetched for job " + job.Id);

      String pluginDir = Path.Combine(PluginCache.PLUGIN_REPO, job.Id.ToString());

      lock (engines) {
        //Re-check under the lock and create the sandbox only when it is going to be
        //registered, so no AppDomain is created and then left unreferenced.
        if (!jobs.ContainsKey(job.Id)) {
          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          jobs.Add(job.Id, job);
          appDomains.Add(job.Id, appDomain);
          Logger.Debug("Creating AppDomain");
          Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
          Logger.Debug("Created AppDomain");
          engine.JobId = job.Id;
          engine.Queue = MessageQueue.GetInstance();
          Logger.Debug("Starting Engine for job " + job.Id);
          engine.Start(e.Data);
          engines.Add(job.Id, engine);

          SlaveStatusInfo.JobsFetched++;
          Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
        }
      }
    }
    catch (Exception exception) {
      Logger.Error("Creating the Appdomain and loading the job failed for job " + job.Id);
      Logger.Error("Error thrown is: ", exception);
      KillAppDomain(job.Id);
      wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, job.Id, new byte[] { }, 1, exception, true);
    }
  }
  finally {
    //Always allow the next fetch, even when something above threw.
    CurrentlyFetching = false;
  }
}
348
/// <summary>
/// The server has answered the upload of a finished job. The job's AppDomain is shut
/// down in either case; only a successful upload counts as a processed job.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">carries the server's answer for the job</param>
void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
  Logger.Info("Job submitted with id " + e.Result.JobId);
  KillAppDomain(e.Result.JobId);

  if (!e.Result.Success) {
    Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
    return;
  }

  SlaveStatusInfo.JobsProcessed++;
  Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
}
364
/// <summary>
/// The server has acknowledged a snapshot; this is only logged.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">carries the id of the job the snapshot belongs to</param>
void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
  string logLine = "Snapshot " + e.Result.JobId + " has been transmitted according to plan.";
  Logger.Info(logLine);
}
373
/// <summary>
/// The slave has been pointed at a different server: every running engine is asked to abort.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">unused</param>
void wcfService_ServerChanged(object sender, EventArgs e) {
  Logger.Info("ServerChanged has been called");
  lock (engines) {
    // Only Abort() is issued here; the maps and AppDomains are deliberately not touched.
    // NOTE(review): presumably they are cleaned up via the JobAborted message - confirm.
    foreach (Executor runningEngine in engines.Values) {
      runningEngine.Abort();
    }
  }
}
392
/// <summary>
/// A connection to the server has been established: fetch the calendar if none is available
/// locally, then log in and submit the jobs that were persisted to disk.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">unused</param>
void wcfService_Connected(object sender, EventArgs e) {
  Logger.Info("WCF Service got a connection");

  if (!UptimeManager.Instance.CalendarAvailable) {
    Logger.Info("No local calendar available, fetch it");
    FetchCalendarFromServer();
  }

  // Without a calendar (the fetch above may have failed) the client still goes online;
  // a result may arrive within the next few heartbeats. With a calendar it must say "online".
  bool goOnline = !UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline();
  if (!goOnline) {
    return;
  }

  Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
  Logger.Info("Setting client online");
  wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
  JobStorageManager.CheckAndSubmitJobsFromDisc();
  CurrentlyFetching = false;
}
414
/// <summary>
/// Requests the calendar from the server, tries to install it in the UptimeManager and
/// reports the resulting calendar state back to the server.
/// </summary>
private void FetchCalendarFromServer() {
  ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);

  // SetAppointments is only attempted when the server call itself succeeded.
  bool installed = calres.Success && UptimeManager.Instance.SetAppointments(false, calres);
  if (installed) {
    Logger.Info("Remote calendar installed");
    wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
    return;
  }

  // A failed request and a failed installation are reported the same way.
  Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
  wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
}
430
/// <summary>
/// The connection to the previous server is back. Every engine that is not in the Started
/// state and has no message pending gets a thread that runs GetFinishedJob for its job.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">unused</param>
void wcfService_ConnectionRestored(object sender, EventArgs e) {
  Logger.Info("Reconnected to old server - checking currently running appdomains");

  //Work on a copy taken under the lock: the threads started below can end in
  //KillAppDomain, which removes entries from 'engines' and would break a live enumeration.
  List<Executor> snapshot;
  lock (engines) {
    snapshot = new List<Executor>(engines.Values);
  }

  foreach (Executor executor in snapshot) {
    if (executor.ExecutionState != ExecutionState.Started && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
      Logger.Info("Checking for JobId: " + executor.JobId);
      Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
      finThread.Start(executor.JobId);
    }
  }
}
443
444    #endregion
445
/// <summary>
/// Gives access to the map from job id to the executor running that job.
/// </summary>
/// <returns>the internal dictionary itself, not a copy</returns>
public Dictionary<Guid, Executor> GetExecutionEngines() {
  return this.engines;
}
449
/// <summary>
/// Logs an exception that went unhandled inside one of the job AppDomains.
/// </summary>
/// <param name="sender">event source (unused)</param>
/// <param name="e">carries the exception object</param>
void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  string details = e.ExceptionObject.ToString();
  Logger.Error("Exception in AppDomain: " + details);
}
453
/// <summary>
/// Gives access to the map from job id to the job description received from the server.
/// </summary>
/// <returns>the internal dictionary itself, not a copy</returns>
internal Dictionary<Guid, JobDto> GetJobs() {
  return this.jobs;
}
457
/// <summary>
/// Kill a appdomain with a specific id: disposes the job's engine, unloads its AppDomain,
/// deletes the job's plugin copies and removes the job from the internal maps.
/// Failures are logged, never thrown.
/// </summary>
/// <param name="id">the GUID of the job</param>
private void KillAppDomain(Guid id) {
  Logger.Debug("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        try {
          engine.Dispose();
        }
        catch (Exception ex) {
          //A failing Dispose must not keep the AppDomain from being unloaded.
          Logger.Error("Exception when disposing the engine: ", ex);
        }
      }

      AppDomain domain;
      if (appDomains.TryGetValue(id, out domain)) {
        domain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(domain);
        //Only forget the domain once it is really gone.
        appDomains.Remove(id);
      }
      PluginCache.Instance.DeletePluginsForJob(id);
    }
    catch (Exception ex) {
      Logger.Error("Exception when unloading the appdomain: ", ex);
    }
    finally {
      //The job is over either way. Leaving it in these maps would keep engines.Count above
      //zero, which the uptime-limit logic checks before disconnecting.
      engines.Remove(id);
      jobs.Remove(id);
    }
  }
  //NOTE(review): presumably forces reclamation of the unloaded domain's memory; one
  //collection after the clean-up replaces the two back-to-back calls.
  GC.Collect();
}
484  }
485
486
487}
Note: See TracBrowser for help on using the repository browser.