Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Client.Core/3.3/Core.cs @ 4171

Last change on this file since 4171 was 4141, checked in by cneumuel, 14 years ago

merged with changes from Hive-3.2

File size: 19.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46using HeuristicLab.Tracing;
47using HeuristicLab.Core;
48using System.IO;
49
50namespace HeuristicLab.Hive.Client.Core {
51  /// <summary>
52  /// The core component of the Hive Client
53  /// </summary>
54  public class Core : MarshalByRefObject {
55    public static bool abortRequested { get; set; }
56
57    private bool _currentlyFetching;
58    private bool CurrentlyFetching {
59      get {
60        return _currentlyFetching;
61      }
62      set {
63        _currentlyFetching = value;
64        Logger.Debug("Set CurrentlyFetching to " + _currentlyFetching);
65      }
66    }
67
68    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
69    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
70    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
71
72    private WcfService wcfService;
73    private Heartbeat beat;
74
75    /// <summary>
76    /// Main Method for the client
77    /// </summary>
78    public void Start() {
79      abortRequested = false;
80      Logger.Info("Hive Client started");
81      ClientConsoleServer server = new ClientConsoleServer();
82      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));
83
84      ConfigManager manager = ConfigManager.Instance;
85      manager.Core = this;
86
87
88
89      //Register all Wcf Service references
90      wcfService = WcfService.Instance;
91      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
92      wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
93      wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
94      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
95      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
96      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
97      wcfService.Connected += new EventHandler(wcfService_Connected);
98      //Recover Server IP and Port from the Settings Framework
99      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
100      if (cc.IPAdress != String.Empty && cc.Port != 0)
101        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);
102
103      //Initialize the heartbeat
104      beat = new Heartbeat { Interval = 10000 };
105      beat.StartHeartbeat();
106
107      MessageQueue queue = MessageQueue.GetInstance();
108
109      //Main processing loop     
110      //Todo: own thread for message handling
111      //Rly?!
112      while (!abortRequested) {
113        MessageContainer container = queue.GetMessage();
114        DetermineAction(container);
115      }
116      Logger.Info("Program shutdown");
117    }
118
119    /// <summary>
120    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
121    /// </summary>
122    /// <param name="container">The Container, containing the message</param>
123    private void DetermineAction(MessageContainer container) {
124      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
125      switch (container.Message) {
126        //Server requests to abort a job
127        case MessageContainer.MessageType.AbortJob:
128          if (engines.ContainsKey(container.JobId))
129            engines[container.JobId].Abort();
130          else
131            Logger.Error("AbortJob: Engine doesn't exist");
132          break;
133        //Job has been successfully aborted
134
135
136        case MessageContainer.MessageType.JobAborted:
137          Guid jobId = new Guid(container.JobId.ToString());
138          KillAppDomain(jobId);
139          break;
140
141
142        //Request a Snapshot from the Execution Engine
143        case MessageContainer.MessageType.RequestSnapshot:
144          if (engines.ContainsKey(container.JobId))
145            engines[container.JobId].RequestSnapshot();
146          else
147            Logger.Error("RequestSnapshot: Engine doesn't exist");
148          break;
149
150
151        //Snapshot is ready and can be sent back to the Server
152        case MessageContainer.MessageType.SnapshotReady:
153          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
154          break;
155
156
157        //Pull a Job from the Server
158        case MessageContainer.MessageType.FetchJob:
159          if (!CurrentlyFetching) {
160            wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
161            CurrentlyFetching = true;
162          } else
163            Logger.Info("Currently fetching, won't fetch this time!");
164          break;
165
166
167        //A Job has finished and can be sent back to the server
168        case MessageContainer.MessageType.FinishedJob:
169          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
170          break;
171
172
173        //When the timeslice is up
174        case MessageContainer.MessageType.UptimeLimitDisconnect:
175          Logger.Info("Uptime Limit reached, storing jobs and sending them back");
176
177          //check if there are running jobs
178          if (engines.Count > 0) {
179            //make sure there is no more fetching of jobs while the snapshots get processed
180            CurrentlyFetching = true;
181            //request a snapshot of each running job
182            foreach (KeyValuePair<Guid, Executor> kvp in engines) {
183              kvp.Value.RequestSnapshot();
184            }
185
186          } else {
187            //Disconnect afterwards
188            WcfService.Instance.Disconnect();
189          }
190          break;
191
192        //Fetch or Force Fetch Calendar!
193        case MessageContainer.MessageType.FetchOrForceFetchCalendar:
194          Logger.Info("Fetch Calendar from Server");
195          FetchCalendarFromServer();
196          break;
197
198        //Hard shutdown of the client
199        case MessageContainer.MessageType.Shutdown:
200          Logger.Info("Shutdown Signal received");
201          lock (engines) {
202            Logger.Debug("engines locked");
203            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
204              Logger.Debug("Shutting down Appdomain for " + kvp.Key);
205              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
206              AppDomain.Unload(kvp.Value);
207            }
208          }
209          Logger.Debug("Stopping heartbeat");
210          abortRequested = true;
211          beat.StopHeartBeat();
212          Logger.Debug("Logging out");
213          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
214          break;
215      }
216    }
217
218    //Asynchronous Threads for interaction with the Execution Engine
219    #region Async Threads for the EE
220
221    /// <summary>
222    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
223    /// once the connection gets reestablished, the job gets submitted
224    /// </summary>
225    /// <param name="jobId"></param>
226    private void GetFinishedJob(object jobId) {
227      Guid jId = (Guid)jobId;
228      Logger.Info("Getting the finished job with id: " + jId);
229      try {
230        if (!engines.ContainsKey(jId)) {
231          Logger.Info("Engine doesn't exist");
232          return;
233        }
234
235        byte[] sJob = engines[jId].GetFinishedJob();
236
237        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
238          Logger.Info("Sending the finished job with id: " + jId);
239          wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engines[jId].CurrentException, true);
240        } else {
241          Logger.Info("Storing the finished job with id: " + jId + " to hdd");
242          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
243          KillAppDomain(jId);
244        }
245      } catch (InvalidStateException ise) {
246        Logger.Error("Invalid State while Snapshoting:", ise);
247      }
248    }
249
250    private void GetSnapshot(object jobId) {
251      Logger.Info("Fetching a snapshot for job " + jobId);
252      Guid jId = (Guid)jobId;
253      byte[] obj = engines[jId].GetSnapshot();
254      Logger.Debug("BEGIN: Sending snapshot sync");
255      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
256        jId,
257        obj,
258        engines[jId].Progress,
259        null);
260      Logger.Debug("END: Sended snapshot sync");
261      //Uptime Limit reached, now is a good time to destroy this jobs.
262      Logger.Debug("Checking if uptime limit is reached");
263      if (!UptimeManager.Instance.IsOnline()) {
264        Logger.Debug("Uptime limit reached");
265        Logger.Debug("Killing Appdomain");
266        KillAppDomain(jId);
267        //Still anything running? 
268        if (engines.Count == 0) {
269          Logger.Info("All jobs snapshotted and sent back, disconnecting");
270          WcfService.Instance.Disconnect();
271        } else {
272          Logger.Debug("There are still active Jobs in the Field, not disconnecting");
273        }
274
275      } else {
276        Logger.Debug("Restarting the job" + jobId);
277        engines[jId].StartOnlyJob();
278        Logger.Info("Restarted the job" + jobId);
279      }
280    }
281
282    #endregion
283
284    //Eventhandlers for the communication with the wcf Layer
285    #region wcfService Events
286    /// <summary>
287    /// Login has returned
288    /// </summary>
289    /// <param name="sender"></param>
290    /// <param name="e"></param>
291    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
292      if (e.Result.Success) {
293        CurrentlyFetching = false;
294        Logger.Info("Login completed to Hive Server @ " + DateTime.Now);
295      } else
296        Logger.Error("Error during login: " + e.Result.StatusMessage);
297    }
298
299    /// <summary>
300    /// A new Job from the wcfService has been received and will be started within a AppDomain.
301    /// </summary>
302    /// <param name="sender"></param>
303    /// <param name="e"></param>
304    void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
305      if (e.Result.StatusMessage != ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
306        Logger.Info("Received new job with id " + e.Result.Job.Id);
307        bool sandboxed = false;
308        Logger.Debug("Fetching plugins for job " + e.Result.Job.Id);
309
310        PluginCache.Instance.PreparePlugins(e.Result.Job.PluginsNeeded);
311
312        PluginCache.Instance.CopyPluginsForJob(e.Result.Job.PluginsNeeded, e.Result.Job.Id);
313
314        //        foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
315        //        files.AddRange(plugininfo.PluginFiles);
316        Logger.Debug("Plugins fetched for job " + e.Result.Job.Id);
317        try {
318          String pluginDir = Path.Combine(PluginCache.PLUGIN_REPO, e.Result.Job.Id.ToString());
319         
320          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(pluginDir, null);
321          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
322          lock (engines) {
323            if (!jobs.ContainsKey(e.Result.Job.Id)) {
324              jobs.Add(e.Result.Job.Id, e.Result.Job);
325              appDomains.Add(e.Result.Job.Id, appDomain);
326              Logger.Debug("Creating AppDomain");
327              Executor engine =
328                (Executor)
329                appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
330              Logger.Debug("Created AppDomain");
331              engine.JobId = e.Result.Job.Id;
332              engine.Queue = MessageQueue.GetInstance();
333              Logger.Debug("Starting Engine for job " + e.Result.Job.Id);
334              engine.Start(e.Data);
335              engines.Add(e.Result.Job.Id, engine);
336
337              ClientStatusInfo.JobsFetched++;
338
339              Logger.Info("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
340            }
341          }
342        } catch (Exception exception) {
343          Logger.Error("Creating the Appdomain and loading the job failed for job " + e.Result.Job.Id);
344          Logger.Error("Error thrown is: ", exception);
345          CurrentlyFetching = false;
346          KillAppDomain(e.Result.Job.Id);
347          wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Job.Id, new byte[] { }, 1, exception, true);
348        }
349      } else
350        Logger.Info("No more jobs left!");
351      CurrentlyFetching = false;
352    }
353
354    /// <summary>
355    /// A finished job has been stored on the server
356    /// </summary>
357    /// <param name="sender"></param>
358    /// <param name="e"></param>
359    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
360      Logger.Info("Job submitted with id " + e.Result.JobId);
361      KillAppDomain(e.Result.JobId);
362      if (e.Result.Success) {
363        ClientStatusInfo.JobsProcessed++;
364        Logger.Info("Increased ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
365      } else {
366        Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
367      }
368    }
369
370    /// <summary>
371    /// A snapshot has been stored on the server
372    /// </summary>
373    /// <param name="sender"></param>
374    /// <param name="e"></param>
375    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
376      Logger.Info("Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
377    }
378
379    /// <summary>
380    /// The server has been changed. All Appdomains and Jobs must be aborted!
381    /// </summary>
382    /// <param name="sender"></param>
383    /// <param name="e"></param>
384    void wcfService_ServerChanged(object sender, EventArgs e) {
385      Logger.Info("ServerChanged has been called");
386      lock (engines) {
387        foreach (KeyValuePair<Guid, Executor> entries in engines) {
388          engines[entries.Key].Abort();
389          //appDomains[entries.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
390          //AppDomain.Unload(appDomains[entries.Key]);
391        }
392        //appDomains = new Dictionary<Guid, AppDomain>();
393        //engines = new Dictionary<Guid, Executor>();
394        //jobs = new Dictionary<Guid, Job>();
395      }
396    }
397
398    /// <summary>
399    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
400    /// </summary>
401    /// <param name="sender"></param>
402    /// <param name="e"></param>
403    void wcfService_Connected(object sender, EventArgs e) {
404      Logger.Info("WCF Service got a connection");
405      if (!UptimeManager.Instance.CalendarAvailable) {
406        Logger.Info("No local calendar available, fetch it");
407        FetchCalendarFromServer();
408      }
409      //if the fetching from the server failed - still set the client online... maybe we get
410      //a result within the next few heartbeats     
411      if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {
412        Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsOnline());
413        Logger.Info("Setting client online");
414        wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
415        JobStorageManager.CheckAndSubmitJobsFromDisc();
416        CurrentlyFetching = false;
417      }
418    }
419
420    private void FetchCalendarFromServer() {
421      ResponseCalendar calres = wcfService.GetCalendarSync(ConfigManager.Instance.GetClientInfo().Id);
422      if (calres.Success) {
423        if (UptimeManager.Instance.SetAppointments(false, calres)) {
424          Logger.Info("Remote calendar installed");
425          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.Fetched);
426        } else {
427          Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
428          wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
429        }
430      } else {
431        Logger.Info("Remote calendar installation failed, setting state to " + CalendarState.NotAllowedToFetch);
432        wcfService.SetCalendarStatus(ConfigManager.Instance.GetClientInfo().Id, CalendarState.NotAllowedToFetch);
433      }
434    }
435
436    //this is a little bit tricky -
437    void wcfService_ConnectionRestored(object sender, EventArgs e) {
438      Logger.Info("Reconnected to old server - checking currently running appdomains");
439
440      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
441        if (execKVP.Value.ExecutionState != ExecutionState.Started && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
442          Logger.Info("Checking for JobId: " + execKVP.Value.JobId);
443          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
444          finThread.Start(execKVP.Value.JobId);
445        }
446      }
447    }
448
449    #endregion
450
451    public Dictionary<Guid, Executor> GetExecutionEngines() {
452      return engines;
453    }
454
455    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
456      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
457    }
458
459    internal Dictionary<Guid, JobDto> GetJobs() {
460      return jobs;
461    }
462
463    /// <summary>
464    /// Kill a appdomain with a specific id.
465    /// </summary>
466    /// <param name="id">the GUID of the job</param>
467    private void KillAppDomain(Guid id) {
468      Logger.Debug("Shutting down Appdomain for Job " + id);
469      lock (engines) {
470        try {
471          if (engines.ContainsKey(id))
472            engines[id].Dispose();
473          if (appDomains.ContainsKey(id)) {
474            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
475            AppDomain.Unload(appDomains[id]);
476            appDomains.Remove(id);
477          }
478          engines.Remove(id);
479          jobs.Remove(id);
480          PluginCache.Instance.DeletePluginsForJob(id);
481          GC.Collect();
482        } catch (Exception ex) {
483          Logger.Error("Exception when unloading the appdomain: ", ex);
484        }
485      }
486      GC.Collect();
487    }
488  }
489
490
491}
Note: See TracBrowser for help on using the repository browser.