Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.2/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs @ 4140

Last change on this file since 4140 was 4140, checked in by kgrading, 14 years ago

#828 added various improvements to the plugin cache manager, the execution engine, the transaction handling on the serverside and the server console

File size: 20.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46using HeuristicLab.Tracing;
47using System.IO;
48
49namespace HeuristicLab.Hive.Client.Core {
50  /// <summary>
51  /// The core component of the Hive Client
52  /// </summary>
53  public class Core : MarshalByRefObject {
/// <summary>
/// Shutdown flag: the main loop in <see cref="Start"/> keeps running while this is false.
/// </summary>
public static bool abortRequested { get; set; }

// Backing store for CurrentlyFetching.
private bool _currentlyFetching;

/// <summary>
/// True while a job request is outstanding, or while fetching is deliberately blocked
/// (e.g. during uptime-limit handling). Every assignment is written to the debug log.
/// </summary>
private bool CurrentlyFetching {
  get { return _currentlyFetching; }
  set {
    _currentlyFetching = value;
    Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
  }
}

// Per-job bookkeeping, all keyed by the job id. Mutations in this class happen
// while holding a lock on the engines dictionary.
private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

// Communication layer to the Hive server and the periodic heartbeat; both are set up in Start().
private WcfService wcfService;
private Heartbeat beat;
72
/// <summary>
/// Main method of the client. Starts the client console server, wires up the
/// WCF service events, connects to the server (when the uptime calendar allows it),
/// starts the heartbeat and then processes queued messages until
/// <see cref="abortRequested"/> becomes true.
/// </summary>
public void Start() {
  abortRequested = false;
  Logger.Info("Hive Client started");

  // The client console is served on the loopback interface only.
  ClientConsoleServer consoleServer = new ClientConsoleServer();
  consoleServer.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));

  // Hand this instance to the configuration manager.
  ConfigManager.Instance.Core = this;

  // Register all WCF service callbacks before any connection attempt is made.
  wcfService = WcfService.Instance;
  wcfService.LoginCompleted += wcfService_LoginCompleted;
  wcfService.SendJobCompleted += wcfService_SendJobCompleted;
  wcfService.StoreFinishedJobResultCompleted += wcfService_StoreFinishedJobResultCompleted;
  wcfService.ProcessSnapshotCompleted += wcfService_ProcessSnapshotCompleted;
  wcfService.ConnectionRestored += wcfService_ConnectionRestored;
  wcfService.ServerChanged += wcfService_ServerChanged;
  wcfService.Connected += wcfService_Connected;

  // Restore the server address from the settings; skipped when either part is unset.
  ConnectionContainer serverSettings = ConfigManager.Instance.GetServerIPAndPort();
  bool settingsMissing = serverSettings.IPAdress == String.Empty || serverSettings.Port == 0;
  if (!settingsMissing) {
    wcfService.SetIPAndPort(serverSettings.IPAdress, serverSettings.Port);
  }

  // Connect right away unless a calendar exists and says the client is currently offline.
  if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
    wcfService.Connect();
  }

  // Initialize the heartbeat (interval presumably in milliseconds - TODO confirm).
  beat = new Heartbeat();
  beat.Interval = 10000;
  beat.StartHeartbeat();

  // Main processing loop: one message at a time, handled on this thread.
  MessageQueue messages = MessageQueue.GetInstance();
  while (!abortRequested) {
    DetermineAction(messages.GetMessage());
  }
  Logger.Info("Program shutdown");
}
119
/// <summary>
/// Analyzes a message taken from the MessageQueue and starts the corresponding action.
/// </summary>
/// <remarks>
/// The engines dictionary is mutated by worker threads (see KillAppDomain) while holding
/// a lock on it, so every read done here goes through the same lock. Calls into the
/// executors themselves are made outside the lock.
/// </remarks>
/// <param name="container">The container holding the message type and the id of the job it refers to</param>
private void DetermineAction(MessageContainer container) {
  Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
  switch (container.Message) {
    //Server requests to abort a job
    case MessageContainer.MessageType.AbortJob: {
        Executor engineToAbort = LookupEngine(container.JobId);
        if (engineToAbort != null)
          engineToAbort.Abort();
        else
          Logger.Error("AbortJob: Engine doesn't exist");
        break;
      }

    //Job has been successfully aborted
    case MessageContainer.MessageType.JobAborted:
      KillAppDomain(container.JobId);
      break;

    //Request a Snapshot from the Execution Engine
    case MessageContainer.MessageType.RequestSnapshot: {
        Executor engineToSnapshot = LookupEngine(container.JobId);
        if (engineToSnapshot != null)
          engineToSnapshot.RequestSnapshot();
        else
          Logger.Error("RequestSnapshot: Engine doesn't exist");
        break;
      }

    //Snapshot is ready and can be sent back to the Server
    case MessageContainer.MessageType.SnapshotReady:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
      break;

    //Pull a Job from the Server
    case MessageContainer.MessageType.FetchJob:
      if (!CurrentlyFetching) {
        wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
        CurrentlyFetching = true;
      } else
        Logger.Info("Currently fetching, won't fetch this time!");
      break;

    //A Job has finished and can be sent back to the server
    case MessageContainer.MessageType.FinishedJob:
      ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
      break;

    //When the timeslice is up
    case MessageContainer.MessageType.UptimeLimitDisconnect: {
        Logger.Info("Uptime Limit reached, storing jobs and sending them back");

        // Copy the running engines under the lock; enumerating the live dictionary
        // could fail if a worker thread removes a job in the meantime.
        List<Executor> runningEngines;
        lock (engines) {
          runningEngines = new List<Executor>(engines.Values);
        }

        //check if there are running jobs
        if (runningEngines.Count > 0) {
          //make sure there is no more fetching of jobs while the snapshots get processed
          CurrentlyFetching = true;
          //request a snapshot of each running job
          foreach (Executor runningEngine in runningEngines) {
            runningEngine.RequestSnapshot();
          }
        } else {
          //Disconnect afterwards
          WcfService.Instance.Disconnect();
        }
        break;
      }

    //Fetch or Force Fetch Calendar!
    case MessageContainer.MessageType.FetchOrForceFetchCalendar:
      Logger.Info("Fetch Calendar from Server");
      FetchCalendarFromServer();
      break;

    //Hard shutdown of the client
    case MessageContainer.MessageType.Shutdown:
      Logger.Info("Shutdown Signal received");
      lock (engines) {
        Logger.Debug("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          AppDomain.Unload(kvp.Value);
        }
      }
      Logger.Debug("Stopping heartbeat");
      // Ends the processing loop in Start() once this message has been handled.
      abortRequested = true;
      beat.StopHeartBeat();
      Logger.Debug("Logging out");
      WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
      break;
  }
}

