Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4333

Last change on this file since 4333 was 4333, checked in by cneumuel, 14 years ago

added authorizationManager which checks for permission to specific jobs (#1168)

File size: 20.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Threading;
26using HeuristicLab.Core;
27using HeuristicLab.Hive.Contracts;
28using HeuristicLab.Hive.Contracts.BusinessObjects;
29using HeuristicLab.Hive.Contracts.ResponseObjects;
30using HeuristicLab.Hive.Slave.Common;
31using HeuristicLab.Hive.Slave.Communication;
32using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
33using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
34using HeuristicLab.Hive.Slave.Core.JobStorage;
35using HeuristicLab.Hive.Slave.ExecutionEngine;
36using HeuristicLab.Tracing;
37using HeuristicLab.Hive.Slave.Communication.SlaveService;
38
39namespace HeuristicLab.Hive.Slave.Core {
40  /// <summary>
41  /// The core component of the Hive Client
42  /// </summary>
43  public class Core : MarshalByRefObject {
/// <summary>
/// Stop flag for the slave. <see cref="Start"/> clears it, the Shutdown message sets it,
/// and the message dispatch loop runs for as long as it stays false.
/// </summary>
public static bool abortRequested { get; set; }

private bool currentlyFetching;

/// <summary>
/// Wrapper around the <c>currentlyFetching</c> field that writes a debug log entry
/// on every assignment.
/// </summary>
private bool CurrentlyFetching {
  get { return currentlyFetching; }
  set {
    currentlyFetching = value;
    Logger.Debug("Set CurrentlyFetching to " + value);
  }
}
56
// Per-job bookkeeping, all keyed by the job id. The engines dictionary is also the object
// the class locks on when these maps are modified, so the references are readonly:
// swapping one of them out would silently break that locking.
private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

// WcfService singleton; assigned in Start().
private WcfService wcfService;
// Heartbeat manager; created in StartHeartbeats().
private HeartbeatManager beat;
63
/// <summary>
/// Main method of the slave: starts the console server, wires up the WCF service events,
/// starts the heartbeats and then dispatches queued messages until
/// <see cref="abortRequested"/> is set.
/// </summary>
/// <remarks>
/// The service events are deregistered and the console server is closed in
/// <c>finally</c> blocks, so they are released even when the dispatch loop ends
/// with an exception. The exception itself is not swallowed.
/// </remarks>
public void Start() {
  abortRequested = false;
  Logger.Info("Hive Slave started");
  SlaveConsoleServer server = new SlaveConsoleServer();
  server.Start();
  try {
    ConfigManager manager = ConfigManager.Instance;
    manager.Core = this;

    wcfService = WcfService.Instance;
    RegisterServiceEvents();
    try {
      RecoverSettings(); // recover server IP from the settings framework
      StartHeartbeats(); // start heartbeats thread
      DispatchMessageQueue(); // dispatch messages until abortRequested
    }
    finally {
      DeRegisterServiceEvents();
    }
  }
  finally {
    server.Close();
  }
  Logger.Info("Program shutdown");
}
87
/// <summary>
/// Reads the stored server address from the <c>ConfigManager</c> and hands it to the
/// WCF service. A missing (null) or empty address is ignored.
/// </summary>
private void RecoverSettings() {
  ConnectionContainer cc = ConfigManager.Instance.GetServerIP();
  // IsNullOrEmpty instead of a comparison with String.Empty: a null address must not
  // be passed on to SetIP either.
  if (!String.IsNullOrEmpty(cc.IPAdress)) {
    wcfService.SetIP(cc.IPAdress);
  }
}
94
/// <summary>
/// Creates the heartbeat manager with an interval of ten seconds and starts it.
/// </summary>
private void StartHeartbeats() {
  HeartbeatManager manager = new HeartbeatManager();
  manager.Interval = TimeSpan.FromSeconds(10);
  beat = manager;
  beat.StartHeartbeat();
}
100
/// <summary>
/// Takes messages from the message queue one at a time and passes each to
/// <see cref="DetermineAction"/>. The flag <see cref="abortRequested"/> is checked
/// before every message; once it is set the loop ends.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue pending = MessageQueue.GetInstance();
  for (; ; ) {
    if (abortRequested) {
      break;
    }
    MessageContainer next = pending.GetMessage();
    DetermineAction(next);
  }
}
108
/// <summary>
/// Subscribes this class' handlers to the events of the WCF service.
/// </summary>
private void RegisterServiceEvents() {
  // completion of asynchronous service calls
  wcfService.GetJobCompleted += wcfService_GetJobCompleted;
  wcfService.GetFinishedJobResultCompleted += wcfService_StoreFinishedJobResultCompleted;
  wcfService.ProcessSnapshotCompleted += wcfService_ProcessSnapshotCompleted;
  // connection state changes
  wcfService.ConnectionRestored += wcfService_ConnectionRestored;
  wcfService.ServerChanged += wcfService_ServerChanged;
  wcfService.Connected += wcfService_Connected;
}
117
/// <summary>
/// Removes the subscriptions made in <see cref="RegisterServiceEvents"/>.
/// </summary>
private void DeRegisterServiceEvents() {
  // completion of asynchronous service calls
  wcfService.GetJobCompleted -= wcfService_GetJobCompleted;
  wcfService.GetFinishedJobResultCompleted -= wcfService_StoreFinishedJobResultCompleted;
  wcfService.ProcessSnapshotCompleted -= wcfService_ProcessSnapshotCompleted;
  // connection state changes
  wcfService.ConnectionRestored -= wcfService_ConnectionRestored;
  wcfService.ServerChanged -= wcfService_ServerChanged;
  wcfService.Connected -= wcfService_Connected;
}
126
/// <summary>
/// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
/// Message types without a case below are only logged.
/// </summary>
/// <param name="container">The container holding the message type and the job id it refers to</param>
private void DetermineAction(MessageContainer container) {
  Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
  Executor engine;
  switch (container.Message) {
    //Server requests to abort a job
    case MessageContainer.MessageType.AbortJob:
      // single lookup: the entry cannot disappear between an existence check and the access
      if (engines.TryGetValue(container.JobId, out engine)) {
        engine.Abort();
      } else {
        Logger.Error("AbortJob: Engine doesn't exist");
      }
      break;

    //Job has been successfully aborted
    case MessageContainer.MessageType.JobAborted:
      KillAppDomain(container.JobId);
      break;

    //Request a Snapshot from the Execution Engine
    case MessageContainer.MessageType.RequestSnapshot:
      if (engines.TryGetValue(container.JobId, out engine)) {
        engine.RequestSnapshot();
      } else {
        Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
      }
      break;

    //Snapshot is ready and can be sent back to the Server
    case MessageContainer.MessageType.SnapshotReady:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
      break;

    //Pull a Job from the Server
    case MessageContainer.MessageType.FetchJob:
      if (!CurrentlyFetching) {
        // The flag has to be raised BEFORE the asynchronous call is started. The completion
        // handler resets it; if it ran before the flag was set here, the flag would stay
        // true afterwards and no further job would be fetched.
        CurrentlyFetching = true;
        try {
          wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
        }
        catch {
          // the request never went out, so nothing will reset the flag for us
          CurrentlyFetching = false;
          throw;
        }
      } else {
        Logger.Info("Currently fetching, won't fetch this time!");
      }
      break;

    //A Job has finished and can be sent back to the server
    case MessageContainer.MessageType.FinishedJob:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
      break;

    //When the timeslice is up
    case MessageContainer.MessageType.UptimeLimitDisconnect:
      Logger.Info("Uptime Limit reached, storing jobs and sending them back");
      ShutdownRunningJobsAndSubmitSnapshots();
      break;

    //Fetch or Force Fetch Calendar!
    case MessageContainer.MessageType.FetchOrForceFetchCalendar:
      Logger.Info("Fetch Calendar from Server");
      FetchCalendarFromServer();
      break;

    //Hard shutdown of the client
    case MessageContainer.MessageType.Shutdown:
      Logger.Info("Shutdown Signal received");
      lock (engines) {
        Logger.Debug("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          AppDomain.Unload(kvp.Value);
        }
      }
      Logger.Debug("Stopping heartbeat");
      // ends the dispatch loop after this message has been handled
      abortRequested = true;
      beat.StopHeartBeat();
      Logger.Debug("Logging out");
      WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
      break;
  }
}
209
/// <summary>
/// Requests a snapshot from every engine that is currently registered. If there is at
/// least one, <c>CurrentlyFetching</c> is set first so that no new job is fetched while
/// the snapshots get processed.
/// </summary>
private void ShutdownRunningJobsAndSubmitSnapshots() {
  // Copy the engines under the same lock that guards modifications of the dictionary;
  // enumerating it directly could fail if another thread removes a job meanwhile.
  List<Executor> running;
  lock (engines) {
    running = new List<Executor>(engines.Values);
  }

  if (running.Count == 0) {
    return;
  }

  //make sure there is no more fetching of jobs while the snapshots get processed
  CurrentlyFetching = true;
  //request a snapshot of each running job
  foreach (Executor engine in running) {
    engine.RequestSnapshot();
  }
}
221
222    //Asynchronous Threads for interaction with the Execution Engine
223    #region Async Threads for the EE
224
/// <summary>
/// Gets the serialized finished job from its engine and submits it to the server. If the
/// slave is not logged in at that time, the job is persisted to disk instead and its
/// AppDomain is killed.
/// </summary>
/// <param name="jobId">The job id as a boxed <see cref="Guid"/></param>
private void GetFinishedJob(object jobId) {
  Guid jId = (Guid)jobId;
  Logger.Info("Getting the finished job with id: " + jId);
  try {
    // Look the engine up exactly once. With separate ContainsKey/indexer accesses the
    // entry could be removed by KillAppDomain in between, and the resulting
    // KeyNotFoundException is not caught below.
    Executor engine;
    if (!engines.TryGetValue(jId, out engine)) {
      Logger.Info("Engine doesn't exist");
      return;
    }

    byte[] sJob = engine.GetFinishedJob();

    if (WcfService.Instance.LoggedIn) {
      Logger.Info("Sending the finished job with id: " + jId);
      wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engine.CurrentException, true);
    } else {
      Logger.Info("Storing the finished job with id: " + jId + " to hdd");
      JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment
      KillAppDomain(jId);
    }
  }
  catch (InvalidStateException ise) {
    Logger.Error("Invalid State while Snapshoting:", ise);
  }
}
254
/// <summary>
/// Gets a snapshot from the engine of the given job and sends it to the server
/// synchronously. Afterwards the job is either killed (when the uptime manager no longer
/// allows calculating) or started again.
/// </summary>
/// <param name="jobId">The job id as a boxed <see cref="Guid"/></param>
private void GetSnapshot(object jobId) {
  Logger.Info("Fetching a snapshot for job " + jobId);
  Guid jId = (Guid)jobId;

  // The job may already have been removed (finished, aborted, killed). Indexing the
  // dictionary directly would then throw a KeyNotFoundException on this pool thread.
  Executor engine;
  if (!engines.TryGetValue(jId, out engine)) {
    Logger.Error("GetSnapshot: Engine doesn't exist");
    return;
  }

  byte[] obj = engine.GetSnapshot();
  Logger.Debug("BEGIN: Sending snapshot sync");
  wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
    jId,
    obj,
    engine.Progress,
    null);
  Logger.Debug("END: Sended snapshot sync");
  //Uptime Limit reached, now is a good time to destroy this jobs.
  Logger.Debug("Checking if uptime limit is reached");
  if (!UptimeManager.Instance.IsAllowedToCalculate()) {
    Logger.Debug("Uptime limit reached");
    Logger.Debug("Killing Appdomain");
    KillAppDomain(jId);
    //Still anything running?
    if (engines.Count == 0) {
      Logger.Info("All jobs snapshotted and sent back, disconnecting");
      WcfService.Instance.Disconnect();
    } else {
      Logger.Debug("There are still active Jobs in the Field, not disconnecting");
    }
  } else {
    Logger.Debug("Restarting the job" + jobId);
    engine.StartOnlyJob();
    Logger.Info("Restarted the job" + jobId);
  }
}
286
287    #endregion
288
289    //Eventhandlers for the communication with the wcf Layer
290    #region wcfService Events
/// <summary>
/// Handles the result of a login call: on status Ok job fetching is enabled again,
/// any other status is logged as an error.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Event arguments carrying the login response</param>
void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
  if (e.Result.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
    return;
  }

  CurrentlyFetching = false;
  Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
}
303
/// <summary>
/// A job request has returned. Unless the server reported that no jobs are available,
/// the plugins for the job are prepared, a sandbox AppDomain is created and an
/// <c>Executor</c> is started inside it.
/// </summary>
/// <remarks>
/// <c>CurrentlyFetching</c> is reset on every way out of this method, including
/// exceptions; otherwise a single failed response would block all further fetches.
/// If setting up the job fails, the job is killed locally and reported to the server
/// together with the exception.
/// </remarks>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Event arguments carrying the response and the job data</param>
void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
  try {
    if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
      Logger.Info("No more jobs left!");
      return;
    }
    if (e.Result.Obj == null) {
      // any other status that comes without a job: nothing to start
      Logger.Error("GetJob returned status " + e.Result.StatusMessage + " without a job");
      return;
    }

    Guid jobId = e.Result.Obj.Id;
    Logger.Info("Received new job with id " + jobId);
    Logger.Debug("Fetching plugins for job " + jobId);
    try {
      PluginCache.Instance.PreparePlugins(e.Result.Obj.PluginsNeeded);
      PluginCache.Instance.CopyPluginsForJob(e.Result.Obj.PluginsNeeded, jobId);

      Logger.Debug("Plugins fetched for job " + jobId);
      String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, jobId.ToString());

      AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
      appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
      lock (engines) {
        if (!jobs.ContainsKey(jobId)) {
          jobs.Add(jobId, e.Result.Obj);
          appDomains.Add(jobId, appDomain);
          Logger.Debug("Creating AppDomain");
          Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
          Logger.Debug("Created AppDomain");
          engine.JobId = jobId;
          engine.Queue = MessageQueue.GetInstance();
          Logger.Debug("Starting Engine for job " + jobId);
          engine.Start(e.Data);
          engines.Add(jobId, engine);

          SlaveStatusInfo.JobsFetched++;
          Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
        } else {
          // The job is already tracked, so the sandbox created above is never registered
          // anywhere and would leak. Failures here are only logged: they must not reach
          // the outer catch, which would kill the job that is already running.
          try {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(appDomain);
          }
          catch (Exception unloadException) {
            Logger.Error("Unloading the unused AppDomain failed for job " + jobId, unloadException);
          }
        }
      }
      beat.InterruptHeartBeatThread();
    }
    catch (Exception exception) {
      Logger.Error("Creating the Appdomain and loading the job failed for job " + jobId);
      Logger.Error("Error thrown is: ", exception);
      CurrentlyFetching = false;
      KillAppDomain(jobId);
      wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, jobId, new byte[] { }, 1, exception, true);
    }
  }
  finally {
    CurrentlyFetching = false;
  }
}
357
/// <summary>
/// The server has answered the submission of a finished job. The job's AppDomain is
/// killed in any case; on status Ok the processed-jobs counter is increased and the
/// heartbeat thread is interrupted, otherwise the failure is logged.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Event arguments carrying the server response</param>
void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
  var response = e.Result;
  Logger.Info("Job submitted with id " + response.JobId);
  KillAppDomain(response.JobId);

  if (response.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("Sending of job " + response.JobId + " failed, job has been wasted. Message: " + response.StatusMessage);
    return;
  }

  SlaveStatusInfo.JobsProcessed++;
  Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
  beat.InterruptHeartBeatThread();
}
374
/// <summary>
/// Logs that the snapshot of a job has been transmitted to the server.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Event arguments carrying the id of the snapshotted job</param>
void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
  var snapshotJobId = e.Result.JobId;
  Logger.Info("Snapshot " + snapshotJobId + " has been transmitted according to plan.");
}
383
/// <summary>
/// The server has been changed: every registered engine is asked to abort.
/// </summary>
/// <remarks>
/// Only <c>Abort()</c> is called here; the AppDomains are not unloaded and the
/// bookkeeping dictionaries are not cleared in this handler.
/// </remarks>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Not used</param>
void wcfService_ServerChanged(object sender, EventArgs e) {
  Logger.Info("ServerChanged has been called");
  lock (engines) {
    foreach (Executor engine in engines.Values) {
      engine.Abort();
    }
  }
}
402
/// <summary>
/// A connection to the server has been established: fetch the calendar if none is
/// available locally, log in, submit the jobs persisted on disk and allow fetching again.
/// </summary>
/// <remarks>
/// The login is performed regardless of whether fetching the calendar succeeded.
/// </remarks>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Not used</param>
void wcfService_Connected(object sender, EventArgs e) {
  Logger.Info("WCF Service got a connection");

  bool calendarMissing = !UptimeManager.Instance.CalendarAvailable;
  if (calendarMissing) {
    Logger.Info("No local calendar available, fetch it");
    FetchCalendarFromServer();
  }

  // re-read both values after the (possible) fetch, purely for the log entry
  bool calendarAvailable = UptimeManager.Instance.CalendarAvailable;
  bool allowedToCalculate = UptimeManager.Instance.IsAllowedToCalculate();
  Logger.Info("CalendarAvailable is " + calendarAvailable + " and IsOnline is: " + allowedToCalculate);

  Logger.Info("Setting client online");
  wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
  JobStorageManager.CheckAndSubmitJobsFromDisc();
  CurrentlyFetching = false;
}
424
/// <summary>
/// Requests the calendar from the server and hands it to the uptime manager. The
/// resulting calendar state (Fetched or NotAllowedToFetch) is reported back to the server.
/// </summary>
private void FetchCalendarFromServer() {
  ResponseCalendar response = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);

  // SetAppointments is only attempted when the server answered with Ok
  bool installed = response.StatusMessage == ResponseStatus.Ok
    && UptimeManager.Instance.SetAppointments(false, response);

  if (installed) {
    Logger.Info("Remote calendar installed");
    wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
    return;
  }

  Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
  wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
}
440
/// <summary>
/// The connection to the previous server is back. For every engine that is not in state
/// Started and has no current message, a thread is started that runs
/// <see cref="GetFinishedJob"/> for its job.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Not used</param>
void wcfService_ConnectionRestored(object sender, EventArgs e) {
  Logger.Info("Reconnected to old server - checking currently running appdomains");

  // Work on a copy taken under the lock: the threads started below can remove entries
  // from the dictionary (GetFinishedJob may call KillAppDomain), which must not happen
  // while the dictionary itself is being enumerated.
  List<Executor> candidates;
  lock (engines) {
    candidates = new List<Executor>(engines.Values);
  }

  foreach (Executor executor in candidates) {
    if (executor.ExecutionState != ExecutionState.Started && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
      Logger.Info("Checking for JobId: " + executor.JobId);
      Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
      finThread.Start(executor.JobId);
    }
  }
}
453
454    #endregion
455
/// <summary>
/// The engines dictionary itself (not a copy), keyed by job id.
/// </summary>
public Dictionary<Guid, Executor> ExecutionEngines {
  get {
    return engines;
  }
}

