Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4368

Last change on this file since 4368 was 4368, checked in by cneumuel, 14 years ago
  • created HiveClient which shows an overview over all submitted HiveExperiments
  • it's possible to download all submitted HiveExperiments, including their results
  • Experiments are now sent as a whole to the Hive and the Hive-Slaves take care of creating child-jobs (if necessary). The parent job is then paused and will be reactivated when all child-jobs are finished
  • WcfService-Clients are now consistently managed by WcfServicePool, which allows use of the IDisposable pattern and always keeps exactly one proxy object alive until all callers have disposed it.
  • created ProgressView which is able to lock a View and display progress of an action. It also allows to simulate progress if no progress-information is available so that users don't get too nervous while waiting.
File size: 21.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Threading;
26using HeuristicLab.Core;
27using HeuristicLab.Hive.Contracts;
28using HeuristicLab.Hive.Contracts.BusinessObjects;
29using HeuristicLab.Hive.Contracts.ResponseObjects;
30using HeuristicLab.Hive.Slave.Common;
31using HeuristicLab.Hive.Slave.Communication;
32using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
33using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
34using HeuristicLab.Hive.Slave.Core.JobStorage;
35using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
36using HeuristicLab.Hive.Slave.ExecutionEngine;
37using HeuristicLab.Tracing;
38
39namespace HeuristicLab.Hive.Slave.Core {
40  /// <summary>
41  /// The core component of the Hive Client
42  /// </summary>
43  public class Core : MarshalByRefObject {
/// <summary>
/// While false, the message dispatch loop keeps running; the Shutdown message sets it to true.
/// </summary>
public static bool abortRequested { get; set; }

private bool currentlyFetching;
/// <summary>
/// Guards against overlapping job requests: a FetchJob message is ignored while this is true.
/// Every assignment is written to the debug log.
/// </summary>
private bool CurrentlyFetching {
  get {
    return currentlyFetching;
  }
  set {
    currentlyFetching = value;
    Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
  }
}

// Per-job bookkeeping, keyed by job id. The instances are never replaced and
// 'engines' doubles as the lock object for all three maps, so they are readonly.
private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

// Assigned in Start() / StartHeartbeats().
private WcfService wcfService;
private HeartbeatManager beat;
63
/// <summary>
/// Main method of the slave: starts the console server, wires up the WCF service events,
/// restores the server address, starts the heartbeat and then dispatches queued messages
/// until <see cref="abortRequested"/> is set.
/// </summary>
/// <remarks>
/// Event handlers are deregistered and the console server is closed even when start-up or
/// the dispatch loop throws; the exception itself is still propagated to the caller.
/// </remarks>
public void Start() {
  abortRequested = false;
  Logger.Info("Hive Slave started");
  SlaveConsoleServer server = new SlaveConsoleServer();
  server.Start();
  try {
    ConfigManager manager = ConfigManager.Instance;
    manager.Core = this;

    wcfService = WcfService.Instance;
    RegisterServiceEvents();
    try {
      RecoverSettings(); // recover server IP from the settings framework
      StartHeartbeats(); // Start heartbeats thread
      DispatchMessageQueue(); // dispatch messages until abortRequested
    }
    finally {
      DeRegisterServiceEvents();
    }
  }
  finally {
    server.Close();
    Logger.Info("Program shutdown");
  }
}
87
/// <summary>
/// Applies the server address stored in the configuration to the WCF service.
/// A missing (null) or empty address leaves the service's current address untouched.
/// </summary>
private void RecoverSettings() {
  ConnectionContainer cc = ConfigManager.Instance.GetServerIP();
  // IsNullOrEmpty instead of "!= String.Empty": a null address must not overwrite ServerIp.
  if (!String.IsNullOrEmpty(cc.IPAdress)) {
    wcfService.ServerIp = cc.IPAdress;
  }
}
94
/// <summary>
/// Creates the heartbeat manager with a ten second interval and starts it.
/// </summary>
private void StartHeartbeats() {
  HeartbeatManager manager = new HeartbeatManager();
  manager.Interval = TimeSpan.FromSeconds(10);
  beat = manager;
  beat.StartHeartbeat();
}
100
/// <summary>
/// Takes messages from the message queue one at a time and hands each to
/// <see cref="DetermineAction"/>; returns once <see cref="abortRequested"/> is set.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue messages = MessageQueue.GetInstance();
  while (true) {
    if (abortRequested) {
      return;
    }
    MessageContainer next = messages.GetMessage();
    DetermineAction(next);
  }
}
108
/// <summary>
/// Subscribes this core's handlers to the WCF service events.
/// </summary>
private void RegisterServiceEvents() {
  wcfService.GetJobCompleted += wcfService_GetJobCompleted;
  wcfService.GetFinishedJobResultCompleted += wcfService_StoreFinishedJobResultCompleted;
  wcfService.ProcessSnapshotCompleted += wcfService_ProcessSnapshotCompleted;
  wcfService.ConnectionRestored += wcfService_ConnectionRestored;
  //wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
  wcfService.Connected += wcfService_Connected;
}
117
/// <summary>
/// Removes the handlers that <see cref="RegisterServiceEvents"/> attached to the WCF service.
/// </summary>
private void DeRegisterServiceEvents() {
  wcfService.GetJobCompleted -= wcfService_GetJobCompleted;
  wcfService.GetFinishedJobResultCompleted -= wcfService_StoreFinishedJobResultCompleted;
  wcfService.ProcessSnapshotCompleted -= wcfService_ProcessSnapshotCompleted;
  wcfService.ConnectionRestored -= wcfService_ConnectionRestored;
  //wcfService.ServerChanged -= new EventHandler(wcfService_ServerChanged);
  wcfService.Connected -= wcfService_Connected;
}
126
/// <summary>
/// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
/// Message types without a case below are logged and otherwise ignored.
/// </summary>
/// <param name="container">The container holding the message type and the job id it refers to</param>
private void DetermineAction(MessageContainer container) {
  Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
  switch (container.Message) {
    //Server requests to abort a job
    case MessageContainer.MessageType.AbortJob: {
        Executor engine = FindEngine(container.JobId);
        if (engine != null)
          engine.Abort();
        else
          Logger.Error("AbortJob: Engine doesn't exist");
        break;
      }

    //Job has been successfully aborted
    case MessageContainer.MessageType.JobAborted:
      KillAppDomain(container.JobId);
      break;

    //Request a Snapshot from the Execution Engine
    case MessageContainer.MessageType.RequestSnapshot: {
        Executor engine = FindEngine(container.JobId);
        if (engine != null)
          engine.RequestSnapshot();
        else
          Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
        break;
      }

    //Snapshot is ready and can be sent back to the Server
    case MessageContainer.MessageType.SnapshotReady:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
      break;

    //Pull a Job from the Server
    case MessageContainer.MessageType.FetchJob:
      if (!CurrentlyFetching) {
        wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
        CurrentlyFetching = true;
      } else
        Logger.Info("Currently fetching, won't fetch this time!");
      break;

    //A Job has finished and can be sent back to the server
    case MessageContainer.MessageType.FinishedJob:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
      break;

    //When the timeslice is up
    case MessageContainer.MessageType.UptimeLimitDisconnect:
      Logger.Info("Uptime Limit reached, storing jobs and sending them back");
      ShutdownRunningJobsAndSubmitSnapshots();
      break;

    //Fetch or Force Fetch Calendar!
    case MessageContainer.MessageType.FetchOrForceFetchCalendar:
      Logger.Info("Fetch Calendar from Server");
      FetchCalendarFromServer();
      break;

    //Hard shutdown of the client
    case MessageContainer.MessageType.Shutdown:
      Logger.Info("Shutdown Signal received");
      lock (engines) {
        Logger.Debug("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= appDomain_UnhandledException;
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // One domain that refuses to unload must not prevent the heartbeat stop and logout below.
            Logger.Error("Exception when unloading the appdomain: ", ex);
          }
        }
        // Drop the bookkeeping for the domains handled above so nothing tries to use or unload them again.
        appDomains.Clear();
        engines.Clear();
        jobs.Clear();
      }
      Logger.Debug("Stopping heartbeat");
      abortRequested = true;
      beat.StopHeartBeat();
      Logger.Debug("Logging out");
      WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
      break;

    case MessageContainer.MessageType.AddChildJob:
      AddChildJob((MessageContainerWithJob)container);
      break;

    case MessageContainer.MessageType.PauseJob:
      // send the job back to hive
      PauseJob((MessageContainerWithJob)container);
      break;

    case MessageContainer.MessageType.GetChildJobs:
      // request the child jobs from hive and hand them to the callback
      GetChildJobs((MessageContainerWithCallback<SerializedJobList>)container);
      break;
  }
}

