Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.2/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs @ 4042

Last change on this file since 4042 was 4042, checked in by kgrading, 14 years ago

#828 added various improvements to the plugin cache manager, the execution engine, the transaction handling on the server side and the server console

File size: 20.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46using HeuristicLab.Tracing;
47
48namespace HeuristicLab.Hive.Client.Core {
49  /// <summary>
50  /// The core component of the Hive Client
51  /// </summary>
52  public class Core : MarshalByRefObject {
53    public static bool abortRequested { get; set; }
54
55    private bool _currentlyFetching;
56    private bool CurrentlyFetching {
57      get {
58        return _currentlyFetching;
59      } set {       
60        _currentlyFetching = value;
61        Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
62      }
63    }
64
65    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
66    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
67    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
68
69    private WcfService wcfService;
70    private Heartbeat beat;
71
72    /// <summary>
73    /// Main Method for the client
74    /// </summary>
75    public void Start() {
76      abortRequested = false;
77      Logger.Info("Hive Client started");
78      ClientConsoleServer server = new ClientConsoleServer();
79      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));
80
81      ConfigManager manager = ConfigManager.Instance;
82      manager.Core = this;
83
84
85
86      //Register all Wcf Service references
87      wcfService = WcfService.Instance;
88      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
89      wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
90      wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
91      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
92      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
93      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
94      wcfService.Connected += new EventHandler(wcfService_Connected);
95      //Recover Server IP and Port from the Settings Framework
96      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
97      if (cc.IPAdress != String.Empty && cc.Port != 0)
98        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);
99
100      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline())
101        wcfService.Connect();
102
103      //Initialize the heartbeat
104      beat = new Heartbeat { Interval = 10000 };
105      beat.StartHeartbeat();
106
107      MessageQueue queue = MessageQueue.GetInstance();
108
109      //Main processing loop     
110      //Todo: own thread for message handling
111      //Rly?!
112      while (!abortRequested) {
113        MessageContainer container = queue.GetMessage();       
114        DetermineAction(container);
115      }
116      Logger.Info("Program shutdown");
117    }
118
119    /// <summary>
120    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
121    /// </summary>
122    /// <param name="container">The Container, containing the message</param>
123    private void DetermineAction(MessageContainer container) {
124      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);       
125      switch (container.Message) {
126        //Server requests to abort a job
127        case MessageContainer.MessageType.AbortJob:         
128          if (engines.ContainsKey(container.JobId))
129            engines[container.JobId].Abort();
130          else
131            Logger.Error("AbortJob: Engine doesn't exist");
132          break;
133        //Job has been successfully aborted
134
135
136        case MessageContainer.MessageType.JobAborted:         
137        //todo: thread this         
138          lock (engines) {
139            Guid jobId = new Guid(container.JobId.ToString());
140            if (engines.ContainsKey(jobId)) {
141              appDomains[jobId].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
142              AppDomain.Unload(appDomains[jobId]);
143              appDomains.Remove(jobId);
144              engines.Remove(jobId);
145              jobs.Remove(jobId);
146              GC.Collect();
147            } else
148              Logger.Error("JobAbort: Engine doesn't exist");
149          }
150          break;
151
152
153        //Request a Snapshot from the Execution Engine
154        case MessageContainer.MessageType.RequestSnapshot:         
155          if (engines.ContainsKey(container.JobId))
156            engines[container.JobId].RequestSnapshot();
157          else
158            Logger.Error("RequestSnapshot: Engine doesn't exist");
159          break;
160
161
162        //Snapshot is ready and can be sent back to the Server
163        case MessageContainer.MessageType.SnapshotReady:         
164          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
165          break;
166
167
168        //Pull a Job from the Server
169        case MessageContainer.MessageType.FetchJob:         
170          if (!CurrentlyFetching) {
171            wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
172            CurrentlyFetching = true;
173          } else
174            Logger.Info("Currently fetching, won't fetch this time!");
175          break;         
176       
177       
178        //A Job has finished and can be sent back to the server
179        case MessageContainer.MessageType.FinishedJob:         
180          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
181          break;
182
183
184        //When the timeslice is up
185        case MessageContainer.MessageType.UptimeLimitDisconnect:
186          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
187
188          //check if there are running jobs
189          if (engines.Count > 0) {
190            //make sure there is no more fetching of jobs while the snapshots get processed
191            CurrentlyFetching = true;
192            //request a snapshot of each running job
193            foreach (KeyValuePair<Guid, Executor> kvp in engines) {
194              kvp.Value.RequestSnapshot();
195            }
196
197          } else {
198            //Disconnect afterwards
199            WcfService.Instance.Disconnect();
200          }
201          break;
202
203          //Fetch or Force Fetch Calendar!
204        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
205          Logger.Info("Fetch Calendar from Server");
206          FetchCalendarFromServer(); 
207        break;
208
209        //Hard shutdown of the client
210        case MessageContainer.MessageType.Shutdown:
211          Logger.Info("Shutdown Signal received");
212          lock (engines) {
213            Logger.Debug("engines locked");
214            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
215              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
216              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
217              AppDomain.Unload(kvp.Value);
218            }
219          }
220          Logger.Debug("Stopping heartbeat");
221          abortRequested = true;
222          beat.StopHeartBeat();
223          Logger.Debug("Logging out");
224          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
225          break;
226      }
227    }
228
229    //Asynchronous Threads for interaction with the Execution Engine
230    #region Async Threads for the EE
231
232    /// <summary>
233    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
234    /// once the connection gets reestablished, the job gets submitted
235    /// </summary>
236    /// <param name="jobId"></param>
237    private void GetFinishedJob(object jobId) {
238      Guid jId = (Guid)jobId;
239      Logger.Info("Getting the finished job with id: " + jId);
240      try {
241        if (!engines.ContainsKey(jId)) {
242          Logger.Info("Engine doesn't exist");
243          return;
244        }
245
246        byte[] sJob = engines[jId].GetFinishedJob();
247
248        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
249          Logger.Info("Sending the finished job with id: " + jId);
250          wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id,
251            jId,
252            sJob,
253            1,
254            null,
255            true);
256        } else {
257          Logger.Info("Storing the finished job with id: " + jId + " to hdd");
258          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
259          lock (engines) {
260            appDomains[jId].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
261           
262            //Disposing it
263            engines[jId].Dispose();
264           
265            AppDomain.Unload(appDomains[jId]);
266            Logger.Debug("Unloaded appdomain");
267            appDomains.Remove(jId);           
268            engines.Remove(jId);           
269            jobs.Remove(jId);
270            Logger.Debug("Removed job from appDomains, Engines and Jobs");
271          }
272        }
273      }
274      catch (InvalidStateException ise) {
275        Logger.Error("Invalid State while Snapshoting:", ise);
276      }
277    }
278
279    private void GetSnapshot(object jobId) {
280      Logger.Info("Fetching a snapshot for job " + jobId);
281      Guid jId = (Guid)jobId;
282      byte[] obj = engines[jId].GetSnapshot();
283      Logger.Debug("BEGIN: Sending snapshot sync");
284      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
285        jId,
286        obj,
287        engines[jId].Progress,
288        null);
289      Logger.Debug("END: Sended snapshot sync");
290      //Uptime Limit reached, now is a good time to destroy this jobs.
291      Logger.Debug("Checking if uptime limit is reached");
292      if (!UptimeManager.Instance.IsOnline()) {
293        Logger.Debug("Uptime limit reached");
294        Logger.Debug("Killing Appdomain");
295        KillAppDomain(jId);
296        //Still anything running? 
297        if (engines.Count == 0) {
298          Logger.Info("All jobs snapshotted and sent back, disconnecting");         
299          WcfService.Instance.Disconnect();
300        } else {
301          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
302        }
303
304      } else {
305        Logger.Debug("Restarting the job" + jobId);
306        engines[jId].StartOnlyJob();
307        Logger.Info("Restarted the job" + jobId);
308      }
309    }
310
311    #endregion
312
313    //Eventhandlers for the communication with the wcf Layer
314    #region wcfService Events
315    /// <summary>
316    /// Login has returned
317    /// </summary>
318    /// <param name="sender"></param>
319    /// <param name="e"></param>
320    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
321      if (e.Result.Success) {
322        CurrentlyFetching = false;
323        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
324      } else
325        Logger.Error("Error during login: " + e.Result.StatusMessage);
326    }
327
328    /// <summary>
329    /// A new Job from the wcfService has been received and will be started within a AppDomain.
330    /// </summary>
331    /// <param name="sender"></param>
332    /// <param name="e"></param>
333    void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
334      if (e.Result.StatusMessage != ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
335        Logger.Info("Received new job with id " + e.Result.Job.Id);     
336        bool sandboxed = false;       
337        Logger.Debug("Fetching plugins for job " + e.Result.Job.Id);
338
339        PluginCache.Instance.PreparePlugins(e.Result.Job.PluginsNeeded);
340
341//        foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
342  //        files.AddRange(plugininfo.PluginFiles);
343        Logger.Debug("Plugins fetched for job " + e.Result.Job.Id);
344        try {
345          AppDomain appDomain =
346            HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(
347              e.Result.Job.Id.ToString(), null);
348          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
349          lock (engines) {
350            if (!jobs.ContainsKey(e.Result.Job.Id)) {
351              jobs.Add(e.Result.Job.Id, e.Result.Job);
352              appDomains.Add(e.Result.Job.Id, appDomain);
353              Logger.Debug("Creating AppDomain");
354              Executor engine =
355                (Executor)
356                appDomain.CreateInstanceAndUnwrap(typeof (Executor).Assembly.GetName().Name, typeof (Executor).FullName);
357              Logger.Debug("Created AppDomain");
358              engine.JobId = e.Result.Job.Id;
359              engine.Queue = MessageQueue.GetInstance();
360              Logger.Debug("Starting Engine for job " + e.Result.Job.Id);
361              engine.Start(e.Data);
362              engines.Add(e.Result.Job.Id, engine);
363
364              ClientStatusInfo.JobsFetched++;
365
366              Logger.Info("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
367            }
368          }
369        } catch(Exception exception) {
370          Logger.Error("Creating the Appdomain and loading the job failed for job " + e.Result.Job.Id);
371          Logger.Error("Error thrown is: ", exception);
372          CurrentlyFetching = false;
373          KillAppDomain(e.Result.Job.Id);
374        }
375      } else
376        Logger.Info("No more jobs left!");
377      CurrentlyFetching = false;
378    }
379
380    /// <summary>
381    /// A finished job has been stored on the server
382    /// </summary>
383    /// <param name="sender"></param>
384    /// <param name="e"></param>
385    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
386      Logger.Info("Job submitted with id " + e.Result.JobId);
387      KillAppDomain(e.Result.JobId);
388      if (e.Result.Success) {
389        ClientStatusInfo.JobsProcessed++;
390        Logger.Info("Increased ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
391      } else {
392        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
393      }
394    }
395
396    /// <summary>
397    /// A snapshot has been stored on the server
398    /// </summary>
399    /// <param name="sender"></param>
400    /// <param name="e"></param>
401    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
402      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
403    }
404
405    /// <summary>
406    /// The server has been changed. All Appdomains and Jobs must be aborted!
407    /// </summary>
408    /// <param name="sender"></param>
409    /// <param name="e"></param>
410    void wcfService_ServerChanged(object sender, EventArgs e) {
411      Logger.Info("ServerChanged has been called");
412      lock (engines) {
413        foreach (KeyValuePair<Guid, Executor> entries in engines) {
414          engines[entries.Key].Abort();
415          //appDomains[entries.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
416          //AppDomain.Unload(appDomains[entries.Key]);
417        }
418        //appDomains = new Dictionary<Guid, AppDomain>();
419        //engines = new Dictionary<Guid, Executor>();
420        //jobs = new Dictionary<Guid, Job>();
421      }
422    }
423
424    /// <summary>
425    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
426    /// </summary>
427    /// <param name="sender"></param>
428    /// <param name="e"></param>
429    void wcfService_Connected(object sender, EventArgs e) {
430      Logger.Info("WCF Service got a connection");
431      if (!UptimeManager.Instance.CalendarAvailable) {
432        Logger.Info("No local calendar available, fetch it");
433        FetchCalendarFromServer();
434      }
435      //if the fetching from the server failed - still set the client online... maybe we get
436      //a result within the next few heartbeats     
437      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
438        Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
439        Logger.Info("Setting client online");
440        wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
441        JobStorageManager.CheckAndSubmitJobsFromDisc();
442        CurrentlyFetching = false;
443      }
444    }
445
446    private void FetchCalendarFromServer() {
447      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
448      if(calres.Success) {
449        if (UptimeManager.Instance.SetAppointments(false, calres)) {
450          Logger.Info("Remote calendar installed");
451          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
452        } else {
453          Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
454          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
455        }
456      } else {
457        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
458        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
459      }
460    }
461
462    //this is a little bit tricky -
463    void wcfService_ConnectionRestored(object sender, EventArgs e) {
464      Logger.Info("Reconnected to old server - checking currently running appdomains");
465
466      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
467        if (!execKVP.Value.Running && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
468          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
469          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
470          finThread.Start(execKVP.Value.JobId);
471        }
472      }
473    }
474
475    #endregion
476
477    public Dictionary<Guid, Executor> GetExecutionEngines() {
478      return engines;
479    }
480
481    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
482      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());   
483    }
484
485    internal Dictionary<Guid, JobDto> GetJobs() {
486      return jobs;
487    }
488
489    /// <summary>
490    /// Kill a appdomain with a specific id.
491    /// </summary>
492    /// <param name="id">the GUID of the job</param>
493    private void KillAppDomain(Guid id) {
494      Logger.Debug("Shutting down Appdomain for Job " + id);
495      lock (engines) {
496        try {
497          if(engines.ContainsKey(id))
498            engines[id].Dispose();
499          if (appDomains.ContainsKey(id)) {
500            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
501            AppDomain.Unload(appDomains[id]);
502            appDomains.Remove(id);
503          }
504          engines.Remove(id);
505          jobs.Remove(id);
506          GC.Collect();
507        }
508        catch (Exception ex) {
509          Logger.Error("Exception when unloading the appdomain: ", ex);
510        }
511      }
512      GC.Collect();
513    }
514  }
515}
Note: See TracBrowser for help on using the repository browser.