Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 5093

Last change on this file since 5093 was 5093, checked in by cneumuel, 13 years ago

#1260

  • moved all state-information into lifecycleManager
  • changed isolation level for transactions to ReadCommitted
  • made currentlyFetching-status on slave more robust
  • made LogServiceReader more robust
File size: 22.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Threading;
27using HeuristicLab.Common;
28using HeuristicLab.Core;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.ResponseObjects;
32using HeuristicLab.Hive.Slave.Common;
33using HeuristicLab.Hive.Slave.Communication;
34using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
35using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
36using HeuristicLab.Hive.Slave.Core.JobStorage;
37using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
38using HeuristicLab.Hive.Slave.ExecutionEngine;
39
40namespace HeuristicLab.Hive.Slave.Core {
41  /// <summary>
42  /// The core component of the Hive Client
43  /// </summary>
44  public class Core : MarshalByRefObject {
45    public static bool abortRequested { get; set; }
46
47    public static ILog Log { get; set; }
48
49    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
50    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
51    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
52
53    private WcfService wcfService;
54    private HeartbeatManager heartbeatManager;
55
56    private bool currentlyFetching;
57    private bool CurrentlyFetching {
58      get {
59        return currentlyFetching;
60      }
61      set {
62        currentlyFetching = value;
63        Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
64      }
65    }
66
67    public Dictionary<Guid, Executor> ExecutionEngines {
68      get { return engines; }
69    }
70
71    internal Dictionary<Guid, JobDto> Jobs {
72      get { return jobs; }
73    }
74
75    /// <summary>
76    /// Main Method for the client
77    /// </summary>
78    public void Start() {
79      abortRequested = false;
80      Logger.Info("Hive Slave started");
81      SlaveConsoleServer server = new SlaveConsoleServer();
82      server.Start();
83
84      ConfigManager manager = ConfigManager.Instance;
85      manager.Core = this;
86
87      wcfService = WcfService.Instance;
88      RegisterServiceEvents();
89
90      StartHeartbeats(); // Start heartbeats thread
91      DispatchMessageQueue(); // dispatch messages until abortRequested
92
93      DeRegisterServiceEvents();
94      server.Close();
95      Logger.Info("Program shutdown");
96    }
97
98    private void StartHeartbeats() {
99      //Initialize the heartbeat
100      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
101      heartbeatManager.StartHeartbeat();
102    }
103
104    private void DispatchMessageQueue() {
105      MessageQueue queue = MessageQueue.GetInstance();
106      while (!abortRequested) {
107        MessageContainer container = queue.GetMessage();
108        DetermineAction(container);
109      }
110    }
111
112    private void RegisterServiceEvents() {
113      wcfService.GetJobCompleted += new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
114      wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
115      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
116      wcfService.Connected += new EventHandler(wcfService_Connected);
117      wcfService.GetJobFailed += new EventHandler<EventArgs<Exception>>(wcfService_GetJobFailed);
118    }
119
120    private void DeRegisterServiceEvents() {
121      wcfService.GetJobCompleted -= new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
122      wcfService.GetFinishedJobResultCompleted -= new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
123      wcfService.ProcessSnapshotCompleted -= new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
124      wcfService.Connected -= new EventHandler(wcfService_Connected);
125      wcfService.GetJobFailed -= new EventHandler<EventArgs<Exception>>(wcfService_GetJobFailed);
126    }
127
128    /// <summary>
129    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
130    /// </summary>
131    /// <param name="container">The Container, containing the message</param>
132    private void DetermineAction(MessageContainer container) {
133      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
134      switch (container.Message) {
135        //Server requests to abort a job
136        case MessageContainer.MessageType.AbortJob:
137          if (engines.ContainsKey(container.JobId))
138            try {
139              engines[container.JobId].Abort();
140            }
141            catch (AppDomainUnloadedException) {
142              // appdomain already unloaded. Finishing job probably ongoing
143            }
144          else
145            Logger.Error("AbortJob: Engine doesn't exist");
146          break;
147
148        //Job has been successfully aborted
149        case MessageContainer.MessageType.JobAborted:
150          Guid jobId = new Guid(container.JobId.ToString());
151          KillAppDomain(jobId);
152          break;
153
154        //Request a Snapshot from the Execution Engine
155        case MessageContainer.MessageType.RequestSnapshot:
156          if (engines.ContainsKey(container.JobId))
157            engines[container.JobId].RequestSnapshot();
158          else
159            Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
160          break;
161
162        //Snapshot is ready and can be sent back to the Server
163        case MessageContainer.MessageType.SnapshotReady:
164          GetSnapshot(container.JobId);
165          break;
166
167        //Pull a Job from the Server
168        case MessageContainer.MessageType.FetchJob:
169          if (!CurrentlyFetching) {
170            wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
171            CurrentlyFetching = true;
172          } else
173            Logger.Info("Currently fetching, won't fetch this time!");
174          break;
175
176        //A Job has finished and can be sent back to the server
177        case MessageContainer.MessageType.FinishedJob:
178          SendFinishedJob(container.JobId);
179          break;
180
181        case MessageContainer.MessageType.JobFailed:
182          SendFinishedJob(container.JobId);
183          break;
184
185        //When the timeslice is up
186        case MessageContainer.MessageType.UptimeLimitDisconnect:
187          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
188          ShutdownRunningJobsAndSubmitSnapshots();
189          break;
190
191        //Fetch or Force Fetch Calendar!
192        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
193          Logger.Info("Fetch Calendar from Server");
194          FetchCalendarFromServer();
195          break;
196
197        //Hard shutdown of the client
198        case MessageContainer.MessageType.Shutdown:
199          Logger.Info("Shutdown Signal received");
200          lock (engines) {
201            Logger.Debug("engines locked");
202            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
203              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
204              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
205              AppDomain.Unload(kvp.Value);
206            }
207          }
208          Logger.Debug("Stopping heartbeat");
209          abortRequested = true;
210          heartbeatManager.StopHeartBeat();
211          Logger.Debug("Logging out");
212          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
213          break;
214
215        case MessageContainer.MessageType.AddChildJob:
216          AddChildJob((MessageContainerWithJob)container);
217          break;
218
219        case MessageContainer.MessageType.PauseJob:
220          // send the job back to hive
221          PauseJob((MessageContainerWithJob)container);
222          break;
223
224        case MessageContainer.MessageType.GetChildJobs:
225          GetChildJobs((MessageContainerWithCallback<SerializedJobList>)container);
226          break;
227
228        case MessageContainer.MessageType.DeleteChildJobs:
229          wcfService.DeleteChildJobs(container.JobId);
230          break;
231      }
232    }
233
234    private void GetChildJobs(MessageContainerWithCallback<SerializedJobList> mc) {
235      ResponseObject<SerializedJobList> response = wcfService.GetChildJobs(mc.JobId);
236      if (response != null && response.StatusMessage == ResponseStatus.Ok) {
237        mc.Callback(response.Obj);
238      } else {
239        if (response != null) {
240          Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage));
241        } else {
242          Logger.Error("GetChildJobs failed.");
243        }
244      }
245    }
246
247    private void PauseJob(MessageContainerWithJob mc) {
248      ResponseObject<JobDto> response = wcfService.PauseJob(mc.SerializedJob);
249      KillAppDomain(mc.JobId);
250      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
251        Logger.Error("PauseJob failed: " + response.StatusMessage);
252      }
253    }
254
255    private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) {
256      ResponseObject<JobDto> response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
257      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
258        Logger.Error("AddChildJob failed: " + response.StatusMessage);
259      }
260      return response;
261    }
262
263    private void ShutdownRunningJobsAndSubmitSnapshots() {
264      //check if there are running jobs
265      if (engines.Count > 0) {
266        //make sure there is no more fetching of jobs while the snapshots get processed
267        CurrentlyFetching = true;
268        //request a snapshot of each running job
269        foreach (KeyValuePair<Guid, Executor> kvp in engines) {
270          kvp.Value.RequestSnapshot();
271        }
272      }
273    }
274
275    //Asynchronous Threads for interaction with the Execution Engine
276    #region Async Threads for the EE
277
278    /// <summary>
279    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
280    /// once the connection gets reestablished, the job gets submitted
281    /// </summary>
282    /// <param name="jobId"></param>
283    private void SendFinishedJob(object jobId) {
284      try {
285        Guid jId = (Guid)jobId;
286        Logger.Info("Getting the finished job with id: " + jId);
287        if (!engines.ContainsKey(jId)) {
288          Logger.Info("Engine doesn't exist");
289          return;
290        }
291
292        byte[] sJob = engines[jId].GetFinishedJob();
293       
294        try {
295          Logger.Info("Sending the finished job with id: " + jId);
296          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, engines[jId].ExecutionTime, engines[jId].CurrentException, true);
297        }
298        catch (Exception e) {
299          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
300          JobStorageManager.PersistObjectToDisc("0", 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment. also serverIp is not used anymore
301        }
302        finally {
303          KillAppDomain(jId); // kill app-domain in every case
304        }
305      }
306      catch (Exception e) {
307        OnExceptionOccured(e);
308      }
309    }
310
311    private void GetSnapshot(object jobId) {
312      try {
313        Logger.Info("Fetching a snapshot for job " + jobId);
314        Guid jId = (Guid)jobId;
315        byte[] obj = engines[jId].GetSnapshot();
316        wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, jId, obj, engines[jId].ExecutionTime, null);
317
318        //Uptime Limit reached, now is a good time to destroy this jobs.
319        Logger.Debug("Checking if uptime limit is reached");
320        if (!UptimeManager.Instance.IsAllowedToCalculate()) {
321          Logger.Debug("Uptime limit reached");
322          Logger.Debug("Killing Appdomain");
323          KillAppDomain(jId);
324          //Still anything running? 
325          if (engines.Count == 0) {
326            Logger.Info("All jobs snapshotted and sent back, disconnecting");
327            WcfService.Instance.Disconnect();
328          } else {
329            Logger.Debug("There are still active Jobs in the Field, not disconnecting");
330          }
331        } else {
332          Logger.Debug("Restarting the job" + jobId);
333          engines[jId].StartOnlyJob();
334          Logger.Info("Restarted the job" + jobId);
335        }
336      }
337      catch (Exception e) {
338        OnExceptionOccured(e);
339      }
340    }
341
342    #endregion
343
344    //Eventhandlers for the communication with the wcf Layer
345    #region wcfService Events
346    /// <summary>
347    /// Login has returned
348    /// </summary>
349    /// <param name="sender"></param>
350    /// <param name="e"></param>
351    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
352      if (e.Result.StatusMessage == ResponseStatus.Ok) {
353        CurrentlyFetching = false;
354        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
355      } else
356        Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
357    }
358
359    /// <summary>
360    /// A new Job from the wcfService has been received and will be started within a AppDomain.
361    /// </summary>
362    /// <param name="sender"></param>
363    /// <param name="e"></param>
364    void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
365      if (e.Result.StatusMessage != ResponseStatus.GetJob_NoJobsAvailable) {
366        Logger.Info("Received new job with id " + e.Result.Obj.Id);
367        Logger.Debug("Fetching plugins for job " + e.Result.Obj.Id);
368
369        String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, e.Result.Obj.Id.ToString());
370        bool pluginsPrepared = false;
371        try {
372          PluginCache.Instance.PreparePlugins(e.Result.Obj.PluginsNeeded.OrderBy(x => x.Name).ToList());
373          PluginCache.Instance.CopyPluginsForJob(e.Result.Obj.PluginsNeeded.OrderBy(x => x.Name).ToList(), e.Result.Obj.Id);
374
375          Logger.Debug("Plugins fetched for job " + e.Result.Obj.Id);
376          pluginsPrepared = true;
377        }
378        catch (Exception exception) {
379          Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", e.Result.Obj.Id, exception));
380        }
381
382        if (pluginsPrepared) {
383          try {
384            AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(e.Result.Obj.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
385            appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
386            lock (engines) {
387              if (!jobs.ContainsKey(e.Result.Obj.Id)) {
388                jobs.Add(e.Result.Obj.Id, e.Result.Obj);
389                appDomains.Add(e.Result.Obj.Id, appDomain);
390                Logger.Debug("Creating AppDomain");
391                Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
392                Logger.Debug("Created AppDomain");
393                engine.JobId = e.Result.Obj.Id;
394                engine.Queue = MessageQueue.GetInstance();
395                Logger.Debug("Starting Engine for job " + e.Result.Obj.Id);
396                engines.Add(e.Result.Obj.Id, engine);
397                engine.Start(e.Data);
398                SlaveStatusInfo.JobsFetched++;
399                Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
400              }
401            }
402            heartbeatManager.AwakeHeartBeatThread();
403          }
404          catch (Exception exception) {
405            Logger.Error("Creating the Appdomain and loading the job failed for job " + e.Result.Obj.Id);
406            Logger.Error("Error thrown is: ", exception);
407            CurrentlyFetching = false;
408            KillAppDomain(e.Result.Obj.Id);
409            wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, e.Result.Obj.ExecutionTime, exception.ToString(), false);
410          }
411        }
412      } else {
413        Logger.Info("No more jobs left!");
414      }
415      CurrentlyFetching = false;
416    }
417
418    /// <summary>
419    /// A finished job has been stored on the server
420    /// </summary>
421    /// <param name="sender"></param>
422    /// <param name="e"></param>
423    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
424      Logger.Info("Job submitted with id " + e.Result.JobId);
425      if (e.Result.StatusMessage == ResponseStatus.Ok) {
426        SlaveStatusInfo.JobsProcessed++;
427        Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
428        heartbeatManager.AwakeHeartBeatThread();
429      } else {
430        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
431      }
432    }
433
434    /// <summary>
435    /// A snapshot has been stored on the server
436    /// </summary>
437    /// <param name="sender"></param>
438    /// <param name="e"></param>
439    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
440      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
441    }
442
443
444    /// <summary>
445    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
446    /// </summary>
447    /// <param name="sender"></param>
448    /// <param name="e"></param>
449    void wcfService_Connected(object sender, EventArgs e) {
450      Logger.Info("WCF Service got a connection");
451      if (!UptimeManager.Instance.CalendarAvailable) {
452        Logger.Info("No local calendar available, fetch it");
453        FetchCalendarFromServer();
454      }
455      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
456      CurrentlyFetching = false;
457      CheckRunningAppDomains();
458      JobStorageManager.CheckAndSubmitJobsFromDisc();
459    }
460
461    void wcfService_GetJobFailed(object sender, EventArgs<Exception> e) {
462      Logger.Info("GetJobFailed: " + e.Value.ToString());
463      CurrentlyFetching = false;
464    }
465
466    private void FetchCalendarFromServer() {
467      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
468      if (calres.StatusMessage == ResponseStatus.Ok) {
469        if (UptimeManager.Instance.SetAppointments(false, calres)) {
470          Logger.Info("Remote calendar installed");
471          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
472        } else {
473          Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
474          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
475        }
476      } else {
477        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
478        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
479      }
480    }
481
482    private void CheckRunningAppDomains() {
483      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
484        if (execKVP.Value.ExecutionState != ExecutionState.Started && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
485          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
486          Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob));
487          finThread.Start(execKVP.Value.JobId);
488        }
489      }
490    }
491
492    #endregion
493
494    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
495    private void OnExceptionOccured(Exception e) {
496      Logger.Error("Error: " + e.ToString());
497      var handler = ExceptionOccured;
498      if (handler != null) handler(this, new EventArgs<Exception>(e));
499    }
500
501    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
502      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
503      KillAppDomain(new Guid(e.ExceptionObject.ToString()));
504    }
505
506    /// <summary>
507    /// Kill a appdomain with a specific id.
508    /// </summary>
509    /// <param name="id">the GUID of the job</param>
510    private void KillAppDomain(Guid id) {
511      Logger.Debug("Shutting down Appdomain for Job " + id);
512      lock (engines) {
513        try {
514          if (engines.ContainsKey(id))
515            engines[id].Dispose();
516          if (appDomains.ContainsKey(id)) {
517            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
518
519            int repeat = 5;
520            while (repeat > 0) {
521              try {
522                AppDomain.Unload(appDomains[id]);
523                repeat = 0;
524              }
525              catch (CannotUnloadAppDomainException) {
526                Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
527                Thread.Sleep(1000);
528                repeat--;
529                if (repeat == 0) {
530                  throw; // rethrow and let app crash
531                }
532              }
533            }
534            appDomains.Remove(id);
535          }
536
537          engines.Remove(id);
538          jobs.Remove(id);
539          PluginCache.Instance.DeletePluginsForJob(id);
540          GC.Collect();
541        }
542        catch (Exception ex) {
543          Logger.Error("Exception when unloading the appdomain: ", ex);
544        }
545      }
546      GC.Collect();
547    }
548  }
549
550
551}
Note: See TracBrowser for help on using the repository browser.