Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs @ 4424

Last change on this file since 4424 was 4424, checked in by cneumuel, 14 years ago
  • Added and updated License Information in every file
  • Sort and remove usings in every file
  • Deleted obsolete DataAccess.ADOHelper
  • Deleted some obsolete files
File size: 21.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.ResponseObjects;
31using HeuristicLab.Hive.Slave.Common;
32using HeuristicLab.Hive.Slave.Communication;
33using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
34using HeuristicLab.Hive.Slave.Core.ConfigurationManager;
35using HeuristicLab.Hive.Slave.Core.JobStorage;
36using HeuristicLab.Hive.Slave.Core.SlaveConsoleService;
37using HeuristicLab.Hive.Slave.ExecutionEngine;
38using HeuristicLab.Tracing;
39
40namespace HeuristicLab.Hive.Slave.Core {
41  /// <summary>
42  /// The core component of the Hive Client
43  /// </summary>
44  public class Core : MarshalByRefObject {
45    public static bool abortRequested { get; set; }
46
47    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
48    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
49    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
50
51    private WcfService wcfService;
52    private HeartbeatManager heartbeatManager;
53
54    private bool currentlyFetching;
55    private bool CurrentlyFetching {
56      get {
57        return currentlyFetching;
58      }
59      set {
60        currentlyFetching = value;
61        Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
62      }
63    }
64
65    public Dictionary<Guid, Executor> ExecutionEngines {
66      get { return engines; }
67    }
68
69    internal Dictionary<Guid, JobDto> Jobs {
70      get { return jobs; }
71    }
72
73    /// <summary>
74    /// Main Method for the client
75    /// </summary>
76    public void Start() {
77      abortRequested = false;
78      Logger.Info("Hive Slave started");
79      SlaveConsoleServer server = new SlaveConsoleServer();
80      server.Start();
81
82      ConfigManager manager = ConfigManager.Instance;
83      manager.Core = this;
84
85      wcfService = WcfService.Instance;
86      RegisterServiceEvents();
87
88      RecoverSettings(); // recover server IP from the settings framework
89      StartHeartbeats(); // Start heartbeats thread
90      DispatchMessageQueue(); // dispatch messages until abortRequested
91
92      DeRegisterServiceEvents();
93      server.Close();
94      Logger.Info("Program shutdown");
95    }
96
97    private void RecoverSettings() {
98      ConnectionContainer cc = ConfigManager.Instance.GetServerIP();
99      if (cc.IPAdress != String.Empty) {
100        wcfService.ServerIp = cc.IPAdress;
101      }
102    }
103
104    private void StartHeartbeats() {
105      //Initialize the heartbeat
106      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
107      heartbeatManager.StartHeartbeat();
108    }
109
110    private void DispatchMessageQueue() {
111      MessageQueue queue = MessageQueue.GetInstance();
112      while (!abortRequested) {
113        MessageContainer container = queue.GetMessage();
114        DetermineAction(container);
115      }
116    }
117
118    private void RegisterServiceEvents() {
119      wcfService.GetJobCompleted += new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
120      wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
121      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
122      wcfService.Connected += new EventHandler(wcfService_Connected);
123    }
124
125    private void DeRegisterServiceEvents() {
126      wcfService.GetJobCompleted -= new EventHandler<GetJobCompletedEventArgs>(wcfService_GetJobCompleted);
127      wcfService.GetFinishedJobResultCompleted -= new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
128      wcfService.ProcessSnapshotCompleted -= new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
129      wcfService.Connected -= new EventHandler(wcfService_Connected);
130    }
131
132    /// <summary>
133    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
134    /// </summary>
135    /// <param name="container">The Container, containing the message</param>
136    private void DetermineAction(MessageContainer container) {
137      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
138      switch (container.Message) {
139        //Server requests to abort a job
140        case MessageContainer.MessageType.AbortJob:
141          if (engines.ContainsKey(container.JobId))
142            try {
143              engines[container.JobId].Abort();
144            }
145            catch (AppDomainUnloadedException) {
146              // appdomain already unloaded. Finishing job probably ongoing
147            }
148          else
149            Logger.Error("AbortJob: Engine doesn't exist");
150          break;
151
152        //Job has been successfully aborted
153        case MessageContainer.MessageType.JobAborted:
154          Guid jobId = new Guid(container.JobId.ToString());
155          KillAppDomain(jobId);
156          break;
157
158        //Request a Snapshot from the Execution Engine
159        case MessageContainer.MessageType.RequestSnapshot:
160          if (engines.ContainsKey(container.JobId))
161            engines[container.JobId].RequestSnapshot();
162          else
163            Logger.Error("RequestSnapshot: Engine with Job doesn't exist");
164          break;
165
166        //Snapshot is ready and can be sent back to the Server
167        case MessageContainer.MessageType.SnapshotReady:
168          GetSnapshot(container.JobId);
169          break;
170
171        //Pull a Job from the Server
172        case MessageContainer.MessageType.FetchJob:
173          if (!CurrentlyFetching) {
174            wcfService.GetJobAsync(ConfigManager.Instance.GetClientInfo().Id);
175            CurrentlyFetching = true;
176          } else
177            Logger.Info("Currently fetching, won't fetch this time!");
178          break;
179
180        //A Job has finished and can be sent back to the server
181        case MessageContainer.MessageType.FinishedJob:
182          SendFinishedJob(container.JobId);
183          break;
184
185        //When the timeslice is up
186        case MessageContainer.MessageType.UptimeLimitDisconnect:
187          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
188          ShutdownRunningJobsAndSubmitSnapshots();
189          break;
190
191        //Fetch or Force Fetch Calendar!
192        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
193          Logger.Info("Fetch Calendar from Server");
194          FetchCalendarFromServer();
195          break;
196
197        //Hard shutdown of the client
198        case MessageContainer.MessageType.Shutdown:
199          Logger.Info("Shutdown Signal received");
200          lock (engines) {
201            Logger.Debug("engines locked");
202            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
203              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
204              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
205              AppDomain.Unload(kvp.Value);
206            }
207          }
208          Logger.Debug("Stopping heartbeat");
209          abortRequested = true;
210          heartbeatManager.StopHeartBeat();
211          Logger.Debug("Logging out");
212          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
213          break;
214
215        case MessageContainer.MessageType.AddChildJob:
216          AddChildJob((MessageContainerWithJob)container);
217          break;
218
219        case MessageContainer.MessageType.PauseJob:
220          // send the job back to hive
221          PauseJob((MessageContainerWithJob)container);
222          break;
223
224        case MessageContainer.MessageType.GetChildJobs:
225          GetChildJobs((MessageContainerWithCallback<SerializedJobList>)container);
226          break;
227
228        case MessageContainer.MessageType.DeleteChildJobs:
229          wcfService.DeleteChildJobs(container.JobId);
230          break;
231      }
232    }
233
234    private void GetChildJobs(MessageContainerWithCallback<SerializedJobList> mc) {
235      ResponseObject<SerializedJobList> response = wcfService.GetChildJobs(mc.JobId);
236      if (response != null && response.StatusMessage == ResponseStatus.Ok) {
237        mc.Callback(response.Obj);
238      } else {
239        if (response != null) {
240          Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage));
241        } else {
242          Logger.Error("GetChildJobs failed.");
243        }
244      }
245    }
246
247    private void PauseJob(MessageContainerWithJob mc) {
248      ResponseObject<JobDto> response = wcfService.PauseJob(mc.SerializedJob);
249      KillAppDomain(mc.JobId);
250      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
251        Logger.Error("PauseJob failed: " + response.StatusMessage);
252      }
253    }
254
255    private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) {
256      ResponseObject<JobDto> response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob);
257      if (response == null || response.StatusMessage != ResponseStatus.Ok) {
258        Logger.Error("AddChildJob failed: " + response.StatusMessage);
259      }
260      return response;
261    }
262
263    private void ShutdownRunningJobsAndSubmitSnapshots() {
264      //check if there are running jobs
265      if (engines.Count > 0) {
266        //make sure there is no more fetching of jobs while the snapshots get processed
267        CurrentlyFetching = true;
268        //request a snapshot of each running job
269        foreach (KeyValuePair<Guid, Executor> kvp in engines) {
270          kvp.Value.RequestSnapshot();
271        }
272      }
273    }
274
275    //Asynchronous Threads for interaction with the Execution Engine
276    #region Async Threads for the EE
277
278    /// <summary>
279    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
280    /// once the connection gets reestablished, the job gets submitted
281    /// </summary>
282    /// <param name="jobId"></param>
283    private void SendFinishedJob(object jobId) {
284      try {
285        Guid jId = (Guid)jobId;
286        Logger.Info("Getting the finished job with id: " + jId);
287        if (!engines.ContainsKey(jId)) {
288          Logger.Info("Engine doesn't exist");
289          return;
290        }
291
292        byte[] sJob = engines[jId].GetFinishedJob();
293
294        try {
295          Logger.Info("Sending the finished job with id: " + jId);
296          wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, engines[jId].ExecutionTime, engines[jId].CurrentException, true);
297        }
298        catch (Exception e) {
299          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
300          JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment
301        }
302        finally {
303          KillAppDomain(jId); // kill app-domain in every case
304        }
305      }
306      catch (Exception e) {
307        OnExceptionOccured(e);
308      }
309    }
310
311    private void GetSnapshot(object jobId) {
312      try {
313        Logger.Info("Fetching a snapshot for job " + jobId);
314        Guid jId = (Guid)jobId;
315        byte[] obj = engines[jId].GetSnapshot();
316        wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, jId, obj, engines[jId].ExecutionTime, null);
317
318        //Uptime Limit reached, now is a good time to destroy this jobs.
319        Logger.Debug("Checking if uptime limit is reached");
320        if (!UptimeManager.Instance.IsAllowedToCalculate()) {
321          Logger.Debug("Uptime limit reached");
322          Logger.Debug("Killing Appdomain");
323          KillAppDomain(jId);
324          //Still anything running? 
325          if (engines.Count == 0) {
326            Logger.Info("All jobs snapshotted and sent back, disconnecting");
327            WcfService.Instance.Disconnect();
328          } else {
329            Logger.Debug("There are still active Jobs in the Field, not disconnecting");
330          }
331        } else {
332          Logger.Debug("Restarting the job" + jobId);
333          engines[jId].StartOnlyJob();
334          Logger.Info("Restarted the job" + jobId);
335        }
336      }
337      catch (Exception e) {
338        OnExceptionOccured(e);
339      }
340    }
341
342    #endregion
343
344    //Eventhandlers for the communication with the wcf Layer
345    #region wcfService Events
346    /// <summary>
347    /// Login has returned
348    /// </summary>
349    /// <param name="sender"></param>
350    /// <param name="e"></param>
351    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
352      if (e.Result.StatusMessage == ResponseStatus.Ok) {
353        CurrentlyFetching = false;
354        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
355      } else
356        Logger.Error("Error during login: " + e.Result.StatusMessage.ToString());
357    }
358
359    /// <summary>
360    /// A new Job from the wcfService has been received and will be started within a AppDomain.
361    /// </summary>
362    /// <param name="sender"></param>
363    /// <param name="e"></param>
364    void wcfService_GetJobCompleted(object sender, GetJobCompletedEventArgs e) {
365      if (e.Result.StatusMessage != ResponseStatus.GetJob_NoJobsAvailable) {
366        Logger.Info("Received new job with id " + e.Result.Obj.Id);
367        Logger.Debug("Fetching plugins for job " + e.Result.Obj.Id);
368        try {
369          PluginCache.Instance.PreparePlugins(e.Result.Obj.PluginsNeeded);
370          PluginCache.Instance.CopyPluginsForJob(e.Result.Obj.PluginsNeeded, e.Result.Obj.Id);
371
372          Logger.Debug("Plugins fetched for job " + e.Result.Obj.Id);
373          String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, e.Result.Obj.Id.ToString());
374
375          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
376          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
377          lock (engines) {
378            if (!jobs.ContainsKey(e.Result.Obj.Id)) {
379              jobs.Add(e.Result.Obj.Id, e.Result.Obj);
380              appDomains.Add(e.Result.Obj.Id, appDomain);
381              Logger.Debug("Creating AppDomain");
382              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
383              Logger.Debug("Created AppDomain");
384              engine.JobId = e.Result.Obj.Id;
385              engine.Queue = MessageQueue.GetInstance();
386              Logger.Debug("Starting Engine for job " + e.Result.Obj.Id);
387              engine.Start(e.Data);
388              engines.Add(e.Result.Obj.Id, engine);
389              SlaveStatusInfo.JobsFetched++;
390              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
391            }
392          }
393          heartbeatManager.AwakeHeartBeatThread();
394        }
395        catch (Exception exception) {
396          Logger.Error("Creating the Appdomain and loading the job failed for job " + e.Result.Obj.Id);
397          Logger.Error("Error thrown is: ", exception);
398          CurrentlyFetching = false;
399          KillAppDomain(e.Result.Obj.Id);
400          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, e.Result.Obj.ExecutionTime, exception.ToString(), true);
401        }
402      } else {
403        Logger.Info("No more jobs left!");
404      }
405      CurrentlyFetching = false;
406    }
407
408    /// <summary>
409    /// A finished job has been stored on the server
410    /// </summary>
411    /// <param name="sender"></param>
412    /// <param name="e"></param>
413    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
414      Logger.Info("Job submitted with id " + e.Result.JobId);
415      KillAppDomain(e.Result.JobId);
416      if (e.Result.StatusMessage == ResponseStatus.Ok) {
417        SlaveStatusInfo.JobsProcessed++;
418        Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed);
419        heartbeatManager.AwakeHeartBeatThread();
420      } else {
421        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
422      }
423    }
424
425    /// <summary>
426    /// A snapshot has been stored on the server
427    /// </summary>
428    /// <param name="sender"></param>
429    /// <param name="e"></param>
430    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
431      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
432    }
433
434
435    /// <summary>
436    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
437    /// </summary>
438    /// <param name="sender"></param>
439    /// <param name="e"></param>
440    void wcfService_Connected(object sender, EventArgs e) {
441      Logger.Info("WCF Service got a connection");
442      if (!UptimeManager.Instance.CalendarAvailable) {
443        Logger.Info("No local calendar available, fetch it");
444        FetchCalendarFromServer();
445      }
446      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate());
447      CurrentlyFetching = false;
448      CheckRunningAppDomains();
449      JobStorageManager.CheckAndSubmitJobsFromDisc();
450    }
451
452    private void FetchCalendarFromServer() {
453      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
454      if (calres.StatusMessage == ResponseStatus.Ok) {
455        if (UptimeManager.Instance.SetAppointments(false, calres)) {
456          Logger.Info("Remote calendar installed");
457          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
458        } else {
459          Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
460          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
461        }
462      } else {
463        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
464        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
465      }
466    }
467
468    private void CheckRunningAppDomains() {
469      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
470        if (execKVP.Value.ExecutionState != ExecutionState.Started && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
471          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
472          Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob));
473          finThread.Start(execKVP.Value.JobId);
474        }
475      }
476    }
477
478    #endregion
479
480    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
481    private void OnExceptionOccured(Exception e) {
482      Logger.Error("Error: " + e.ToString());
483      var handler = ExceptionOccured;
484      if (handler != null) handler(this, new EventArgs<Exception>(e));
485    }
486
487    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
488      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
489    }
490
491    /// <summary>
492    /// Kill a appdomain with a specific id.
493    /// </summary>
494    /// <param name="id">the GUID of the job</param>
495    private void KillAppDomain(Guid id) {
496      Logger.Debug("Shutting down Appdomain for Job " + id);
497      lock (engines) {
498        try {
499          if (engines.ContainsKey(id))
500            engines[id].Dispose();
501          if (appDomains.ContainsKey(id)) {
502            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
503
504            int repeat = 5;
505            while (repeat > 0) {
506              try {
507                AppDomain.Unload(appDomains[id]);
508                repeat = 0;
509              }
510              catch (CannotUnloadAppDomainException) {
511                Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
512                Thread.Sleep(1000);
513                repeat--;
514                if (repeat == 0) {
515                  throw; // rethrow and let app crash
516                }
517              }
518            }
519            appDomains.Remove(id);
520          }
521
522          engines.Remove(id);
523          jobs.Remove(id);
524          PluginCache.Instance.DeletePluginsForJob(id);
525          GC.Collect();
526        }
527        catch (Exception ex) {
528          Logger.Error("Exception when unloading the appdomain: ", ex);
529        }
530      }
531      GC.Collect();
532    }
533  }
534
535
536}
Note: See TracBrowser for help on using the repository browser.