Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs @ 3115

Last change on this file since 3115 was 3022, checked in by kgrading, 15 years ago

Added the calendar to the DAL and the server console. It works on every resource (group / client) (#908).

File size: 18.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46
47namespace HeuristicLab.Hive.Client.Core {
48  /// <summary>
49  /// The core component of the Hive Client
50  /// </summary>
51  public class Core : MarshalByRefObject {
52    public static bool abortRequested { get; set; }
53    private bool currentlyFetching = false;
54
55    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
56    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
57    private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>();
58
59    private WcfService wcfService;
60    private Heartbeat beat;
61
62    /// <summary>
63    /// Main Method for the client
64    /// </summary>
65    public void Start() {
66      abortRequested = false;
67      Logging.Instance.Info(this.ToString(), "Hive Client started");
68      ClientConsoleServer server = new ClientConsoleServer();
69      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));
70
71      ConfigManager manager = ConfigManager.Instance;
72      manager.Core = this;
73
74
75
76      //Register all Wcf Service references
77      wcfService = WcfService.Instance;
78      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
79      wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
80      wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
81      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
82      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
83      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
84      wcfService.Connected += new EventHandler(wcfService_Connected);
85      //Recover Server IP and Port from the Settings Framework
86      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
87      if (cc.IPAdress != String.Empty && cc.Port != 0)
88        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);
89
90      if (UptimeManager.Instance.isOnline())
91        wcfService.Connect();
92
93      //Initialize the heartbeat
94      beat = new Heartbeat { Interval = 10000 };
95      beat.StartHeartbeat();
96
97      MessageQueue queue = MessageQueue.GetInstance();
98
99      //Main processing loop     
100      //Todo: own thread for message handling
101      //Rly?!
102      while (!abortRequested) {
103        MessageContainer container = queue.GetMessage();       
104        DetermineAction(container);
105      }
106      System.Console.WriteLine("ended");
107    }
108
109    /// <summary>
110    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
111    /// </summary>
112    /// <param name="container">The Container, containing the message</param>
113    private void DetermineAction(MessageContainer container) {
114      Logging.Instance.Info(this.ToString(), "Message: " + container.Message.ToString() + " for job: " + container.JobId);       
115      switch (container.Message) {
116        //Server requests to abort a job
117        case MessageContainer.MessageType.AbortJob:         
118          if (engines.ContainsKey(container.JobId))
119            engines[container.JobId].Abort();
120          else
121            Logging.Instance.Error(this.ToString(), "AbortJob: Engine doesn't exist");
122          break;
123        //Job has been successfully aborted
124
125
126        case MessageContainer.MessageType.JobAborted:         
127        //todo: thread this         
128          lock (engines) {
129            Guid jobId = new Guid(container.JobId.ToString());
130            if (engines.ContainsKey(jobId)) {
131              appDomains[jobId].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
132              AppDomain.Unload(appDomains[jobId]);
133              appDomains.Remove(jobId);
134              engines.Remove(jobId);
135              jobs.Remove(jobId);
136              GC.Collect();
137            } else
138              Logging.Instance.Error(this.ToString(), "JobAbort: Engine doesn't exist");
139          }
140          break;
141
142
143        //Request a Snapshot from the Execution Engine
144        case MessageContainer.MessageType.RequestSnapshot:         
145          if (engines.ContainsKey(container.JobId))
146            engines[container.JobId].RequestSnapshot();
147          else
148            Logging.Instance.Error(this.ToString(), "RequestSnapshot: Engine doesn't exist");
149          break;
150
151
152        //Snapshot is ready and can be sent back to the Server
153        case MessageContainer.MessageType.SnapshotReady:         
154          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
155          break;
156
157
158        //Pull a Job from the Server
159        case MessageContainer.MessageType.FetchJob:         
160          if (!currentlyFetching) {
161            wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
162            currentlyFetching = true;
163          } else
164            Logging.Instance.Info(this.ToString(), "Currently fetching, won't fetch this time!");
165          break;         
166       
167       
168        //A Job has finished and can be sent back to the server
169        case MessageContainer.MessageType.FinishedJob:         
170          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
171          break;
172
173
174        //When the timeslice is up
175        case MessageContainer.MessageType.UptimeLimitDisconnect:
176          Logging.Instance.Info(this.ToString(), "Uptime Limit reached, storing jobs and sending them back");
177
178          //check if there are running jobs
179          if (engines.Count > 0) {
180            //make sure there is no more fetching of jobs while the snapshots get processed
181            currentlyFetching = true;
182            //request a snapshot of each running job
183            foreach (KeyValuePair<Guid, Executor> kvp in engines) {
184              kvp.Value.RequestSnapshot();
185            }
186
187          } else {
188            //Disconnect afterwards
189            WcfService.Instance.Disconnect();
190          }
191          break;
192
193
194        //Hard shutdown of the client
195        case MessageContainer.MessageType.Shutdown:
196          lock (engines) {
197            foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
198              appDomains[kvp.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
199              AppDomain.Unload(kvp.Value);
200            }
201          }
202          abortRequested = true;
203          beat.StopHeartBeat();
204          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
205          break;
206      }
207    }
208
209    //Asynchronous Threads for interaction with the Execution Engine
210    #region Async Threads for the EE
211
212    /// <summary>
213    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
214    /// once the connection gets reestablished, the job gets submitted
215    /// </summary>
216    /// <param name="jobId"></param>
217    private void GetFinishedJob(object jobId) {
218      Guid jId = (Guid)jobId;
219      Logging.Instance.Info(this.ToString(), "Getting the finished job with id: " + jId);
220      try {
221        if (!engines.ContainsKey(jId)) {
222          Logging.Instance.Error(this.ToString(), "GetFinishedJob: Engine doesn't exist");
223          return;
224        }
225
226        byte[] sJob = engines[jId].GetFinishedJob();
227
228        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
229          Logging.Instance.Info(this.ToString(), "Sending the finished job with id: " + jId);
230          wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id,
231            jId,
232            sJob,
233            1,
234            null,
235            true);
236        } else {
237          Logging.Instance.Info(this.ToString(), "Storing the finished job with id: " + jId + " to hdd");
238          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
239          lock (engines) {
240            appDomains[jId].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
241            AppDomain.Unload(appDomains[jId]);
242            appDomains.Remove(jId);
243            engines.Remove(jId);
244            jobs.Remove(jId);
245          }
246        }
247      }
248      catch (InvalidStateException ise) {
249        Logging.Instance.Error(this.ToString(), "Exception: ", ise);
250      }
251    }
252
253    private void GetSnapshot(object jobId) {
254      Logging.Instance.Info(this.ToString(), "Fetching a snapshot for job " + jobId);
255      Guid jId = (Guid)jobId;
256      byte[] obj = engines[jId].GetSnapshot();
257      Logging.Instance.Info(this.ToString(), "BEGIN: Sending snapshot sync");
258      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
259        jId,
260        obj,
261        engines[jId].Progress,
262        null);
263      Logging.Instance.Info(this.ToString(), "END: Sended snapshot sync");
264      //Uptime Limit reached, now is a good time to destroy this jobs.
265      if (!UptimeManager.Instance.isOnline()) {
266        KillAppDomain(jId);
267        //Still anything running?
268        if (engines.Count == 0)
269          WcfService.Instance.Disconnect();
270
271      } else {
272        Logging.Instance.Info(this.ToString(), "Restarting the job" + jobId);
273        engines[jId].StartOnlyJob();
274      }
275    }
276
277    #endregion
278
279    //Eventhandlers for the communication with the wcf Layer
280    #region wcfService Events
281    /// <summary>
282    /// Login has returned
283    /// </summary>
284    /// <param name="sender"></param>
285    /// <param name="e"></param>
286    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
287      if (e.Result.Success) {
288        currentlyFetching = false;
289        Logging.Instance.Info(this.ToString(), "Login completed to Hive Server @ " + DateTime.Now);
290      } else
291        Logging.Instance.Error(this.ToString(), e.Result.StatusMessage);
292    }
293
294    /// <summary>
295    /// A new Job from the wcfService has been received and will be started within a AppDomain.
296    /// </summary>
297    /// <param name="sender"></param>
298    /// <param name="e"></param>
299    void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
300      Logging.Instance.Info(this.ToString(), "Received new job with id " + e.Result.Job.Id);
301      if (e.Result.StatusMessage != ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {       
302        bool sandboxed = false;
303        List<byte[]> files = new List<byte[]>();
304        Logging.Instance.Info(this.ToString(), "Fetching plugins for job " + e.Result.Job.Id);
305        foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
306          files.AddRange(plugininfo.PluginFiles);
307        Logging.Instance.Info(this.ToString(), "Plugins fetched for job " + e.Result.Job.Id);
308        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(e.Result.Job.Id.ToString(), files);
309        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
310        lock (engines) {
311          if (!jobs.ContainsKey(e.Result.Job.Id)) {
312            jobs.Add(e.Result.Job.Id, e.Result.Job);
313            appDomains.Add(e.Result.Job.Id, appDomain);
314            Logging.Instance.Info(this.ToString(), "Creating AppDomain");
315            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
316            Logging.Instance.Info(this.ToString(), "Created AppDomain");
317            engine.JobId = e.Result.Job.Id;
318            engine.Queue = MessageQueue.GetInstance();
319            Logging.Instance.Info(this.ToString(), "Starting Engine for job " + e.Result.Job.Id);
320            engine.Start(e.Data);
321            engines.Add(e.Result.Job.Id, engine);
322
323            ClientStatusInfo.JobsFetched++;
324
325            Debug.WriteLine("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
326          }
327        }
328      } else
329        Logging.Instance.Info(this.ToString(), "No more jobs left!");
330      currentlyFetching = false;
331    }
332
333    /// <summary>
334    /// A finished job has been stored on the server
335    /// </summary>
336    /// <param name="sender"></param>
337    /// <param name="e"></param>
338    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
339      Logging.Instance.Info(this.ToString(), "Job submitted with id " + e.Result.JobId);
340      KillAppDomain(e.Result.JobId);
341      if (e.Result.Success) {
342        ClientStatusInfo.JobsProcessed++;
343        Debug.WriteLine("ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
344      } else {
345        Logging.Instance.Error(this.ToString(), "Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
346      }
347    }
348
349    /// <summary>
350    /// A snapshot has been stored on the server
351    /// </summary>
352    /// <param name="sender"></param>
353    /// <param name="e"></param>
354    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
355      Logging.Instance.Info(this.ToString(), "Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
356    }
357
358    /// <summary>
359    /// The server has been changed. All Appdomains and Jobs must be aborted!
360    /// </summary>
361    /// <param name="sender"></param>
362    /// <param name="e"></param>
363    void wcfService_ServerChanged(object sender, EventArgs e) {
364      Logging.Instance.Info(this.ToString(), "ServerChanged has been called");
365      lock (engines) {
366        foreach (KeyValuePair<Guid, Executor> entries in engines) {
367          engines[entries.Key].Abort();
368          //appDomains[entries.Key].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
369          //AppDomain.Unload(appDomains[entries.Key]);
370        }
371        //appDomains = new Dictionary<Guid, AppDomain>();
372        //engines = new Dictionary<Guid, Executor>();
373        //jobs = new Dictionary<Guid, Job>();
374      }
375    }
376
377    /// <summary>
378    /// Connnection to the server has been estabilshed => Login and Send the persistet Jobs from the harddisk.
379    /// </summary>
380    /// <param name="sender"></param>
381    /// <param name="e"></param>
382    void wcfService_Connected(object sender, EventArgs e) {
383      wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
384      JobStorageManager.CheckAndSubmitJobsFromDisc();
385      currentlyFetching = false;
386    }
387
388    //this is a little bit tricky -
389    void wcfService_ConnectionRestored(object sender, EventArgs e) {
390      Logging.Instance.Info(this.ToString(), "Reconnected to old server - checking currently running appdomains");
391
392      foreach (KeyValuePair<Guid, Executor> execKVP in engines) {
393        if (!execKVP.Value.Running && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) {
394          Logging.Instance.Info(this.ToString(), "Checking for JobId: " + execKVP.Value.JobId);
395          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
396          finThread.Start(execKVP.Value.JobId);
397        }
398      }
399    }
400
401    #endregion
402
403    public Dictionary<Guid, Executor> GetExecutionEngines() {
404      return engines;
405    }
406
407    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
408      Logging.Instance.Error(this.ToString(), "Exception in AppDomain: " + e.ExceptionObject.ToString());
409    }
410
411    internal Dictionary<Guid, JobDto> GetJobs() {
412      return jobs;
413    }
414
415    /// <summary>
416    /// Kill a appdomain with a specific id.
417    /// </summary>
418    /// <param name="id">the GUID of the job</param>
419    private void KillAppDomain(Guid id) {
420      Logging.Instance.Info(this.ToString(), "Shutting down Appdomain for Job " + id);
421      lock (engines) {
422        try {
423          appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
424          AppDomain.Unload(appDomains[id]);
425          appDomains.Remove(id);
426          engines.Remove(id);
427          jobs.Remove(id);
428        }
429        catch (Exception ex) {
430          Logging.Instance.Error(this.ToString(), "Exception when unloading the appdomain: ", ex);
431        }
432      }
433      GC.Collect();
434    }
435  }
436}
Note: See TracBrowser for help on using the repository browser.