Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4263

Last change on this file since 4263 was 4263, checked in by cneumuel, 14 years ago

Consolidated Response objects to use only StatusMessage with enums instead of strings.
Removed the Success property from Response; success is now represented by StatusMessage alone. (#1159)

File size: 20.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Slave.ExecutionEngine;
27using HeuristicLab.Hive.Slave.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Slave.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Slave.Core.ClientConsoleService;
42using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
43using HeuristicLab.Hive.Slave.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Slave.Core.JobStorage;
46using HeuristicLab.Tracing;
47using HeuristicLab.Core;
48using System.IO;
49using HeuristicLab.Hive.Contracts.ResponseObjects;
50
namespace HeuristicLab.Hive.Slave.Core {
  /// <summary>
  /// The core component of the Hive Client: runs the main message loop, owns the per-job
  /// AppDomains/executors and reacts to the events raised by the WCF communication layer.
  /// </summary>
  /// <remarks>
  /// The per-job dictionaries (<c>engines</c>, <c>appDomains</c>, <c>jobs</c>) are touched from the
  /// main message loop, from thread-pool work items (<c>GetFinishedJob</c>, <c>GetSnapshot</c>), from
  /// dedicated threads (<c>wcfService_ConnectionRestored</c>) and from the WCF event handlers
  /// (whose calling thread is not visible here), so every access is guarded by <c>lock (engines)</c>.
  /// Calls into an executor are made outside the lock where the original code allowed it.
  /// </remarks>
  public class Core : MarshalByRefObject {
    /// <summary>
    /// When set to true, the main loop in <see cref="Start"/> terminates after handling
    /// the message it is currently processing.
    /// </summary>
    public static bool abortRequested { get; set; }

    private bool _currentlyFetching;

    /// <summary>
    /// True while a job request to the server is outstanding (or fetching is deliberately
    /// suppressed); prevents a second concurrent request. Every change is logged.
    /// </summary>
    private bool CurrentlyFetching {
      get {
        return _currentlyFetching;
      }
      set {
        _currentlyFetching = value;
        Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
      }
    }

    // Per-job state keyed by job id. All three dictionaries are guarded by lock (engines).
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

    private WcfService wcfService;
    private Heartbeat beat;

    /// <summary>
    /// Main Method for the client: starts the local console server, registers the WCF event
    /// handlers, restores the configured server address, starts the heartbeat and then
    /// dispatches messages from the <see cref="MessageQueue"/> until <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      abortRequested = false;
      Logger.Info("Hive Slave started");
      SlaveConsoleServer server = new SlaveConsoleServer();
      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/SlaveConsole/"));

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      //Register all Wcf Service references
      wcfService = WcfService.Instance;
      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
      wcfService.GetJobCompleted += new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
      wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
      wcfService.Connected += new EventHandler(wcfService_Connected);

      //Recover Server IP and Port from the Settings Framework
      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
      // A null address is treated like an empty one: no usable server configured.
      if (!String.IsNullOrEmpty(cc.IPAdress) && cc.Port != 0)
        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);

      //Initialize the heartbeat
      beat = new Heartbeat { Interval = 10000 };
      beat.StartHeartbeat();

      MessageQueue queue = MessageQueue.GetInstance();

      // Main processing loop.
      // NOTE(review): presumably GetMessage() blocks until a message is available — confirm.
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
      Logger.Info("Program shutdown");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          Executor engineToAbort = GetEngine(container.JobId);
          if (engineToAbort != null)
            engineToAbort.Abort();
          else
            Logger.Error("AbortJob: Engine doesn't exist");
          break;

        //Job has been successfully aborted
        case MessageContainer.MessageType.JobAborted:
          KillAppDomain(container.JobId);
          break;

        //Request a Snapshot from the Execution Engine
        case MessageContainer.MessageType.RequestSnapshot:
          Executor engineToSnapshot = GetEngine(container.JobId);
          if (engineToSnapshot != null)
            engineToSnapshot.RequestSnapshot();
          else
            Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
          break;

        //Snapshot is ready and can be sent back to the Server
        case MessageContainer.MessageType.SnapshotReady:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.FetchJob:
          if (!CurrentlyFetching) {
            wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
            CurrentlyFetching = true;
          } else
            Logger.Info("Currently fetching, won't fetch this time!");
          break;

        //A Job has finished and can be sent back to the server
        case MessageContainer.MessageType.FinishedJob:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
          break;

        //When the timeslice is up
        case MessageContainer.MessageType.UptimeLimitDisconnect:
          Logger.Info("Uptime Limit reached, storing jobs and sending them back");

          // Iterate over a copy: other threads may remove engines while we request snapshots.
          List<Executor> runningEngines = GetEngineSnapshot();
          if (runningEngines.Count > 0) {
            //make sure there is no more fetching of jobs while the snapshots get processed
            CurrentlyFetching = true;
            //request a snapshot of each running job
            foreach (Executor runningEngine in runningEngines) {
              runningEngine.RequestSnapshot();
            }
          } else {
            //Disconnect afterwards
            WcfService.Instance.Disconnect();
          }
          break;

        //Fetch or Force Fetch Calendar!
        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
          Logger.Info("Fetch Calendar from Server");
          FetchCalendarFromServer();
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.Shutdown:
          Logger.Info("Shutdown Signal received");
          lock (engines) {
            Logger.Debug("engines locked");
            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
              // A failing unload is logged so the remaining domains, heartbeat and logout still run.
              UnloadAppDomain(kvp.Value);
            }
            // The domains are gone; forget them so later clean-ups do not unload them again.
            appDomains.Clear();
          }
          Logger.Debug("Stopping heartbeat");
          abortRequested = true;
          beat.StopHeartBeat();
          Logger.Debug("Logging out");
          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
          break;
      }
    }

    //Asynchronous Threads for interaction with the Execution Engine
    #region Async Threads for the EE

    /// <summary>
    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
    /// once the connection gets reestablished, the job gets submitted
    /// </summary>
    /// <param name="jobId">The id of the finished job, boxed as <see cref="Guid"/>.</param>
    private void GetFinishedJob(object jobId) {
      Guid jId = (Guid)jobId;
      Logger.Info("Getting the finished job with id: " + jId);
      try {
        Executor engine = GetEngine(jId);
        if (engine == null) {
          Logger.Info("Engine doesn't exist");
          return;
        }

        byte[] sJob = engine.GetFinishedJob();

        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
          Logger.Info("Sending the finished job with id: " + jId);
          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engine.CurrentException, true);
        } else {
          Logger.Info("Storing the finished job with id: " + jId + " to hdd");
          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
          KillAppDomain(jId);
        }
      }
      catch (InvalidStateException ise) {
        Logger.Error("Invalid State while getting the finished job:", ise);
      }
      catch (AppDomainUnloadedException aue) {
        // The job was killed by another thread between the lookup and the remote call.
        // This runs on a worker thread, where an unhandled exception would end the process.
        Logger.Error("AppDomain of job " + jId + " was unloaded before the finished job could be fetched:", aue);
      }
    }

    /// <summary>
    /// Fetches a snapshot from the executor of a job and sends it synchronously to the server.
    /// If the uptime limit has been reached the job's AppDomain is killed (and the slave
    /// disconnects once no jobs are left); otherwise the job is restarted.
    /// </summary>
    /// <param name="jobId">The id of the job, boxed as <see cref="Guid"/>.</param>
    private void GetSnapshot(object jobId) {
      Logger.Info("Fetching a snapshot for job " + jobId);
      Guid jId = (Guid)jobId;
      Executor engine = GetEngine(jId);
      if (engine == null) {
        // The job may have been killed after SnapshotReady was queued; indexing would throw
        // on this worker thread and take down the process.
        Logger.Error("GetSnapshot: Engine doesn't exist");
        return;
      }
      byte[] obj = engine.GetSnapshot();
      Logger.Debug("BEGIN: Sending snapshot sync");
      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
        jId,
        obj,
        engine.Progress,
        null);
      Logger.Debug("END: Sended snapshot sync");
      //Uptime Limit reached, now is a good time to destroy this jobs.
      Logger.Debug("Checking if uptime limit is reached");
      if (!UptimeManager.Instance.IsOnline()) {
        Logger.Debug("Uptime limit reached");
        Logger.Debug("Killing Appdomain");
        KillAppDomain(jId);
        //Still anything running?
        int remainingJobs;
        lock (engines) {
          remainingJobs = engines.Count;
        }
        if (remainingJobs == 0) {
          Logger.Info("All jobs snapshotted and sent back, disconnecting");
          WcfService.Instance.Disconnect();
        } else {
          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
        }

      } else {
        Logger.Debug("Restarting the job" + jobId);
        engine.StartOnlyJob();
        Logger.Info("Restarted the job" + jobId);
      }
    }

    #endregion

    //Eventhandlers for the communication with the wcf Layer
    #region wcfService Events
    /// <summary>
    /// Login has returned. On success, job fetching is re-enabled.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
      if (e.Result.StatusMessage == ResponseStatus.Ok) {
        CurrentlyFetching = false;
        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
      } else
        Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// If creating the AppDomain or starting the job fails, the job is reported back to the
    /// server as finished with the exception.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
      // finally guarantees fetching is re-enabled even if plugin preparation throws;
      // otherwise the slave would never request another job.
      try {
        if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
          Logger.Info("No more jobs left!");
          return;
        }

        JobDto job = e.Result.Obj;
        if (job == null) {
          Logger.Error("Received no job from the server, status: " + e.Result.StatusMessage);
          return;
        }

        Logger.Info("Received new job with id " + job.Id);
        Logger.Debug("Fetching plugins for job " + job.Id);
        PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
        PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, job.Id);
        Logger.Debug("Plugins fetched for job " + job.Id);

        try {
          String pluginDir = Path.Combine(PluginCache.PLUGIN_REPO, job.Id.ToString());

          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          bool alreadyRunning;
          lock (engines) {
            alreadyRunning = jobs.ContainsKey(job.Id);
            if (!alreadyRunning) {
              jobs.Add(job.Id, job);
              appDomains.Add(job.Id, appDomain);
              Logger.Debug("Creating AppDomain");
              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              Logger.Debug("Created AppDomain");
              engine.JobId = job.Id;
              engine.Queue = MessageQueue.GetInstance();
              Logger.Debug("Starting Engine for job " + job.Id);
              engine.Start(e.Data);
              engines.Add(job.Id, engine);

              SlaveStatusInfo.JobsFetched++;
              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
            }
          }
          if (alreadyRunning) {
            // The job is already executing here; the sandbox just created for it is not
            // registered anywhere and would otherwise leak.
            Logger.Info("Job " + job.Id + " is already running, discarding the new AppDomain");
            UnloadAppDomain(appDomain);
          }
        }
        catch (Exception exception) {
          Logger.Error("Creating the Appdomain and loading the job failed for job " + job.Id);
          Logger.Error("Error thrown is: ", exception);
          CurrentlyFetching = false;
          KillAppDomain(job.Id);
          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, job.Id, new byte[] { }, 1, exception, true);
        }
      }
      finally {
        CurrentlyFetching = false;
      }
    }

    /// <summary>
    /// A finished job has been stored on the server. The job's AppDomain is killed regardless
    /// of the outcome; only successful submissions are counted as processed.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
      Logger.Info("Job submitted with id " + e.Result.JobId);
      KillAppDomain(e.Result.JobId);
      if (e.Result.StatusMessage == ResponseStatus.Ok) {
        SlaveStatusInfo.JobsProcessed++;
        Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
      } else {
        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
      }
    }

    /// <summary>
    /// A snapshot has been stored on the server
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
    }

    /// <summary>
    /// The server has been changed. All running jobs are asked to abort; their AppDomains
    /// are cleaned up when the corresponding JobAborted messages are processed.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_ServerChanged(object sender, EventArgs e) {
      Logger.Info("ServerChanged has been called");
      // Abort outside the lock on a copy, so a re-entrant removal cannot invalidate the iteration.
      foreach (Executor engine in GetEngineSnapshot()) {
        engine.Abort();
      }
    }

    /// <summary>
    /// Connection to the server has been established => fetch the calendar if none is available,
    /// then (if allowed by the calendar) log in and submit the jobs persisted on the hard disk.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_Connected(object sender, EventArgs e) {
      Logger.Info("WCF Service got a connection");
      if (!UptimeManager.Instance.CalendarAvailable) {
        Logger.Info("No local calendar available, fetch it");
        FetchCalendarFromServer();
      }
      //if the fetching from the server failed - still set the client online... maybe we get
      //a result within the next few heartbeats
      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
        Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
        Logger.Info("Setting client online");
        wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
        JobStorageManager.CheckAndSubmitJobsFromDisc();
        CurrentlyFetching = false;
      }
    }

    /// <summary>
    /// Requests the calendar from the server, installs it and reports the resulting calendar
    /// state (Fetched or NotAllowedToFetch) back to the server.
    /// </summary>
    private void FetchCalendarFromServer() {
      var clientId = ConfigManager.Instance.GetClientInfo().Id;
      ResponseCalendar calres = wcfService.GetCalendarSync(clientId);
      // SetAppointments is only attempted when the server answered Ok (short-circuit).
      if (calres.StatusMessage == ResponseStatus.Ok && UptimeManager.Instance.SetAppointments(false, calres)) {
        Logger.Info("Remote calendar installed");
        wcfService.SetCalendarStatus(clientId, CalendarState.Fetched);
      } else {
        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
        wcfService.SetCalendarStatus(clientId, CalendarState.NotAllowedToFetch);
      }
    }

    /// <summary>
    /// Reconnected to the previous server: for every job whose executor is not in the Started
    /// state and has no pending message, a thread is started that tries to submit its result.
    /// </summary>
    void wcfService_ConnectionRestored(object sender, EventArgs e) {
      Logger.Info("Reconnected to old server - checking currently running appdomains");

      // Iterate over a copy: GetFinishedJob threads may remove engines concurrently.
      foreach (Executor engine in GetEngineSnapshot()) {
        if (engine.ExecutionState != ExecutionState.Started && engine.CurrentMessage == MessageContainer.MessageType.NoMessage) {
          Logger.Info("Checking for JobId: " + engine.JobId);
          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
          finThread.Start(engine.JobId);
        }
      }
    }

    #endregion

    /// <summary>
    /// Returns the live engine dictionary (not a copy).
    /// NOTE(review): callers should lock on the returned dictionary while using it.
    /// </summary>
    public Dictionary<Guid, Executor> GetExecutionEngines() {
      return engines;
    }

    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
    }

    /// <summary>
    /// Returns the live job dictionary (not a copy).
    /// </summary>
    internal Dictionary<Guid, JobDto> GetJobs() {
      return jobs;
    }

    /// <summary>
    /// Looks up the executor of a job under the engines lock.
    /// </summary>
    /// <param name="jobId">The id of the job.</param>
    /// <returns>The executor, or null when no engine is registered for the job.</returns>
    private Executor GetEngine(Guid jobId) {
      lock (engines) {
        Executor engine;
        return engines.TryGetValue(jobId, out engine) ? engine : null;
      }
    }

    /// <summary>
    /// Copies the currently registered executors under the engines lock so they can be
    /// iterated without holding it.
    /// </summary>
    private List<Executor> GetEngineSnapshot() {
      lock (engines) {
        return new List<Executor>(engines.Values);
      }
    }

    /// <summary>
    /// Detaches the unhandled-exception handler from an AppDomain and unloads it.
    /// Failures (e.g. a domain that cannot be unloaded) are logged, not propagated.
    /// </summary>
    /// <param name="appDomain">The domain to unload.</param>
    private void UnloadAppDomain(AppDomain appDomain) {
      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(appDomain);
      }
      catch (Exception ex) {
        Logger.Error("Exception when unloading the appdomain: ", ex);
      }
    }

    /// <summary>
    /// Kill a appdomain with a specific id: disposes the executor, unloads the AppDomain,
    /// forgets the job and deletes its plugin copy. Each step runs even if an earlier one
    /// fails, so the job never lingers in the dictionaries.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    private void KillAppDomain(Guid id) {
      Logger.Debug("Shutting down Appdomain for Job " + id);
      lock (engines) {
        Executor engine;
        if (engines.TryGetValue(id, out engine)) {
          engines.Remove(id);
          try {
            engine.Dispose();
          }
          catch (Exception ex) {
            Logger.Error("Exception when disposing the engine: ", ex);
          }
        }
        AppDomain appDomain;
        if (appDomains.TryGetValue(id, out appDomain)) {
          appDomains.Remove(id);
          UnloadAppDomain(appDomain);
        }
        jobs.Remove(id);
        try {
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          Logger.Error("Exception when deleting the plugins of job " + id + ": ", ex);
        }
      }
      GC.Collect();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.