/// <summary>
/// The jobs dictionary itself (not a copy), keyed by job id.
/// </summary>
internal Dictionary<Guid, JobDto> Jobs {
  get {
    return jobs;
  }
}
463
/// <summary>
/// Logs an unhandled exception reported by one of the job AppDomains.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Event arguments carrying the exception object</param>
void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  string details = e.ExceptionObject.ToString();
  Logger.Error("Exception in AppDomain: " + details);
}
467
/// <summary>
/// Kills the AppDomain of a specific job: disposes the engine, unloads the AppDomain,
/// deletes the job's plugins and removes the job from the bookkeeping dictionaries.
/// </summary>
/// <remarks>
/// Exceptions are logged, never propagated. The engine and job entries are removed even
/// if disposing or unloading fails, so a broken job does not linger as "running". The
/// AppDomain entry is only removed after a successful unload.
/// </remarks>
/// <param name="id">the GUID of the job</param>
private void KillAppDomain(Guid id) {
  Logger.Debug("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        try {
          engine.Dispose();
        }
        catch (Exception ex) {
          // a failing Dispose must not prevent the AppDomain from being unloaded
          Logger.Error("Exception when disposing the engine: ", ex);
        }
      }

      AppDomain appDomain;
      if (appDomains.TryGetValue(id, out appDomain)) {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(appDomain);
        appDomains.Remove(id);
      }

      PluginCache.Instance.DeletePluginsForJob(id);
    }
    catch (Exception ex) {
      Logger.Error("Exception when unloading the appdomain: ", ex);
    }
    finally {
      engines.Remove(id);
      jobs.Remove(id);
    }
  }
  // one collection after the unload; a second one directly before it added nothing
  GC.Collect();
}
494  }
495
496
497}
Note: See TracBrowser for help on using the repository browser.