/// <summary>
/// Looks up the executor of a job while holding the same lock that guards all changes to
/// the engine table. The executor is used outside the lock so a slow call cannot block other threads.
/// </summary>
/// <param name="jobId">Id of the job</param>
/// <returns>The executor registered for the job, or null if there is none</returns>
private Executor FindEngine(Guid jobId) {
  lock (engines) {
    Executor engine;
    return engines.TryGetValue(jobId, out engine) ? engine : null;
  }
}
223
/// <summary>
/// Requests the child jobs of the job named in the message from the server and passes
/// them to the message's callback. On a non-Ok status the failure is logged and the
/// callback is not invoked.
/// </summary>
/// <param name="mc">Message carrying the job id and the callback</param>
private void GetChildJobs(MessageContainerWithCallback<SerializedJobList> mc) {
  ResponseObject<SerializedJobList> result = wcfService.GetChildJobs(mc.JobId);
  if (result.StatusMessage == ResponseStatus.Ok) {
    mc.Callback(result.Obj);
    return;
  }
  Logger.Error("GetChildJobs failed: " + result.StatusMessage);
}
232
/// <summary>
/// Sends the serialized job to the server as paused and then tears down the job's
/// AppDomain, whatever the server answered. A non-Ok answer is logged.
/// </summary>
/// <param name="mc">Message carrying the job id and the serialized job</param>
private void PauseJob(MessageContainerWithJob mc) {
  ResponseObject<JobDto> result = wcfService.PauseJob(mc.SerializedJob);
  KillAppDomain(mc.JobId);
  bool failed = result.StatusMessage != ResponseStatus.Ok;
  if (failed) {
    Logger.Error("PauseJob failed: " + result.StatusMessage);
  }
}
240
/// <summary>
/// Submits the serialized job to the server as a child of the job named in the message.
/// A non-Ok answer is logged.
/// </summary>
/// <param name="mc">Message carrying the parent job id and the serialized child job</param>
/// <returns>The server's response, returned unchanged</returns>
private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) {
  ResponseObject<JobDto> result = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
  bool succeeded = result.StatusMessage == ResponseStatus.Ok;
  if (!succeeded) {
    Logger.Error("AddChildJob failed: " + result.StatusMessage);
  }
  return result;
}
248
/// <summary>
/// If any jobs are running, blocks further job fetching and requests a snapshot from
/// every running executor.
/// </summary>
private void ShutdownRunningJobsAndSubmitSnapshots() {
  // Copy the executors under the lock: other threads remove entries from 'engines'
  // (see KillAppDomain), which would invalidate a live enumeration.
  List<Executor> running;
  lock (engines) {
    running = new List<Executor>(engines.Values);
  }

  //check if there are running jobs
  if (running.Count == 0) {
    return;
  }

  //make sure there is no more fetching of jobs while the snapshots get processed
  CurrentlyFetching = true;
  //request a snapshot of each running job
  foreach (Executor engine in running) {
    engine.RequestSnapshot();
  }
}
260
261    //Asynchronous Threads for interaction with the Execution Engine
262    #region Async Threads for the EE
263
/// <summary>
/// Retrieves the serialized finished job from its executor and submits it to the server.
/// If the slave is not logged in, the job is stored on disk instead and the job's
/// AppDomain is torn down.
/// </summary>
/// <param name="jobId">The job id as a boxed <see cref="Guid"/> (thread-start/work-item signature)</param>
private void GetFinishedJob(object jobId) {
  Guid jId = (Guid)jobId;
  Logger.Info("Getting the finished job with id: " + jId);
  try {
    // Single lookup under the lock: the entry may be removed by another thread at any
    // time, so a separate ContainsKey followed by the indexer could still throw.
    Executor engine;
    bool engineExists;
    lock (engines) {
      engineExists = engines.TryGetValue(jId, out engine);
    }
    if (!engineExists) {
      Logger.Info("Engine doesn't exist");
      return;
    }

    byte[] sJob = engine.GetFinishedJob();

    if (WcfService.Instance.LoggedIn) {
      Logger.Info("Sending the finished job with id: " + jId);
      wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engine.CurrentException, true);
    } else {
      Logger.Info("Storing the finished job with id: " + jId + " to hdd");
      JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment
      KillAppDomain(jId);
    }
  }
  catch (InvalidStateException ise) {
    Logger.Error("Invalid State while Snapshoting:", ise);
  }
}
293
/// <summary>
/// Fetches a snapshot from the job's executor and sends it to the server synchronously.
/// Afterwards the job is restarted if the slave is still allowed to calculate. Otherwise
/// its AppDomain is torn down and, once no engines remain, the slave disconnects.
/// </summary>
/// <param name="jobId">The job id as a boxed <see cref="Guid"/> (work-item signature)</param>
private void GetSnapshot(object jobId) {
  Logger.Info("Fetching a snapshot for job " + jobId);
  Guid jId = (Guid)jobId;

  // This runs as a thread-pool work item and the job may have been removed since the
  // message was queued; an unchecked indexer would let KeyNotFoundException escape.
  Executor engine;
  bool engineExists;
  lock (engines) {
    engineExists = engines.TryGetValue(jId, out engine);
  }
  if (!engineExists) {
    Logger.Error("GetSnapshot: Engine doesn't exist");
    return;
  }

  byte[] obj = engine.GetSnapshot();
  Logger.Debug("BEGIN: Sending snapshot sync");
  wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
    jId,
    obj,
    engine.Progress,
    null);
  Logger.Debug("END: Sended snapshot sync");
  //Uptime Limit reached, now is a good time to destroy this jobs.
  Logger.Debug("Checking if uptime limit is reached");
  if (!UptimeManager.Instance.IsAllowedToCalculate()) {
    Logger.Debug("Uptime limit reached");
    Logger.Debug("Killing Appdomain");
    KillAppDomain(jId);
    //Still anything running?
    int remaining;
    lock (engines) {
      remaining = engines.Count;
    }
    if (remaining == 0) {
      Logger.Info("All jobs snapshotted and sent back, disconnecting");
      WcfService.Instance.Disconnect();
    } else {
      Logger.Debug("There are still active Jobs in the Field, not disconnecting");
    }
  } else {
    Logger.Debug("Restarting the job" + jobId);
    engine.StartOnlyJob();
    Logger.Info("Restarted the job" + jobId);
  }
}
325
326    #endregion
327
328    //Eventhandlers for the communication with the wcf Layer
329    #region wcfService Events
/// <summary>
/// Handles the result of a login: on success job fetching is re-enabled and the login
/// is logged; otherwise the returned status is logged as an error.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Carries the login response</param>
void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
  if (e.Result.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
    return;
  }
  CurrentlyFetching = false;
  Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
}
342
/// <summary>
/// A job request has returned. Unless the server reported that no jobs are available,
/// the job's plugins are prepared, a sandboxed AppDomain is created and an executor is
/// started in it. If anything in that sequence fails, the job is torn down and reported
/// back to the server together with the exception.
/// </summary>
/// <remarks>
/// <see cref="CurrentlyFetching"/> is reset on every exit path, including exceptions, so a
/// failed request cannot block all future fetches.
/// </remarks>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Carries the server response and the job data</param>
void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
  try {
    if (e.Result.StatusMessage == ResponseStatus.GetJob_NoJobsAvailable) {
      Logger.Info("No more jobs left!");
      return;
    }

    var job = e.Result.Obj;
    if (job == null) {
      // Any status other than "no jobs available" used to be treated as a delivered job;
      // without a job object there is nothing to start.
      Logger.Error("GetJob returned no job, status: " + e.Result.StatusMessage);
      return;
    }

    Guid jobId = job.Id;
    Logger.Info("Received new job with id " + jobId);
    Logger.Debug("Fetching plugins for job " + jobId);
    try {
      PluginCache.Instance.PreparePlugins(job.PluginsNeeded);
      PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, jobId);

      Logger.Debug("Plugins fetched for job " + jobId);
      String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, jobId.ToString());

      AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
      appDomain.UnhandledException += appDomain_UnhandledException;

      bool registered = false;
      lock (engines) {
        if (!jobs.ContainsKey(jobId)) {
          jobs.Add(jobId, job);
          appDomains.Add(jobId, appDomain);
          registered = true;
          Logger.Debug("Creating AppDomain");
          Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
          Logger.Debug("Created AppDomain");
          engine.JobId = jobId;
          engine.Queue = MessageQueue.GetInstance();
          Logger.Debug("Starting Engine for job " + jobId);
          engine.Start(e.Data);
          engines.Add(jobId, engine);

          SlaveStatusInfo.JobsFetched++;
          Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
        }
      }

      if (!registered) {
        // The job id is already registered, so the domain created above is not tracked
        // anywhere; unload it here or it stays loaded for the life of the process.
        // A failure is only logged: it must not reach the outer catch, which would kill
        // the job that is already registered under this id.
        Logger.Error("Job " + jobId + " is already registered, discarding the newly created AppDomain");
        try {
          appDomain.UnhandledException -= appDomain_UnhandledException;
          AppDomain.Unload(appDomain);
        }
        catch (Exception unloadException) {
          Logger.Error("Exception when unloading the appdomain: ", unloadException);
        }
      }
      beat.InterruptHeartBeatThread();
    }
    catch (Exception exception) {
      Logger.Error("Creating the Appdomain and loading the job failed for job " + jobId);
      Logger.Error("Error thrown is: ", exception);
      CurrentlyFetching = false;
      KillAppDomain(jobId);
      wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, jobId, new byte[] { }, 1, exception, true);
    }
  }
  finally {
    CurrentlyFetching = false;
  }
}
396
/// <summary>
/// The server has answered the submission of a finished job. The job's AppDomain is torn
/// down in every case; on an Ok status the processed-jobs counter is incremented and the
/// heartbeat thread is interrupted, otherwise the failure is logged.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Carries the server response including the job id</param>
void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
  Logger.Info("Job submitted with id " + e.Result.JobId);
  KillAppDomain(e.Result.JobId);

  if (e.Result.StatusMessage != ResponseStatus.Ok) {
    Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
    return;
  }

  SlaveStatusInfo.JobsProcessed++;
  Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
  beat.InterruptHeartBeatThread();
}
413
/// <summary>
/// The server has answered a snapshot submission; the event is only logged.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Carries the server response including the job id</param>
void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
  string message = "Snapshot " + e.Result.JobId + " has been transmitted according to plan.";
  Logger.Info(message);
}
422
423
/// <summary>
/// A connection to the server has been established: fetch the calendar if none is
/// available locally, log in, submit the jobs persisted on disk and re-enable fetching.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Unused</param>
void wcfService_Connected(object sender, EventArgs e) {
  Logger.Info("WCF Service got a connection");

  bool calendarMissing = !UptimeManager.Instance.CalendarAvailable;
  if (calendarMissing) {
    Logger.Info("No local calendar available, fetch it");
    FetchCalendarFromServer();
  }

  //if the fetching from the server failed - still set the client online... maybe we get
  //a result within the next few heartbeats
  //if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
  string status = "CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate();
  Logger.Info(status);
  Logger.Info("Setting client online");

  wcfService.Login(ConfigManager.Instance.GetClientInfo());
  JobStorageManager.CheckAndSubmitJobsFromDisc();
  CurrentlyFetching = false;
}
444
/// <summary>
/// Requests the calendar from the server and tries to install it. The resulting calendar
/// state is reported back: Fetched when the request returned Ok and the appointments
/// could be set, NotAllowedToFetch otherwise.
/// </summary>
private void FetchCalendarFromServer() {
  ResponseCalendar calendar = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);

  // SetAppointments is only attempted when the request itself succeeded.
  bool installed = calendar.StatusMessage == ResponseStatus.Ok
                   && UptimeManager.Instance.SetAppointments(false, calendar);

  CalendarState state;
  if (installed) {
    Logger.Info("Remote calendar installed");
    state = CalendarState.Fetched;
  } else {
    Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
    state = CalendarState.NotAllowedToFetch;
  }
  wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, state);
}
460
/// <summary>
/// The connection to the previous server is back. Every executor that is no longer in
/// the Started state and has no message pending gets its own thread running
/// <see cref="GetFinishedJob"/>.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Unused</param>
void wcfService_ConnectionRestored(object sender, EventArgs e) {
  Logger.Info("Reconnected to old server - checking currently running appdomains");

  // Work on a copy taken under the lock: the threads started below can remove entries
  // from 'engines' (GetFinishedJob -> KillAppDomain), which would invalidate a live
  // enumeration of the dictionary.
  List<Executor> executors;
  lock (engines) {
    executors = new List<Executor>(engines.Values);
  }

  foreach (Executor executor in executors) {
    if (executor.ExecutionState != ExecutionState.Started && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
      Logger.Info("Checking for JobId: " + executor.JobId);
      Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
      finThread.Start(executor.JobId);
    }
  }
}
473
474    #endregion
475
/// <summary>
/// The live table of executors keyed by job id (the internal instance, not a copy).
/// </summary>
public Dictionary<Guid, Executor> ExecutionEngines {
  get {
    return engines;
  }
}
479
/// <summary>
/// The live table of job descriptions keyed by job id (the internal instance, not a copy).
/// </summary>
internal Dictionary<Guid, JobDto> Jobs {
  get {
    return jobs;
  }
}
483
/// <summary>
/// Logs an unhandled exception raised inside one of the job AppDomains.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Carries the exception object</param>
void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  string details = e.ExceptionObject.ToString();
  Logger.Error("Exception in AppDomain: " + details);
}
487
/// <summary>
/// Tears down everything belonging to a job: disposes its executor, unloads its
/// AppDomain, removes the bookkeeping entries and deletes the plugins copied for it.
/// Failures are logged and never propagated to the caller.
/// </summary>
/// <param name="id">the GUID of the job</param>
private void KillAppDomain(Guid id) {
  Logger.Debug("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        try {
          engine.Dispose();
        }
        catch (Exception disposeException) {
          // A failing Dispose must not skip the unload and cleanup below, otherwise the
          // domain and its bookkeeping entries would stay around.
          Logger.Error("Exception when disposing the engine: ", disposeException);
        }
      }

      AppDomain domain;
      if (appDomains.TryGetValue(id, out domain)) {
        domain.UnhandledException -= appDomain_UnhandledException;
        AppDomain.Unload(domain);
        appDomains.Remove(id);
      }

      engines.Remove(id);
      jobs.Remove(id);
      PluginCache.Instance.DeletePluginsForJob(id);
    }
    catch (Exception ex) {
      Logger.Error("Exception when unloading the appdomain: ", ex);
    }
  }
  // One collection after the lock is released; a second collection inside the lock only
  // lengthened the time other threads were blocked.
  GC.Collect();
}
515  }
516
517
518}
Note: See TracBrowser for help on using the repository browser.