Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs @ 3578

Last change on this file since 3578 was 3578, checked in by kgrading, 14 years ago

Removed References to HiveLogging and updated the default logging mechanism (#991)

File size: 20.1 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46using HeuristicLab.Tracing;
47
namespace HeuristicLab.Hive.Client.Core {
  /// <summary>
  /// The core component of the Hive Client.
  /// It wires up the WCF service events, starts the heartbeat, runs the message loop and keeps
  /// track of one sandbox AppDomain, one Executor and one JobDto per running job.
  /// </summary>
  /// <remarks>
  /// The three job dictionaries are touched from the message loop, from thread pool work items
  /// (GetSnapshot / GetFinishedJob), from dedicated threads (connection restored) and from the
  /// WCF event handlers. Every access therefore goes through <c>lock (engines)</c>.
  /// </remarks>
  public class Core : MarshalByRefObject {
    // Address the local client console server is started on.
    private const string ClientConsoleAddress = "net.tcp://127.0.0.1:8000/ClientConsole/";
    // Value assigned to Heartbeat.Interval (presumably milliseconds - TODO confirm against Heartbeat).
    private const int HeartbeatInterval = 10000;

    /// <summary>
    /// While false, the message loop in <see cref="Start"/> keeps running.
    /// Set to true by the Shutdown message.
    /// </summary>
    public static bool abortRequested { get; set; }

    // Read by the message loop and written by WCF callbacks, hence volatile.
    private volatile bool _currentlyFetching;
    /// <summary>
    /// True while a job request is outstanding (or fetching is deliberately blocked).
    /// Every change is written to the debug log.
    /// </summary>
    private bool CurrentlyFetching {
      get {
        return _currentlyFetching;
      }
      set {
        _currentlyFetching = value;
        Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
      }
    }

    // Job bookkeeping, keyed by job id. "engines" doubles as the lock object for all three.
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

    private WcfService wcfService;
    private Heartbeat beat;

    /// <summary>
    /// Main method for the client: starts the client console server, registers the WCF events,
    /// connects (if the uptime calendar allows it), starts the heartbeat and then processes
    /// messages from the MessageQueue until <see cref="abortRequested"/> is set.
    /// This call blocks for the lifetime of the client.
    /// </summary>
    public void Start() {
      abortRequested = false;
      Logger.Info("Hive Client started");
      ClientConsoleServer server = new ClientConsoleServer();
      server.StartClientConsoleServer(new Uri(ClientConsoleAddress));

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      //Register all Wcf Service references
      wcfService = WcfService.Instance;
      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
      wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
      wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
      wcfService.Connected += new EventHandler(wcfService_Connected);

      //Recover Server IP and Port from the Settings Framework
      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
      if (!String.IsNullOrEmpty(cc.IPAdress) && cc.Port != 0)
        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);

      //Without a calendar the client connects anyway; with one, only inside an online slot
      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline())
        wcfService.Connect();

      //Initialize the heartbeat
      beat = new Heartbeat { Interval = HeartbeatInterval };
      beat.StartHeartbeat();

      MessageQueue queue = MessageQueue.GetInstance();

      //Main processing loop
      //Todo: own thread for message handling
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
      Logger.Info("Program shutdown");
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// Message types without a case are ignored.
    /// </summary>
    /// <param name="container">The container holding the message type and the affected job id</param>
    private void DetermineAction(MessageContainer container) {
      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          AbortJob(container.JobId);
          break;

        //Job has been successfully aborted
        case MessageContainer.MessageType.JobAborted:
          //todo: thread this
          if (RemoveJob(container.JobId))
            GC.Collect();
          else
            Logger.Error("JobAbort: Engine doesn't exist");
          break;

        //Request a Snapshot from the Execution Engine
        case MessageContainer.MessageType.RequestSnapshot:
          RequestSnapshot(container.JobId);
          break;

        //Snapshot is ready and can be sent back to the Server
        case MessageContainer.MessageType.SnapshotReady:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.FetchJob:
          FetchJob();
          break;

        //A Job has finished and can be sent back to the server
        case MessageContainer.MessageType.FinishedJob:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
          break;

        //When the timeslice is up
        case MessageContainer.MessageType.UptimeLimitDisconnect:
          HandleUptimeLimitReached();
          break;

        //Fetch or Force Fetch Calendar!
        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
          Logger.Info("Fetch Calendar from Server");
          FetchCalendarFromServer();
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.Shutdown:
          Shutdown();
          break;
      }
    }

    //Handlers for the individual message types
    #region Message handlers

    /// <summary>
    /// Asks the engine of the given job to abort; logs an error if no such engine is registered.
    /// </summary>
    private void AbortJob(Guid jobId) {
      Executor engine;
      if (TryGetEngine(jobId, out engine))
        engine.Abort();
      else
        Logger.Error("AbortJob: Engine doesn't exist");
    }

    /// <summary>
    /// Asks the engine of the given job for a snapshot; logs an error if no such engine is registered.
    /// </summary>
    private void RequestSnapshot(Guid jobId) {
      Executor engine;
      if (TryGetEngine(jobId, out engine))
        engine.RequestSnapshot();
      else
        Logger.Error("RequestSnapshot: Engine doesn't exist");
    }

    /// <summary>
    /// Requests a new job from the server unless a request is already outstanding.
    /// </summary>
    private void FetchJob() {
      if (CurrentlyFetching) {
        Logger.Info("Currently fetching, won't fetch this time!");
        return;
      }
      //Raise the flag BEFORE issuing the request: the completion handler resets it, and if it
      //ran before the flag was raised the flag would stay set and block all further fetching.
      CurrentlyFetching = true;
      try {
        wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
      }
      catch {
        //no request is outstanding, so don't block later fetches
        CurrentlyFetching = false;
        throw;
      }
    }

    /// <summary>
    /// The online timeslice is over: request a snapshot of every running job, or disconnect
    /// right away when nothing is running.
    /// </summary>
    private void HandleUptimeLimitReached() {
      Logger.Info("Uptime Limit reached, storing jobs and sending them back");

      //Work on a copy: processing the snapshots removes entries from "engines" on other threads
      List<Executor> running = GetEngineSnapshot();
      if (running.Count > 0) {
        //make sure there is no more fetching of jobs while the snapshots get processed
        CurrentlyFetching = true;
        //request a snapshot of each running job
        foreach (Executor engine in running)
          engine.RequestSnapshot();
      } else {
        //Disconnect afterwards
        WcfService.Instance.Disconnect();
      }
    }

    /// <summary>
    /// Hard shutdown: unloads every job AppDomain, clears the bookkeeping, stops the message
    /// loop and the heartbeat and logs out from the server.
    /// </summary>
    private void Shutdown() {
      Logger.Info("Shutdown Signal received");
      lock (engines) {
        Logger.Debug("engines locked");
        //Iterate over a copy of the keys because RemoveJob modifies the dictionary
        foreach (Guid jobId in new List<Guid>(appDomains.Keys)) {
          Logger.Debug("Shutting down Appdomain for " + jobId);
          //a failing unload is logged inside RemoveJob and must not stop the shutdown
          RemoveJob(jobId);
        }
        //drop entries that had no AppDomain registered
        engines.Clear();
        jobs.Clear();
      }
      Logger.Debug("Stopping heartbeat");
      abortRequested = true;
      beat.StopHeartBeat();
      Logger.Debug("Logging out");
      WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
    }

    #endregion

    //Asynchronous Threads for interaction with the Execution Engine
    #region Async Threads for the EE

    /// <summary>
    /// Serializes the finished job and submits it to the server. If the client is not logged in
    /// at that time, the job is stored on the disk and its AppDomain is unloaded.
    /// </summary>
    /// <param name="jobId">The job id as a boxed Guid (thread start / WaitCallback argument)</param>
    private void GetFinishedJob(object jobId) {
      Guid jId = (Guid)jobId;
      Logger.Info("Getting the finished job with id: " + jId);
      try {
        Executor engine;
        if (!TryGetEngine(jId, out engine)) {
          Logger.Info("Engine doesn't exist");
          return;
        }

        byte[] sJob = engine.GetFinishedJob();

        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
          Logger.Info("Sending the finished job with id: " + jId);
          //the AppDomain is unloaded in wcfService_StoreFinishedJobResultCompleted
          wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id,
            jId,
            sJob,
            1,
            null,
            true);
        } else {
          Logger.Info("Storing the finished job with id: " + jId + " to hdd");
          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
          RemoveJob(jId);
          Logger.Debug("Removed job from appDomains, Engines and Jobs");
        }
      }
      catch (InvalidStateException ise) {
        Logger.Error("Invalid State while getting the finished job:", ise);
      }
    }

    /// <summary>
    /// Fetches a snapshot from the engine of the given job and sends it to the server.
    /// Afterwards the job is either destroyed (uptime limit reached; the client disconnects
    /// once no engine is left) or restarted.
    /// </summary>
    /// <param name="jobId">The job id as a boxed Guid (WaitCallback argument)</param>
    private void GetSnapshot(object jobId) {
      Logger.Info("Fetching a snapshot for job " + jobId);
      Guid jId = (Guid)jobId;

      //The job may have been removed between queuing this work item and running it
      Executor engine;
      if (!TryGetEngine(jId, out engine)) {
        Logger.Error("GetSnapshot: Engine doesn't exist");
        return;
      }

      byte[] obj = engine.GetSnapshot();
      Logger.Debug("BEGIN: Sending snapshot sync");
      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
        jId,
        obj,
        engine.Progress,
        null);
      Logger.Debug("END: Sended snapshot sync");

      //Uptime Limit reached, now is a good time to destroy this jobs.
      Logger.Debug("Checking if uptime limit is reached");
      if (!UptimeManager.Instance.IsOnline()) {
        Logger.Debug("Uptime limit reached");
        Logger.Debug("Killing Appdomain");
        KillAppDomain(jId);

        //Still anything running?
        bool enginesLeft;
        lock (engines) {
          enginesLeft = engines.Count > 0;
        }
        if (!enginesLeft) {
          Logger.Info("All jobs snapshotted and sent back, disconnecting");
          WcfService.Instance.Disconnect();
        } else {
          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
        }
      } else {
        Logger.Debug("Restarting the job" + jobId);
        engine.StartOnlyJob();
        Logger.Info("Restarted the job" + jobId);
      }
    }

    #endregion

    //Eventhandlers for the communication with the wcf Layer
    #region wcfService Events
    /// <summary>
    /// Login has returned. A successful login re-enables job fetching.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e">Carries the login result (success flag and status message)</param>
    private void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
      if (e.Result.Success) {
        CurrentlyFetching = false;
        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
      } else
        Logger.Error("Error during login: " + e.Result.StatusMessage);
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a sandbox AppDomain.
    /// Fetching is re-enabled when this handler leaves, whether it succeeded or not.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e">Carries the job description (Result.Job) and the job payload (Data)</param>
    private void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
      try {
        if (e.Result.StatusMessage == ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
          Logger.Info("No more jobs left!");
          return;
        }

        JobDto job = e.Result.Job;
        Guid jobId = job.Id;
        Logger.Info("Received new job with id " + jobId);

        List<byte[]> files = new List<byte[]>();
        Logger.Debug("Fetching plugins for job " + jobId);
        foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(job.PluginsNeeded))
          files.AddRange(plugininfo.PluginFiles);
        Logger.Debug("Plugins fetched for job " + jobId);

        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(jobId.ToString(), files);
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);

        bool registered = false;
        try {
          lock (engines) {
            if (!jobs.ContainsKey(jobId)) {
              Logger.Debug("Creating AppDomain");
              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
              Logger.Debug("Created AppDomain");
              engine.JobId = jobId;
              engine.Queue = MessageQueue.GetInstance();
              Logger.Debug("Starting Engine for job " + jobId);
              engine.Start(e.Data);

              //Register all three entries together, and only once the engine is running,
              //so that a failure above cannot leave half-registered bookkeeping behind
              jobs.Add(jobId, job);
              appDomains.Add(jobId, appDomain);
              engines.Add(jobId, engine);
              registered = true;

              ClientStatusInfo.JobsFetched++;

              Logger.Info("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
            }
          }
        }
        finally {
          //Duplicate job or failed start: nobody else references the sandbox, so unload it here
          if (!registered)
            UnloadSandbox(appDomain);
        }
      }
      finally {
        CurrentlyFetching = false;
      }
    }

    /// <summary>
    /// A finished job has been stored on the server. The job's AppDomain is unloaded in any
    /// case; the processed counter is only increased on success.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e">Carries the job id, the success flag and the status message</param>
    private void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
      Logger.Info("Job submitted with id " + e.Result.JobId);
      KillAppDomain(e.Result.JobId);
      if (e.Result.Success) {
        ClientStatusInfo.JobsProcessed++;
        Logger.Info("Increased ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
      } else {
        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
      }
    }

    /// <summary>
    /// A snapshot has been stored on the server
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e">Carries the id of the job the snapshot belongs to</param>
    private void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
    }

    /// <summary>
    /// The server has been changed. All running engines are asked to abort.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void wcfService_ServerChanged(object sender, EventArgs e) {
      Logger.Info("ServerChanged has been called");
      lock (engines) {
        //Only Abort() is called here; the AppDomains are not unloaded and the dictionaries are
        //left untouched (presumably the JobAborted message cleans up afterwards - TODO confirm).
        //Enumerate a copy so that an abort which removes its entry cannot break the loop.
        foreach (Executor engine in new List<Executor>(engines.Values))
          engine.Abort();
      }
    }

    /// <summary>
    /// Connection to the server has been established => fetch the calendar if none is available
    /// locally, then login and send the persisted jobs from the harddisk.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void wcfService_Connected(object sender, EventArgs e) {
      Logger.Info("WCF Service got a connection");
      if (!UptimeManager.Instance.CalendarAvailable) {
        Logger.Info("No local calendar available, fetch it");
        FetchCalendarFromServer();
      }
      //if the fetching from the server failed - still set the client online... maybe we get
      //a result within the next few heartbeats
      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
        Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
        Logger.Info("Setting client online");
        wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
        JobStorageManager.CheckAndSubmitJobsFromDisc();
        CurrentlyFetching = false;
      }
    }

    /// <summary>
    /// Fetches the calendar from the server, installs it in the UptimeManager and reports the
    /// resulting calendar state (Fetched or NotAllowedToFetch) back to the server.
    /// </summary>
    private void FetchCalendarFromServer() {
      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
      //SetAppointments is only attempted when the server call itself succeeded
      if (calres.Success && UptimeManager.Instance.SetAppointments(false, calres)) {
        Logger.Info("Remote calendar installed");
        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
      } else {
        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
      }
    }

    /// <summary>
    /// The connection to the previous server is back. Every engine that is neither running nor
    /// has a pending message gets its finished job collected on a thread of its own.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void wcfService_ConnectionRestored(object sender, EventArgs e) {
      Logger.Info("Reconnected to old server - checking currently running appdomains");

      //Work on a copy: the threads started below remove entries from "engines"
      foreach (Executor engine in GetEngineSnapshot()) {
        if (!engine.Running && engine.CurrentMessage == MessageContainer.MessageType.NoMessage) {
          Logger.Info("Checking for JobId: " + engine.JobId);
          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
          finThread.Start(engine.JobId);
        }
      }
    }

    #endregion

    /// <summary>
    /// Returns the internal job id to Executor dictionary (the live instance, not a copy).
    /// </summary>
    public Dictionary<Guid, Executor> GetExecutionEngines() {
      return engines;
    }

    /// <summary>
    /// Logs exceptions that were not handled inside a job AppDomain.
    /// </summary>
    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
    }

    /// <summary>
    /// Returns the internal job id to JobDto dictionary (the live instance, not a copy).
    /// </summary>
    internal Dictionary<Guid, JobDto> GetJobs() {
      return jobs;
    }

    /// <summary>
    /// Kill a appdomain with a specific id and forget the job; triggers a garbage collection afterwards.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    private void KillAppDomain(Guid id) {
      Logger.Debug("Shutting down Appdomain for Job " + id);
      if (!RemoveJob(id))
        Logger.Error("KillAppDomain: job " + id + " is not registered");
      GC.Collect();
    }

    //Shared access to the job bookkeeping; everything in here takes the "engines" lock
    #region Job bookkeeping

    /// <summary>
    /// Looks up the engine of a job under the bookkeeping lock.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    /// <param name="engine">receives the engine, or null if the job is not registered</param>
    /// <returns>true if an engine is registered for the job</returns>
    private bool TryGetEngine(Guid id, out Executor engine) {
      lock (engines) {
        return engines.TryGetValue(id, out engine);
      }
    }

    /// <summary>
    /// Returns a copy of the currently registered engines that is safe to enumerate while
    /// other threads add or remove jobs.
    /// </summary>
    private List<Executor> GetEngineSnapshot() {
      lock (engines) {
        return new List<Executor>(engines.Values);
      }
    }

    /// <summary>
    /// Removes a job from all three dictionaries and unloads its AppDomain if one is registered.
    /// The entries are removed even when unloading fails; the failure is logged.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    /// <returns>true if at least one entry existed for the job</returns>
    private bool RemoveJob(Guid id) {
      lock (engines) {
        bool known = engines.Remove(id);
        if (jobs.Remove(id))
          known = true;

        AppDomain appDomain;
        if (appDomains.TryGetValue(id, out appDomain)) {
          known = true;
          appDomains.Remove(id);
          UnloadSandbox(appDomain);
          Logger.Debug("Unloaded appdomain");
        }
        return known;
      }
    }

    /// <summary>
    /// Detaches the unhandled exception handler from a job AppDomain and unloads it.
    /// Exceptions are logged and not rethrown.
    /// </summary>
    /// <param name="appDomain">the sandbox AppDomain to unload</param>
    private void UnloadSandbox(AppDomain appDomain) {
      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(appDomain);
      }
      catch (Exception ex) {
        Logger.Error("Exception when unloading the appdomain: ", ex);
      }
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.