/// <summary>
/// Looks up the executor of a job while holding the engines lock.
/// </summary>
/// <param name="jobId">the id of the job</param>
/// <returns>the executor registered for the job, or null if there is none</returns>
private Executor LookupEngine(Guid jobId) {
  lock (engines) {
    Executor engine;
    return engines.TryGetValue(jobId, out engine) ? engine : null;
  }
}
218
219    //Asynchronous Threads for interaction with the Execution Engine
220    #region Async Threads for the EE
221
/// <summary>
/// Serializes the finished job and submits it to the server. If the client is not
/// logged in at that time, the job is persisted to disk instead and its AppDomain
/// is torn down right away.
/// </summary>
/// <remarks>
/// Runs on worker threads (thread pool or a dedicated thread), so the engine is looked
/// up once under the engines lock and the obtained reference is used afterwards. This
/// avoids a KeyNotFoundException if the job is removed between the check and the use.
/// </remarks>
/// <param name="jobId">the id of the finished job, passed as a boxed Guid</param>
private void GetFinishedJob(object jobId) {
  Guid jId = (Guid)jobId;
  Logger.Info("Getting the finished job with id: " + jId);
  try {
    Executor engine;
    bool engineExists;
    lock (engines) {
      engineExists = engines.TryGetValue(jId, out engine);
    }
    if (!engineExists) {
      Logger.Info("Engine doesn't exist");
      return;
    }

    byte[] sJob = engine.GetFinishedJob();

    if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
      Logger.Info("Sending the finished job with id: " + jId);
      // The AppDomain is killed later, in wcfService_StoreFinishedJobResultCompleted.
      wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id,
        jId,
        sJob,
        1,
        engine.CurrentException,
        true);
    } else {
      Logger.Info("Storing the finished job with id: " + jId + " to hdd");
      JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
      KillAppDomain(jId);
    }
  }
  catch (InvalidStateException ise) {
    Logger.Error("Invalid State while Snapshoting:", ise);
  }
}
256
/// <summary>
/// Fetches a snapshot of a running job and sends it to the server synchronously.
/// Afterwards the job is either destroyed (when the uptime limit has been reached;
/// the client disconnects once no engines are left) or restarted.
/// </summary>
/// <remarks>
/// Runs on a thread pool thread. The engine is looked up once under the engines lock;
/// a job that has been removed in the meantime is logged and skipped instead of
/// raising a KeyNotFoundException on the worker thread.
/// </remarks>
/// <param name="jobId">the id of the job, passed as a boxed Guid</param>
private void GetSnapshot(object jobId) {
  Logger.Info("Fetching a snapshot for job " + jobId);
  Guid jId = (Guid)jobId;

  Executor engine;
  bool engineExists;
  lock (engines) {
    engineExists = engines.TryGetValue(jId, out engine);
  }
  if (!engineExists) {
    Logger.Error("GetSnapshot: Engine doesn't exist");
    return;
  }

  byte[] obj = engine.GetSnapshot();
  Logger.Debug("BEGIN: Sending snapshot sync");
  wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
    jId,
    obj,
    engine.Progress,
    null);
  Logger.Debug("END: Sended snapshot sync");
  //Uptime Limit reached, now is a good time to destroy this jobs.
  Logger.Debug("Checking if uptime limit is reached");
  if (!UptimeManager.Instance.IsOnline()) {
    Logger.Debug("Uptime limit reached");
    Logger.Debug("Killing Appdomain");
    KillAppDomain(jId);
    //Still anything running?
    bool nothingRunning;
    lock (engines) {
      nothingRunning = engines.Count == 0;
    }
    if (nothingRunning) {
      Logger.Info("All jobs snapshotted and sent back, disconnecting");
      WcfService.Instance.Disconnect();
    } else {
      Logger.Debug("There are still active Jobs in the Field, not disconnecting");
    }

  } else {
    // The job may have been removed while the snapshot was being transmitted;
    // only restart it if it is still registered.
    bool stillRegistered;
    lock (engines) {
      stillRegistered = engines.ContainsKey(jId);
    }
    if (!stillRegistered) {
      Logger.Error("GetSnapshot: Engine doesn't exist anymore, job is not restarted");
      return;
    }
    Logger.Debug("Restarting the job" + jobId);
    engine.StartOnlyJob();
    Logger.Info("Restarted the job" + jobId);
  }
}
288
289    #endregion
290
291    //Eventhandlers for the communication with the wcf Layer
292    #region wcfService Events
/// <summary>
/// Callback for a finished login attempt. A successful login re-enables job
/// fetching; a failed one is only logged.
/// </summary>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">carries the login result</param>
void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
  if (!e.Result.Success) {
    Logger.Error("Error during login: " + e.Result.StatusMessage);
    return;
  }

  CurrentlyFetching = false;
  Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
}
305
/// <summary>
/// A new job has been received from the wcfService; it is started within its own
/// sandboxed AppDomain. If the server reports that no jobs are left, nothing is started.
/// </summary>
/// <remarks>
/// CurrentlyFetching is reset in a finally block so that a failure anywhere in here
/// (for example while preparing the plugins) cannot leave the client unable to fetch
/// further jobs. The sandbox is only created after the job is known not to be
/// registered yet, so a duplicate delivery does not leave behind an untracked AppDomain.
/// </remarks>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">carries the server response (status, job description) and the serialized job data</param>
void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
  try {
    if (e.Result.StatusMessage == ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
      Logger.Info("No more jobs left!");
      return;
    }

    JobDto job = e.Result.Job;
    Logger.Info("Received new job with id " + job.Id);
    Logger.Debug("Fetching plugins for job " + job.Id);

    PluginCache.Instance.PreparePlugins(job.PluginsNeeded);

    PluginCache.Instance.CopyPluginsForJob(job.PluginsNeeded, job.Id);

    Logger.Debug("Plugins fetched for job " + job.Id);
    try {
      String pluginDir = Path.Combine(PluginCache.PLUGIN_REPO, job.Id.ToString());
      lock (engines) {
        if (jobs.ContainsKey(job.Id)) {
          // Already registered: leave the existing job alone and create nothing.
          Logger.Info("Job " + job.Id + " is already registered, ignoring it");
        } else {
          AppDomain appDomain =
            HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);

          // Register job and AppDomain first, so that KillAppDomain can clean up
          // if creating or starting the engine fails below.
          jobs.Add(job.Id, job);
          appDomains.Add(job.Id, appDomain);
          Logger.Debug("Creating AppDomain");
          Executor engine =
            (Executor)
            appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
          Logger.Debug("Created AppDomain");
          engine.JobId = job.Id;
          engine.Queue = MessageQueue.GetInstance();
          Logger.Debug("Starting Engine for job " + job.Id);
          engine.Start(e.Data);
          engines.Add(job.Id, engine);

          ClientStatusInfo.JobsFetched++;

          Logger.Info("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
        }
      }
    } catch (Exception exception) {
      Logger.Error("Creating the Appdomain and loading the job failed for job " + job.Id);
      Logger.Error("Error thrown is: ", exception);
      // Allow fetching again before the (synchronous) failure report is sent.
      CurrentlyFetching = false;
      KillAppDomain(job.Id);
      // Report the job back to the server together with the exception.
      wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, job.Id, new byte[] { },
                                             1, exception, true);
    }
  } finally {
    CurrentlyFetching = false;
  }
}
359
/// <summary>
/// Callback for a finished job submission. The job's AppDomain is torn down in
/// either case; on success the processed-jobs counter is incremented, on failure
/// the loss of the job is logged.
/// </summary>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">carries the server response for the submitted job</param>
void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
  Logger.Info("Job submitted with id " + e.Result.JobId);
  KillAppDomain(e.Result.JobId);

  if (!e.Result.Success) {
    Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
    return;
  }

  ClientStatusInfo.JobsProcessed++;
  Logger.Info("Increased ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
}
375
/// <summary>
/// Callback for a snapshot that has been stored on the server; only writes a log entry.
/// </summary>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">carries the server response containing the job id</param>
void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
  string message = "Snapshot " + e.Result.JobId + " has been transmitted according to plan.";
  Logger.Info(message);
}
384
/// <summary>
/// The server has been changed: every running engine is asked to abort.
/// </summary>
/// <remarks>
/// Only Abort() is issued here; AppDomains and bookkeeping entries are not touched
/// by this handler (they are presumably cleaned up when the resulting JobAborted
/// messages are processed - verify against the Executor).
/// </remarks>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">not used</param>
void wcfService_ServerChanged(object sender, EventArgs e) {
  Logger.Info("ServerChanged has been called");
  lock (engines) {
    foreach (Executor executor in engines.Values) {
      executor.Abort();
    }
  }
}
403
/// <summary>
/// Connection to the server has been established: fetch the calendar if none is
/// available locally, then log in and submit the jobs persisted on the hard disk,
/// unless an available calendar says the client is currently offline.
/// </summary>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">not used</param>
void wcfService_Connected(object sender, EventArgs e) {
  Logger.Info("WCF Service got a connection");
  if (!UptimeManager.Instance.CalendarAvailable) {
    Logger.Info("No local calendar available, fetch it");
    FetchCalendarFromServer();
  }

  // A calendar exists and says "offline": stay logged out.
  // If fetching the calendar failed the client is still set online - maybe a
  // result arrives within the next few heartbeats.
  if (UptimeManager.Instance.CalendarAvailable && !UptimeManager.Instance.IsOnline()) {
    return;
  }

  Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
  Logger.Info("Setting client online");
  wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
  JobStorageManager.CheckAndSubmitJobsFromDisc();
  CurrentlyFetching = false;
}
425
/// <summary>
/// Requests the calendar from the server, tries to install it in the UptimeManager
/// and reports the resulting calendar state back to the server: Fetched when the
/// request succeeded and the appointments could be set, NotAllowedToFetch otherwise.
/// </summary>
private void FetchCalendarFromServer() {
  ResponseCalendar response = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);

  // SetAppointments is only attempted when the server call itself succeeded.
  bool installed = response.Success && UptimeManager.Instance.SetAppointments(false, response);

  if (installed) {
    Logger.Info("Remote calendar installed");
    wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
    return;
  }

  Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
  wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
}
441
/// <summary>
/// The connection to the previous server is back. Every engine that is no longer
/// running and has no message set gets its finished job collected on a separate thread.
/// </summary>
/// <remarks>
/// The threads started here run GetFinishedJob, which may call KillAppDomain and
/// thereby remove entries from the engines dictionary. The engines are therefore
/// copied under the lock first and the copy is enumerated; enumerating the live
/// dictionary could fail with "collection was modified".
/// </remarks>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">not used</param>
void wcfService_ConnectionRestored(object sender, EventArgs e) {
  Logger.Info("Reconnected to old server - checking currently running appdomains");

  List<Executor> currentEngines;
  lock (engines) {
    currentEngines = new List<Executor>(engines.Values);
  }

  foreach (Executor executor in currentEngines) {
    if (!executor.Running && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
      Guid finishedJobId = executor.JobId;
      Logger.Info("Checking for JobId: " + finishedJobId);
      Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
      finThread.Start(finishedJobId);
    }
  }
}
454
455    #endregion
456
/// <summary>
/// Gives access to the execution engines of all jobs currently held by this client.
/// </summary>
/// <returns>the live dictionary (not a copy), keyed by job id</returns>
public Dictionary<Guid, Executor> GetExecutionEngines() {
  return this.engines;
}
460
/// <summary>
/// Logs exceptions that were not handled inside a job's AppDomain.
/// </summary>
/// <param name="sender">the event source (not used)</param>
/// <param name="e">carries the exception object</param>
void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  string details = e.ExceptionObject.ToString();
  Logger.Error("Exception in AppDomain: " + details);
}
464
/// <summary>
/// Gives access to the descriptions of all jobs currently held by this client.
/// </summary>
/// <returns>the live dictionary (not a copy), keyed by job id</returns>
internal Dictionary<Guid, JobDto> GetJobs() {
  return this.jobs;
}
468
/// <summary>
/// Removes everything the client holds for a job: disposes its engine, unloads its
/// AppDomain, drops the bookkeeping entries and deletes the plugins copied for it.
/// Any exception raised on the way is logged and ends the cleanup at that point.
/// </summary>
/// <param name="id">the GUID of the job</param>
private void KillAppDomain(Guid id) {
  Logger.Debug("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        engine.Dispose();
      }

      AppDomain domain;
      if (appDomains.TryGetValue(id, out domain)) {
        // Detach the handler before unloading the domain.
        domain.UnhandledException -= appDomain_UnhandledException;
        AppDomain.Unload(domain);
        appDomains.Remove(id);
      }

      engines.Remove(id);
      jobs.Remove(id);
      PluginCache.Instance.DeletePluginsForJob(id);
      GC.Collect();
    } catch (Exception ex) {
      Logger.Error("Exception when unloading the appdomain: ", ex);
    }
  }
  // Second collection after the lock has been released, as in the original flow.
  GC.Collect();
}
495  }
496}
Note: See TracBrowser for help on using the repository browser.