Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Client.Core/3.3/Core.cs @ 4119

Last change on this file since 4119 was 4119, checked in by cneumuel, 14 years ago

HiveExperiment is now able to send IOptimizers of an Experiment and receive the calculated result (#1115)

File size: 20.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46using HeuristicLab.Tracing;
47using HeuristicLab.Core;
48
49namespace HeuristicLab.Hive.Client.Core {
/// <summary>
/// The core component of the Hive Client. It runs the message loop that reacts to
/// <see cref="MessageQueue"/> messages and to events of the <see cref="WcfService"/>, and it
/// manages one sandboxed <see cref="AppDomain"/> plus one <see cref="Executor"/> per job.
/// </summary>
public class Core : MarshalByRefObject {
  /// <summary>
  /// When set to true, the processing loop in <see cref="Start"/> exits after the message it is
  /// currently handling. <see cref="Start"/> resets it to false; the Shutdown message sets it.
  /// </summary>
  public static bool abortRequested { get; set; }

  private bool _currentlyFetching;

  /// <summary>
  /// True while a job request to the server is pending, or while fetching is deliberately
  /// suppressed (e.g. while snapshots are sent back after the uptime limit was reached).
  /// Every change is logged at debug level.
  /// </summary>
  private bool CurrentlyFetching {
    get { return _currentlyFetching; }
    set {
      _currentlyFetching = value;
      Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
    }
  }

  // Per-job bookkeeping keyed by job id. These dictionaries are used by the message loop, by
  // thread-pool/worker threads (GetSnapshot, GetFinishedJob) and by the WcfService event handlers,
  // so every access inside this class is guarded by lock (engines). 'engines' is the lock object
  // because it is also the dictionary handed out by GetExecutionEngines().
  private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
  private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
  private readonly Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();

  private WcfService wcfService;
  private Heartbeat beat;

  /// <summary>
  /// Main method of the client. It starts the local client-console service, wires up the
  /// WcfService events, applies the configured server address (only if both IP and port are set),
  /// starts the heartbeat, and then dispatches <see cref="MessageQueue"/> messages until
  /// <see cref="abortRequested"/> is set.
  /// </summary>
  public void Start() {
    abortRequested = false;
    Logger.Info("Hive Client started");
    ClientConsoleServer server = new ClientConsoleServer();
    server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));

    ConfigManager manager = ConfigManager.Instance;
    manager.Core = this;

    //Register all Wcf Service references
    wcfService = WcfService.Instance;
    wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
    wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
    wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
    wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
    wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
    wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
    wcfService.Connected += new EventHandler(wcfService_Connected);

    //Recover Server IP and Port from the Settings Framework; a missing (null/empty) IP or a
    //zero port means "not configured", so the service keeps its current address.
    ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
    if (!String.IsNullOrEmpty(cc.IPAdress) && cc.Port != 0)
      wcfService.SetIPAndPort(cc.IPAdress, cc.Port);

    //Initialize the heartbeat
    beat = new Heartbeat { Interval = 10000 };
    beat.StartHeartbeat();

    MessageQueue queue = MessageQueue.GetInstance();

    //Main processing loop
    //TODO: handle messages on their own thread
    while (!abortRequested) {
      MessageContainer container = queue.GetMessage();
      DetermineAction(container);
    }
    Logger.Info("Program shutdown");
  }

  /// <summary>
  /// Reads and analyzes one message from the MessageQueue and starts the corresponding action.
  /// Sending snapshots and finished jobs is handed to the thread pool so the loop is not blocked.
  /// </summary>
  /// <param name="container">The container holding the message and the id of the job it concerns</param>
  private void DetermineAction(MessageContainer container) {
    Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
    switch (container.Message) {
      //Server requests to abort a job
      case MessageContainer.MessageType.AbortJob: {
          Executor engine = TryGetEngine(container.JobId);
          if (engine != null)
            engine.Abort();
          else
            Logger.Error("AbortJob: Engine doesn't exist");
          break;
        }

      //Job has been successfully aborted: tear down its sandbox and forget the job
      case MessageContainer.MessageType.JobAborted:
        lock (engines) {
          if (engines.ContainsKey(container.JobId))
            KillAppDomain(container.JobId);
          else
            Logger.Error("JobAbort: Engine doesn't exist");
        }
        break;

      //Request a Snapshot from the Execution Engine
      case MessageContainer.MessageType.RequestSnapshot: {
          Executor engine = TryGetEngine(container.JobId);
          if (engine != null)
            engine.RequestSnapshot();
          else
            Logger.Error("RequestSnapshot: Engine doesn't exist");
          break;
        }

      //Snapshot is ready and can be sent back to the Server
      case MessageContainer.MessageType.SnapshotReady:
        ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
        break;

      //Pull a Job from the Server unless a request is already pending
      case MessageContainer.MessageType.FetchJob:
        if (!CurrentlyFetching) {
          // Mark the request as pending *before* sending it: the completion handler resets the
          // flag, and setting it afterwards could overwrite a fast completion and stall fetching.
          CurrentlyFetching = true;
          try {
            wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
          } catch {
            CurrentlyFetching = false;
            throw;
          }
        } else
          Logger.Info("Currently fetching, won't fetch this time!");
        break;

      //A Job has finished and can be sent back to the server
      case MessageContainer.MessageType.FinishedJob:
        ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
        break;

      //When the timeslice is up
      case MessageContainer.MessageType.UptimeLimitDisconnect: {
          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
          // Work on a copy: GetSnapshot threads remove jobs via KillAppDomain while offline,
          // which would invalidate an enumeration of the live dictionary.
          List<Executor> running = CopyEngines();
          //check if there are running jobs
          if (running.Count > 0) {
            //make sure there is no more fetching of jobs while the snapshots get processed
            CurrentlyFetching = true;
            //request a snapshot of each running job
            foreach (Executor engine in running)
              engine.RequestSnapshot();
          } else {
            //Disconnect afterwards
            WcfService.Instance.Disconnect();
          }
          break;
        }

      //Fetch or Force Fetch Calendar!
      case MessageContainer.MessageType.FetchOrForceFetchCalendar:
        Logger.Info("Fetch Calendar from Server");
        FetchCalendarFromServer();
        break;

      //Hard shutdown of the client
      case MessageContainer.MessageType.Shutdown:
        Logger.Info("Shutdown Signal received");
        lock (engines) {
          Logger.Debug("engines locked");
          foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
            Logger.Debug("Shutting down Appdomain for " + kvp.Key);
            UnloadAppDomain(kvp.Value);
          }
          // The executors were created inside the now-unloaded domains, so drop all per-job state.
          appDomains.Clear();
          engines.Clear();
          jobs.Clear();
        }
        Logger.Debug("Stopping heartbeat");
        abortRequested = true;
        beat.StopHeartBeat();
        Logger.Debug("Logging out");
        WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
        break;
    }
  }

  //Asynchronous Threads for interaction with the Execution Engine
  #region Async Threads for the EE

  /// <summary>
  /// Serializes the finished job and submits it to the server. If the client is not logged in
  /// at that time, the job is persisted to disk and its AppDomain is unloaded. Jobs on disk are
  /// submitted by wcfService_Connected once a connection is established again. When the job is
  /// sent directly, its AppDomain is unloaded in wcfService_StoreFinishedJobResultCompleted.
  /// Runs on a thread-pool or dedicated thread; an exception escaping such a thread would
  /// terminate the process, so all failures are logged instead.
  /// </summary>
  /// <param name="jobId">The boxed <see cref="Guid"/> of the finished job</param>
  private void GetFinishedJob(object jobId) {
    Guid jId = (Guid)jobId;
    Logger.Info("Getting the finished job with id: " + jId);
    try {
      Executor engine = TryGetEngine(jId);
      if (engine == null) {
        Logger.Info("Engine doesn't exist");
        return;
      }

      byte[] sJob = engine.GetFinishedJob();

      if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
        Logger.Info("Sending the finished job with id: " + jId);
        wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, null, true);
      } else {
        Logger.Info("Storing the finished job with id: " + jId + " to hdd");
        JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
        // The result is safe on disk; release the executor, its AppDomain and the bookkeeping.
        KillAppDomain(jId);
      }
    } catch (InvalidStateException ise) {
      Logger.Error("Invalid State while Snapshoting:", ise);
    } catch (Exception ex) {
      Logger.Error("Getting the finished job " + jId + " failed:", ex);
    }
  }

  /// <summary>
  /// Fetches a snapshot from the job's executor and sends it to the server synchronously.
  /// If the uptime calendar says the client should be offline now, the job's AppDomain is
  /// unloaded, and the client disconnects once no jobs are left. Otherwise the job is restarted.
  /// Runs on the thread pool; failures are logged because an escaping exception would
  /// terminate the process.
  /// </summary>
  /// <param name="jobId">The boxed <see cref="Guid"/> of the job</param>
  private void GetSnapshot(object jobId) {
    Logger.Info("Fetching a snapshot for job " + jobId);
    Guid jId = (Guid)jobId;
    try {
      Executor engine = TryGetEngine(jId);
      if (engine == null) {
        // The job was aborted or killed after the SnapshotReady message was queued.
        Logger.Error("GetSnapshot: Engine doesn't exist");
        return;
      }

      byte[] obj = engine.GetSnapshot();
      Logger.Debug("BEGIN: Sending snapshot sync");
      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
        jId,
        obj,
        engine.Progress,
        null);
      Logger.Debug("END: Sended snapshot sync");
      //Uptime Limit reached, now is a good time to destroy this jobs.
      Logger.Debug("Checking if uptime limit is reached");
      if (!UptimeManager.Instance.IsOnline()) {
        Logger.Debug("Uptime limit reached");
        Logger.Debug("Killing Appdomain");
        KillAppDomain(jId);
        //Still anything running?
        int remaining;
        lock (engines) {
          remaining = engines.Count;
        }
        if (remaining == 0) {
          Logger.Info("All jobs snapshotted and sent back, disconnecting");
          WcfService.Instance.Disconnect();
        } else {
          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
        }
      } else {
        Logger.Debug("Restarting the job" + jobId);
        engine.StartOnlyJob();
        Logger.Info("Restarted the job" + jobId);
      }
    } catch (Exception ex) {
      Logger.Error("Sending the snapshot of job " + jId + " failed:", ex);
    }
  }

  #endregion

  //Eventhandlers for the communication with the wcf Layer
  #region wcfService Events
  /// <summary>
  /// Login has returned. On success, fetching is re-enabled; otherwise the status message is logged.
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
    if (e.Result.Success) {
      CurrentlyFetching = false;
      Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
    } else
      Logger.Error("Error during login: " + e.Result.StatusMessage);
  }

  /// <summary>
  /// A new Job from the wcfService has been received. Its plugins are prepared and it is started
  /// inside a freshly created sandbox AppDomain. A job id that is already known is ignored, and
  /// the sandbox created for it is unloaded again. <see cref="CurrentlyFetching"/> is reset in
  /// every case so the next FetchJob message can request work again.
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
    if (e.Result.StatusMessage != ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
      Guid jobId = e.Result.Job.Id;
      Logger.Info("Received new job with id " + jobId);
      AppDomain appDomain = null;
      // true once this call has added the job to the bookkeeping dictionaries
      bool registered = false;
      try {
        Logger.Debug("Fetching plugins for job " + jobId);
        PluginCache.Instance.PreparePlugins(e.Result.Job.PluginsNeeded);
        Logger.Debug("Plugins fetched for job " + jobId);

        appDomain =
          HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(
            jobId.ToString(), null);
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        lock (engines) {
          if (!jobs.ContainsKey(jobId)) {
            jobs.Add(jobId, e.Result.Job);
            appDomains.Add(jobId, appDomain);
            registered = true;
            Logger.Debug("Creating AppDomain");
            Executor engine =
              (Executor)
              appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            Logger.Debug("Created AppDomain");
            engine.JobId = jobId;
            engine.Queue = MessageQueue.GetInstance();
            Logger.Debug("Starting Engine for job " + jobId);
            engine.Start(e.Data);
            engines.Add(jobId, engine);

            ClientStatusInfo.JobsFetched++;

            Logger.Info("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
          } else {
            // The job is already being calculated here; release the sandbox created for the duplicate.
            Logger.Info("Job " + jobId + " is already known, discarding the duplicate");
            UnloadAppDomain(appDomain);
          }
        }
      } catch (Exception exception) {
        Logger.Error("Creating the Appdomain and loading the job failed for job " + jobId);
        Logger.Error("Error thrown is: ", exception);
        if (registered)
          KillAppDomain(jobId);
        else if (appDomain != null)
          // Never registered: unload only our own sandbox, never a running job with the same id.
          UnloadAppDomain(appDomain);
      }
    } else
      Logger.Info("No more jobs left!");
    CurrentlyFetching = false;
  }

  /// <summary>
  /// A finished job has been stored on the server (or storing failed). In both cases the job's
  /// AppDomain is unloaded; only a successful store increments the processed-jobs counter.
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
    Logger.Info("Job submitted with id " + e.Result.JobId);
    KillAppDomain(e.Result.JobId);
    if (e.Result.Success) {
      ClientStatusInfo.JobsProcessed++;
      Logger.Info("Increased ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
    } else {
      Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
    }
  }

  /// <summary>
  /// A snapshot has been stored on the server; only logged.
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
    Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
  }

  /// <summary>
  /// The server has been changed, so all running jobs must be aborted. Abort is requested on every
  /// executor. The AppDomains are torn down when the resulting JobAborted messages are handled.
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  void wcfService_ServerChanged(object sender, EventArgs e) {
    Logger.Info("ServerChanged has been called");
    lock (engines) {
      foreach (Executor engine in engines.Values)
        engine.Abort();
    }
  }

  /// <summary>
  /// A connection to the server has been established. If no local calendar exists, it is fetched
  /// first. Then, unless the calendar says the client should be offline, the client logs in,
  /// submits the jobs persisted on disk and re-enables fetching.
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  void wcfService_Connected(object sender, EventArgs e) {
    Logger.Info("WCF Service got a connection");
    if (!UptimeManager.Instance.CalendarAvailable) {
      Logger.Info("No local calendar available, fetch it");
      FetchCalendarFromServer();
    }
    //if the fetching from the server failed - still set the client online... maybe we get
    //a result within the next few heartbeats
    if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
      Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
      Logger.Info("Setting client online");
      wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
      JobStorageManager.CheckAndSubmitJobsFromDisc();
      CurrentlyFetching = false;
    }
  }

  /// <summary>
  /// Fetches the uptime calendar from the server, installs it in the <see cref="UptimeManager"/>,
  /// and reports the resulting <see cref="CalendarState"/> back to the server.
  /// </summary>
  private void FetchCalendarFromServer() {
    ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
    if (calres.Success) {
      if (UptimeManager.Instance.SetAppointments(false, calres)) {
        Logger.Info("Remote calendar installed");
        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
      } else {
        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
      }
    } else {
      Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
      wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
    }
  }

  /// <summary>
  /// The connection to the previous server has been restored. Every executor that is no longer in
  /// <see cref="ExecutionState.Started"/> and has no pending message is handed to GetFinishedJob on
  /// its own thread. Presumably these are jobs that finished while the connection was down; confirm
  /// this against the Executor implementation.
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  void wcfService_ConnectionRestored(object sender, EventArgs e) {
    Logger.Info("Reconnected to old server - checking currently running appdomains");

    // Iterate over a copy: the GetFinishedJob threads started below may remove jobs from
    // 'engines' (directly, or via the StoreFinishedJobResultCompleted callback).
    foreach (Executor executor in CopyEngines()) {
      if (executor.ExecutionState != ExecutionState.Started && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
        Logger.Info("Checking for JobId: " + executor.JobId);
        Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
        finThread.Start(executor.JobId);
      }
    }
  }

  #endregion

  /// <summary>
  /// Returns the live job-id → executor dictionary (not a copy). Callers should lock on the
  /// returned dictionary while using it, because this class mutates it from several threads.
  /// </summary>
  public Dictionary<Guid, Executor> GetExecutionEngines() {
    return engines;
  }

  /// <summary>Logs exceptions that went unhandled inside a job's AppDomain.</summary>
  void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
  }

  /// <summary>
  /// Returns the live job-id → job dictionary (not a copy); the same locking caveat as for
  /// <see cref="GetExecutionEngines"/> applies.
  /// </summary>
  internal Dictionary<Guid, JobDto> GetJobs() {
    return jobs;
  }

  /// <summary>
  /// Looks up the executor of a job under the engines lock.
  /// </summary>
  /// <param name="jobId">the GUID of the job</param>
  /// <returns>the executor, or null if the job is not (or no longer) known</returns>
  private Executor TryGetEngine(Guid jobId) {
    lock (engines) {
      Executor engine;
      return engines.TryGetValue(jobId, out engine) ? engine : null;
    }
  }

  /// <summary>
  /// Copies the currently registered executors under the engines lock so the caller can iterate
  /// without holding the lock, and without being affected by concurrent removals.
  /// </summary>
  private List<Executor> CopyEngines() {
    lock (engines) {
      return new List<Executor>(engines.Values);
    }
  }

  /// <summary>
  /// Detaches the unhandled-exception logger from <paramref name="appDomain"/> and unloads it.
  /// Failures are logged rather than thrown, so one broken sandbox cannot stop the caller from
  /// cleaning up the remaining ones.
  /// </summary>
  /// <param name="appDomain">the sandbox to unload</param>
  private void UnloadAppDomain(AppDomain appDomain) {
    try {
      appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
      AppDomain.Unload(appDomain);
    } catch (Exception ex) {
      Logger.Error("Exception when unloading the appdomain: ", ex);
    }
  }

  /// <summary>
  /// Kills the AppDomain of a specific job. It disposes the job's executor, unloads its AppDomain
  /// and removes the job from all bookkeeping dictionaries. Entries are removed even if Dispose or
  /// Unload fails, so a broken job cannot stay registered forever (which would, for example,
  /// prevent the disconnect once all jobs have been snapshotted). Unknown ids are ignored.
  /// </summary>
  /// <param name="id">the GUID of the job</param>
  private void KillAppDomain(Guid id) {
    Logger.Debug("Shutting down Appdomain for Job " + id);
    lock (engines) {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        try {
          engine.Dispose();
        } catch (Exception ex) {
          Logger.Error("Exception when disposing the engine: ", ex);
        }
        engines.Remove(id);
      }
      AppDomain appDomain;
      if (appDomains.TryGetValue(id, out appDomain)) {
        appDomains.Remove(id);
        UnloadAppDomain(appDomain);
      }
      jobs.Remove(id);
    }
    GC.Collect();
  }
}
507
508
509}
Note: See TracBrowser for help on using the repository browser.