Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.2/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs @ 3931

Last change on this file since 3931 was 3931, checked in by kgrading, 15 years ago

added minor speedups and better transaction handling to the server (#828)

File size: 20.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46using HeuristicLab.Tracing;
47
48namespace HeuristicLab.Hive.Client.Core {
49  /// <summary>
50  /// The core component of the Hive Client
51  /// </summary>
52  public class Core : MarshalByRefObject {
53    public static bool abortRequested { get; set; }
54
55    private bool _currentlyFetching;
56    private bool CurrentlyFetching {
57      get {
58        return _currentlyFetching;
59      } set {       
60        _currentlyFetching = value;
61        Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
62      }
63    }
64
65    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
66    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
67    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
68
69    private WcfService wcfService;
70    private Heartbeat beat;
71
72    /// <summary>
73    /// Main Method for the client
74    /// </summary>
75    public void Start() {
76      abortRequested = false;
77      Logger.Info("Hive Client started");
78      ClientConsoleServer server = new ClientConsoleServer();
79      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));
80
81      ConfigManager manager = ConfigManager.Instance;
82      manager.Core = this;
83
84
85
86      //Register all Wcf Service references
87      wcfService = WcfService.Instance;
88      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
89      wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
90      wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
91      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
92      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
93      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
94      wcfService.Connected += new EventHandler(wcfService_Connected);
95      //Recover Server IP and Port from the Settings Framework
96      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
97      if (cc.IPAdress != String.Empty && cc.Port != 0)
98        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);
99
100      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline())
101        wcfService.Connect();
102
103      //Initialize the heartbeat
104      beat = new Heartbeat { Interval = 10000 };
105      beat.StartHeartbeat();
106
107      MessageQueue queue = MessageQueue.GetInstance();
108
109      //Main processing loop     
110      //Todo: own thread for message handling
111      //Rly?!
112      while (!abortRequested) {
113        MessageContainer container = queue.GetMessage();       
114        DetermineAction(container);
115      }
116      Logger.Info("Program shutdown");
117    }
118
119    /// <summary>
120    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
121    /// </summary>
122    /// <param name="container">The Container, containing the message</param>
123    private void DetermineAction(MessageContainer container) {
124      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);       
125      switch (container.Message) {
126        //Server requests to abort a job
127        case MessageContainer.MessageType.AbortJob:         
128          if (engines.ContainsKey(container.JobId))
129            engines[container.JobId].Abort();
130          else
131            Logger.Error("AbortJob: Engine doesn't exist");
132          break;
133        //Job has been successfully aborted
134
135
136        case MessageContainer.MessageType.JobAborted:         
137        //todo: thread this         
138          lock (engines) {
139            Guid jobId = new Guid(container.JobId.ToString());
140            if (engines.ContainsKey(jobId)) {
141              appDomains[jobId].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
142              AppDomain.Unload(appDomains[jobId]);
143              appDomains.Remove(jobId);
144              engines.Remove(jobId);
145              jobs.Remove(jobId);
146              GC.Collect();
147            } else
148              Logger.Error("JobAbort: Engine doesn't exist");
149          }
150          break;
151
152
153        //Request a Snapshot from the Execution Engine
154        case MessageContainer.MessageType.RequestSnapshot:         
155          if (engines.ContainsKey(container.JobId))
156            engines[container.JobId].RequestSnapshot();
157          else
158            Logger.Error("RequestSnapshot: Engine doesn't exist");
159          break;
160
161
162        //Snapshot is ready and can be sent back to the Server
163        case MessageContainer.MessageType.SnapshotReady:         
164          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
165          break;
166
167
168        //Pull a Job from the Server
169        case MessageContainer.MessageType.FetchJob:         
170          if (!CurrentlyFetching) {
171            wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
172            CurrentlyFetching = true;
173          } else
174            Logger.Info("Currently fetching, won't fetch this time!");
175          break;         
176       
177       
178        //A Job has finished and can be sent back to the server
179        case MessageContainer.MessageType.FinishedJob:         
180          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
181          break;
182
183
184        //When the timeslice is up
185        case MessageContainer.MessageType.UptimeLimitDisconnect:
186          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
187
188          //check if there are running jobs
189          if (engines.Count > 0) {
190            //make sure there is no more fetching of jobs while the snapshots get processed
191            CurrentlyFetching = true;
192            //request a snapshot of each running job
193            foreach (KeyValuePair<Guid, Executor> kvp in engines) {
194              kvp.Value.RequestSnapshot();
195            }
196
197          } else {
198            //Disconnect afterwards
199            WcfService.Instance.Disconnect();
200          }
201          break;
202
203          //Fetch or Force Fetch Calendar!
204        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
205          Logger.Info("Fetch Calendar from Server");
206          FetchCalendarFromServer(); 
207        break;
208
209        //Hard shutdown of the client
210        case MessageContainer.MessageType.Shutdown:
211          Logger.Info("Shutdown Signal received");
212          lock (engines) {
213            Logger.Debug("engines locked");
214            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
215              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
216              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
217              AppDomain.Unload(kvp.Value);
218            }
219          }
220          Logger.Debug("Stopping heartbeat");
221          abortRequested = true;
222          beat.StopHeartBeat();
223          Logger.Debug("Logging out");
224          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
225          break;
226      }
227    }
228
229    //Asynchronous Threads for interaction with the Execution Engine
230    #region Async Threads for the EE
231
232    /// <summary>
233    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
234    /// once the connection gets reestablished, the job gets submitted
235    /// </summary>
236    /// <param name="jobId"></param>
237    private void GetFinishedJob(object jobId) {
238      Guid jId = (Guid)jobId;
239      Logger.Info("Getting the finished job with id: " + jId);
240      try {
241        if (!engines.ContainsKey(jId)) {
242          Logger.Info("Engine doesn't exist");
243          return;
244        }
245
246        byte[] sJob = engines[jId].GetFinishedJob();
247
248        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
249          Logger.Info("Sending the finished job with id: " + jId);
250          wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id,
251            jId,
252            sJob,
253            1,
254            null,
255            true);
256        } else {
257          Logger.Info("Storing the finished job with id: " + jId + " to hdd");
258          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
259          lock (engines) {
260            appDomains[jId].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
261           
262            //Disposing it
263            engines[jId].Dispose();
264           
265            AppDomain.Unload(appDomains[jId]);
266            Logger.Debug("Unloaded appdomain");
267            appDomains.Remove(jId);           
268            engines.Remove(jId);           
269            jobs.Remove(jId);
270            Logger.Debug("Removed job from appDomains, Engines and Jobs");
271          }
272        }
273      }
274      catch (InvalidStateException ise) {
275        Logger.Error("Invalid State while Snapshoting:", ise);
276      }
277    }
278
279    private void GetSnapshot(object jobId) {
280      Logger.Info("Fetching a snapshot for job " + jobId);
281      Guid jId = (Guid)jobId;
282      byte[] obj = engines[jId].GetSnapshot();
283      Logger.Debug("BEGIN: Sending snapshot sync");
284      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
285        jId,
286        obj,
287        engines[jId].Progress,
288        null);
289      Logger.Debug("END: Sended snapshot sync");
290      //Uptime Limit reached, now is a good time to destroy this jobs.
291      Logger.Debug("Checking if uptime limit is reached");
292      if (!UptimeManager.Instance.IsOnline()) {
293        Logger.Debug("Uptime limit reached");
294        Logger.Debug("Killing Appdomain");
295        KillAppDomain(jId);
296        //Still anything running? 
297        if (engines.Count == 0) {
298          Logger.Info("All jobs snapshotted and sent back, disconnecting");         
299          WcfService.Instance.Disconnect();
300        } else {
301          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
302        }
303
304      } else {
305        Logger.Debug("Restarting the job" + jobId);
306        engines[jId].StartOnlyJob();
307        Logger.Info("Restarted the job" + jobId);
308      }
309    }
310
311    #endregion
312
313    //Eventhandlers for the communication with the wcf Layer
314    #region wcfService Events
315    /// <summary>
316    /// Login has returned
317    /// </summary>
318    /// <param name="sender"></param>
319    /// <param name="e"></param>
320    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
321      if (e.Result.Success) {
322        CurrentlyFetching = false;
323        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
324      } else
325        Logger.Error("Error during login: " + e.Result.StatusMessage);
326    }
327
328    /// <summary>
329    /// A new Job from the wcfService has been received and will be started within a AppDomain.
330    /// </summary>
331    /// <param name="sender"></param>
332    /// <param name="e"></param>
333    void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
334      if (e.Result.StatusMessage != ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
335        Logger.Info("Received new job with id " + e.Result.Job.Id);     
336        bool sandboxed = false;
337        List<byte[]> files = new List<byte[]>();
338        Logger.Debug("Fetching plugins for job " + e.Result.Job.Id);
339        foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
340          files.AddRange(plugininfo.PluginFiles);
341        Logger.Debug("Plugins fetched for job " + e.Result.Job.Id);
342        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(e.Result.Job.Id.ToString(), files);
343        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
344        lock (engines) {
345          if (!jobs.ContainsKey(e.Result.Job.Id)) {
346            jobs.Add(e.Result.Job.Id, e.Result.Job);
347            appDomains.Add(e.Result.Job.Id, appDomain);
348            Logger.Debug("Creating AppDomain");
349            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
350            Logger.Debug("Created AppDomain");
351            engine.JobId = e.Result.Job.Id;
352            engine.Queue = MessageQueue.GetInstance();
353            Logger.Debug("Starting Engine for job " + e.Result.Job.Id);
354            engine.Start(e.Data);
355            engines.Add(e.Result.Job.Id, engine);
356
357            ClientStatusInfo.JobsFetched++;
358
359            Logger.Info("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
360          }
361        }
362      } else
363        Logger.Info("No more jobs left!");
364      CurrentlyFetching = false;
365    }
366
367    /// <summary>
368    /// A finished job has been stored on the server
369    /// </summary>
370    /// <param name="sender"></param>
371    /// <param name="e"></param>
372    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
373      Logger.Info("Job submitted with id " + e.Result.JobId);
374      KillAppDomain(e.Result.JobId);
375      if (e.Result.Success) {
376        ClientStatusInfo.JobsProcessed++;
377        Logger.Info("Increased ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
378      } else {
379        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
380      }
381    }
382
383    /// <summary>
384    /// A snapshot has been stored on the server
385    /// </summary>
386    /// <param name="sender"></param>
387    /// <param name="e"></param>
388    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
389      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
390    }
391
392    /// <summary>
393    /// The server has been changed. All Appdomains and Jobs must be aborted!
394    /// </summary>
395    /// <param name="sender"></param>
396    /// <param name="e"></param>
397    void wcfService_ServerChanged(object sender, EventArgs e) {
398      Logger.Info("ServerChanged has been called");
399      lock (engines) {
400        foreach (KeyValuePair<Guid, Executor> entries in engines) {
401          engines[entries.Key].Abort();
402          //appDomains[entries.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
403          //AppDomain.Unload(appDomains[entries.Key]);
404        }
405        //appDomains = new Dictionary<Guid, AppDomain>();
406        //engines = new Dictionary<Guid, Executor>();
407        //jobs = new Dictionary<Guid, Job>();
408      }
409    }
410
411    /// <summary>
412    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
413    /// </summary>
414    /// <param name="sender"></param>
415    /// <param name="e"></param>
416    void wcfService_Connected(object sender, EventArgs e) {
417      Logger.Info("WCF Service got a connection");
418      if (!UptimeManager.Instance.CalendarAvailable) {
419        Logger.Info("No local calendar available, fetch it");
420        FetchCalendarFromServer();
421      }
422      //if the fetching from the server failed - still set the client online... maybe we get
423      //a result within the next few heartbeats     
424      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
425        Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
426        Logger.Info("Setting client online");
427        wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
428        JobStorageManager.CheckAndSubmitJobsFromDisc();
429        CurrentlyFetching = false;
430      }
431    }
432
433    private void FetchCalendarFromServer() {
434      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
435      if(calres.Success) {
436        if (UptimeManager.Instance.SetAppointments(false, calres)) {
437          Logger.Info("Remote calendar installed");
438          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
439        } else {
440          Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
441          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
442        }
443      } else {
444        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
445        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
446      }
447    }
448
449    //this is a little bit tricky -
450    void wcfService_ConnectionRestored(object sender, EventArgs e) {
451      Logger.Info("Reconnected to old server - checking currently running appdomains");
452
453      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
454        if (!execKVP.Value.Running && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
455          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
456          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
457          finThread.Start(execKVP.Value.JobId);
458        }
459      }
460    }
461
462    #endregion
463
464    public Dictionary<Guid, Executor> GetExecutionEngines() {
465      return engines;
466    }
467
468    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
469      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());   
470    }
471
472    internal Dictionary<Guid, JobDto> GetJobs() {
473      return jobs;
474    }
475
476    /// <summary>
477    /// Kill a appdomain with a specific id.
478    /// </summary>
479    /// <param name="id">the GUID of the job</param>
480    private void KillAppDomain(Guid id) {
481      Logger.Debug("Shutting down Appdomain for Job " + id);
482      lock (engines) {
483        try {
484          engines[id].Dispose(); 
485          appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
486          AppDomain.Unload(appDomains[id]);
487          appDomains.Remove(id);
488          engines.Remove(id);
489          jobs.Remove(id);
490        }
491        catch (Exception ex) {
492          Logger.Error("Exception when unloading the appdomain: ", ex);
493        }
494      }
495      GC.Collect();
496    }
497  }
498}
Note: See TracBrowser for help on using the repository browser.