Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs @ 2588

Last change on this file since 2588 was 2588, checked in by kgrading, 15 years ago

added logging (#826)

File size: 18.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46
namespace HeuristicLab.Hive.Client.Core {
  /// <summary>
  /// The core component of the Hive Client. It runs the main message loop, reacts to the
  /// events raised by <see cref="WcfService"/> and manages one AppDomain plus one
  /// <see cref="Executor"/> per job.
  /// </summary>
  /// <remarks>
  /// <c>engines</c>, <c>appDomains</c> and <c>jobs</c> are touched from the main loop,
  /// from thread-pool work items and from wcfService event handlers. Inside this class every
  /// access to them happens under <c>lock (engines)</c>. Executors are copied out under the
  /// lock and then called outside it, so the lock is not held during cross-AppDomain calls
  /// and enumeration never races with removals.
  /// </remarks>
  public class Core: MarshalByRefObject {
    /// <summary>
    /// Checked by the main loop in <see cref="Start"/> before each message is read;
    /// setting it to true ends the loop.
    /// </summary>
    public static bool abortRequested { get; set; }

    // Set by the FetchJob / UptimeLimitDisconnect handling and reset by wcfService callbacks,
    // i.e. written and read on different threads - hence volatile.
    private volatile bool currentlyFetching;

    // All three dictionaries are keyed by job id and guarded by lock (engines).
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private Heartbeat beat;

    /// <summary>
    /// Main method of the client. Initializes the plugin manager, the client console server,
    /// the wcf service event handlers and the heartbeat, connects if the UptimeManager reports
    /// the client as online, and then dispatches messages from the <see cref="MessageQueue"/>
    /// until <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      abortRequested = false;
      PluginManager.Manager.Initialize();
      Logging.Instance.Info(this.ToString(), "Hive Client started");
      ClientConsoleServer server = new ClientConsoleServer();
      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      //Register all Wcf Service references
      wcfService = WcfService.Instance;
      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
      wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
      wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
      wcfService.Connected += new EventHandler(wcfService_Connected);

      //Recover Server IP and Port from the Settings Framework.
      //Guard against a null address as well as an empty one.
      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
      if (!String.IsNullOrEmpty(cc.IPAdress) && cc.Port != 0)
        wcfService.SetIPAndPort(cc.IPAdress, cc.Port);

      if (UptimeManager.Instance.isOnline())
        wcfService.Connect();

      //Initialize the heartbeat
      beat = new Heartbeat { Interval = 10000 };
      beat.StartHeartbeat();

      MessageQueue queue = MessageQueue.GetInstance();

      //Main processing loop
      //Todo: own thread for message handling
      while (!abortRequested) {
        // NOTE(review): GetMessage presumably blocks until a message is available - TODO confirm
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
      Console.WriteLine("ended!");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      Logging.Instance.Info(this.ToString(), "Message: " + container.Message.ToString() + " for job: " + container.JobId);
      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob: {
            Executor engine;
            if (TryGetEngine(container.JobId, out engine))
              engine.Abort();
            else
              Logging.Instance.Error(this.ToString(), "AbortJob: Engine doesn't exist");
            break;
          }

        //Job has been successfully aborted: tear down its AppDomain
        case MessageContainer.MessageType.JobAborted: {
            Executor engine;
            if (TryGetEngine(container.JobId, out engine))
              KillAppDomain(container.JobId);
            else
              Logging.Instance.Error(this.ToString(), "JobAbort: Engine doesn't exist");
            break;
          }

        //Request a Snapshot from the Execution Engine
        case MessageContainer.MessageType.RequestSnapshot: {
            Executor engine;
            if (TryGetEngine(container.JobId, out engine))
              engine.RequestSnapshot();
            else
              Logging.Instance.Error(this.ToString(), "RequestSnapshot: Engine doesn't exist");
            break;
          }

        //Snapshot is ready and can be sent back to the Server
        case MessageContainer.MessageType.SnapshotReady:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.FetchJob:
          if (!currentlyFetching) {
            wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
            currentlyFetching = true;
          } else
            Logging.Instance.Info(this.ToString(), "Currently fetching, won't fetch this time!");
          break;

        //A Job has finished and can be sent back to the server
        case MessageContainer.MessageType.FinishedJob:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
          break;

        //When the timeslice is up
        case MessageContainer.MessageType.UptimeLimitDisconnect: {
            Logging.Instance.Info(this.ToString(), "Uptime Limit reached, storing jobs and sending them back");

            List<Executor> running = SnapshotEngines();
            //check if there are running jobs
            if (running.Count > 0) {
              //make sure there is no more fetching of jobs while the snapshots get processed
              currentlyFetching = true;
              //request a snapshot of each running job; GetSnapshot disconnects after the last one
              foreach (Executor engine in running)
                engine.RequestSnapshot();
            } else {
              //Nothing to save - disconnect right away
              WcfService.Instance.Disconnect();
            }
            break;
          }

        //Hard shutdown of the client
        case MessageContainer.MessageType.Shutdown:
          lock (engines) {
            // Copy the keys: KillAppDomain removes entries while we iterate.
            // Monitor locks are reentrant, so KillAppDomain can take the same lock again.
            foreach (Guid id in new List<Guid>(appDomains.Keys))
              KillAppDomain(id);
          }
          abortRequested = true;
          beat.StopHeartBeat();
          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
          break;
      }
    }

    //Asynchronous Threads for interaction with the Execution Engine
    #region Async Threads for the EE

    /// <summary>
    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
    /// once the connection gets reestablished, the job gets submitted
    /// </summary>
    /// <param name="jobId">The job id, boxed as <see cref="Guid"/> (thread-pool / thread start argument).</param>
    /// <remarks>
    /// Runs on a thread-pool or dedicated thread. An exception escaping it would terminate
    /// the process, so every exception is logged here instead.
    /// </remarks>
    private void GetFinishedJob(object jobId) {
      Guid jId = (Guid)jobId;
      Logging.Instance.Info(this.ToString(), "Getting the finished job with id: " + jId);
      try {
        Executor engine;
        if (!TryGetEngine(jId, out engine)) {
          Logging.Instance.Error(this.ToString(), "GetFinishedJob: Engine doesn't exist");
          return;
        }

        byte[] sJob = engine.GetFinishedJob();

        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
          // The AppDomain is unloaded in wcfService_StoreFinishedJobResultCompleted.
          Logging.Instance.Info(this.ToString(), "Sending the finished job with id: " + jId);
          wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id,
            jId,
            sJob,
            1,
            null,
            true);
        } else {
          Logging.Instance.Info(this.ToString(), "Storing the finished job with id: " + jId + " to hdd");
          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
          KillAppDomain(jId);
        }
      }
      catch (InvalidStateException ise) {
        Logging.Instance.Error(this.ToString(), "Exception: ", ise);
      }
      catch (Exception ex) {
        Logging.Instance.Error(this.ToString(), "GetFinishedJob: Exception for job " + jId + ": ", ex);
      }
    }

    /// <summary>
    /// Fetches the snapshot of a job from its executor and sends it to the server synchronously.
    /// If the UptimeManager reports the client as offline afterwards, the job's AppDomain is
    /// unloaded, and the service is disconnected once no engines are left. Otherwise the job is
    /// restarted.
    /// </summary>
    /// <param name="jobId">The job id, boxed as <see cref="Guid"/> (thread-pool argument).</param>
    /// <remarks>
    /// Runs on the thread pool. Exceptions are logged rather than allowed to terminate the
    /// process.
    /// </remarks>
    private void GetSnapshot(object jobId) {
      Logging.Instance.Info(this.ToString(), "Fetching a snapshot for job " + jobId);
      Guid jId = (Guid)jobId;
      Executor engine;
      if (!TryGetEngine(jId, out engine)) {
        Logging.Instance.Error(this.ToString(), "GetSnapshot: Engine doesn't exist");
        return;
      }
      try {
        byte[] obj = engine.GetSnapshot();
        Logging.Instance.Info(this.ToString(), "BEGIN: Sending snapshot sync");
        wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
          jId,
          obj,
          engine.Progress,
          null);
        Logging.Instance.Info(this.ToString(), "END: Sended snapshot sync");

        //Uptime Limit reached, now is a good time to destroy this jobs.
        if (!UptimeManager.Instance.isOnline()) {
          KillAppDomain(jId);
          //Still anything running?
          bool nothingLeft;
          lock (engines) {
            nothingLeft = engines.Count == 0;
          }
          if (nothingLeft)
            WcfService.Instance.Disconnect();
        } else {
          Logging.Instance.Info(this.ToString(), "Restarting the job" + jobId);
          engine.StartOnlyJob();
        }
      }
      catch (Exception ex) {
        Logging.Instance.Error(this.ToString(), "GetSnapshot: Exception for job " + jId + ": ", ex);
      }
    }

    #endregion

    //Eventhandlers for the communication with the wcf Layer
    #region wcfService Events
    /// <summary>
    /// Login has returned. On success the fetch guard is cleared so jobs can be fetched again.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
      if (e.Result.Success) {
        currentlyFetching = false;
        Logging.Instance.Info(this.ToString(), "Login completed to Hive Server @ " + DateTime.Now);
      } else
        Logging.Instance.Error(this.ToString(), e.Result.StatusMessage);
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    /// <remarks>
    /// <c>currentlyFetching</c> is always reset on exit, even if starting the job fails.
    /// Otherwise every later FetchJob message would be ignored.
    /// </remarks>
    void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
      try {
        // Check the status first: on "no jobs left" there is no job to read an id from.
        if (e.Result.StatusMessage == ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
          Logging.Instance.Info(this.ToString(), "No more jobs left!");
          return;
        }

        var job = e.Result.Job;
        Logging.Instance.Info(this.ToString(), "Received new job with id " + job.Id);
        bool sandboxed = false;
        List<byte[]> files = new List<byte[]>();
        Logging.Instance.Info(this.ToString(), "Fetching plugins for job " + job.Id);
        foreach (CachedHivePluginInfo plugininfo in PluginCache.Instance.GetPlugins(job.PluginsNeeded))
          files.AddRange(plugininfo.PluginFiles);
        Logging.Instance.Info(this.ToString(), "Plugins fetched for job " + job.Id);

        AppDomain appDomain = PluginManager.Manager.CreateAndInitAppDomainWithSandbox(job.Id.ToString(), sandboxed, null, files);
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);

        lock (engines) {
          if (jobs.ContainsKey(job.Id)) {
            // Duplicate delivery: the AppDomain created above is not tracked anywhere,
            // so unload it here instead of leaking it.
            Logging.Instance.Error(this.ToString(), "Job " + job.Id + " is already running, discarding the duplicate");
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(appDomain);
            return;
          }

          jobs.Add(job.Id, job);
          appDomains.Add(job.Id, appDomain);
          try {
            Logging.Instance.Info(this.ToString(), "Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            Logging.Instance.Info(this.ToString(), "Created AppDomain");
            engine.JobId = job.Id;
            engine.Queue = MessageQueue.GetInstance();
            Logging.Instance.Info(this.ToString(), "Starting Engine for job " + job.Id);
            engine.Start(e.Data);
            engines.Add(job.Id, engine);
          }
          catch (Exception ex) {
            // Roll back the half-registered job so no stale entries or AppDomain remain.
            Logging.Instance.Error(this.ToString(), "Starting the engine for job " + job.Id + " failed: ", ex);
            KillAppDomain(job.Id);
            return;
          }

          ClientStatusInfo.JobsFetched++;

          Debug.WriteLine("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
        }
      }
      finally {
        currentlyFetching = false;
      }
    }

    /// <summary>
    /// A finished job has been stored on the server; its AppDomain is unloaded in either case.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
      Logging.Instance.Info(this.ToString(), "Job submitted with id " + e.Result.JobId);
      KillAppDomain(e.Result.JobId);
      if (e.Result.Success) {
        ClientStatusInfo.JobsProcessed++;
        Debug.WriteLine("ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
      } else {
        Logging.Instance.Error(this.ToString(), "Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
      }
    }

    /// <summary>
    /// A snapshot has been stored on the server
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
      Logging.Instance.Info(this.ToString(), "Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
    }

    /// <summary>
    /// The server has been changed. All running engines are asked to abort.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    /// <remarks>
    /// The AppDomains are not unloaded here. NOTE(review): presumably each Executor answers
    /// Abort with a JobAborted message, which DetermineAction turns into KillAppDomain.
    /// TODO confirm.
    /// </remarks>
    void wcfService_ServerChanged(object sender, EventArgs e) {
      Logging.Instance.Info(this.ToString(), "ServerChanged has been called");
      foreach (Executor engine in SnapshotEngines())
        engine.Abort();
    }

    /// <summary>
    /// Connection to the server has been established: log in, submit the jobs persisted on
    /// the hard disk and clear the fetch guard.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void wcfService_Connected(object sender, EventArgs e) {
      wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
      JobStorageManager.CheckAndSubmitJobsFromDisc();
      currentlyFetching = false;
    }

    /// <summary>
    /// Reconnected to the previous server. Every engine that is not running and has no pending
    /// message gets its result fetched and submitted on a separate thread.
    /// </summary>
    /// <remarks>
    /// NOTE(review): "not running and no pending message" presumably identifies jobs that
    /// finished while the connection was down. TODO confirm against Executor.
    /// </remarks>
    void wcfService_ConnectionRestored(object sender, EventArgs e) {
      Logging.Instance.Info(this.ToString(), "Reconnected to old server - checking currently running appdomains");

      foreach (Executor engine in SnapshotEngines()) {
        if (!engine.Running && engine.CurrentMessage == MessageContainer.MessageType.NoMessage) {
          Logging.Instance.Info(this.ToString(), "Checking for JobId: " + engine.JobId);
          Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
          finThread.Start(engine.JobId);
        }
      }
    }

    #endregion

    /// <summary>
    /// Returns the live dictionary of executors keyed by job id (not a copy). This class
    /// mutates it under a lock on the same instance.
    /// </summary>
    public Dictionary<Guid, Executor> GetExecutionEngines() {
      return engines;
    }

    /// <summary>
    /// Logs exceptions that escape a job's AppDomain.
    /// </summary>
    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logging.Instance.Error(this.ToString(), "Exception in AppDomain: " + e.ExceptionObject.ToString());
    }

    /// <summary>
    /// Returns the live dictionary of jobs keyed by job id (not a copy).
    /// </summary>
    internal Dictionary<Guid, Job> GetJobs() {
      return jobs;
    }

    /// <summary>
    /// Copies one executor out of <c>engines</c> under the lock.
    /// </summary>
    private bool TryGetEngine(Guid jobId, out Executor engine) {
      lock (engines) {
        return engines.TryGetValue(jobId, out engine);
      }
    }

    /// <summary>
    /// Copies all current executors under the lock, so callers can iterate and make
    /// cross-AppDomain calls without holding it and without racing removals.
    /// </summary>
    private List<Executor> SnapshotEngines() {
      lock (engines) {
        return new List<Executor>(engines.Values);
      }
    }

    /// <summary>
    /// Kill a appdomain with a specific id.
    /// Unloads the job's AppDomain (if one is registered) and always removes the job from
    /// <c>appDomains</c>, <c>engines</c> and <c>jobs</c>. Unload failures are logged, not
    /// thrown.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    private void KillAppDomain(Guid id) {
      Logging.Instance.Info(this.ToString(), "Shutting down Appdomain for Job " + id);
      lock (engines) {
        AppDomain domain;
        if (appDomains.TryGetValue(id, out domain)) {
          domain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
          try {
            AppDomain.Unload(domain);
          }
          catch (Exception ex) {
            Logging.Instance.Error(this.ToString(), "Exception when unloading the appdomain: ", ex);
          }
        } else {
          Logging.Instance.Error(this.ToString(), "KillAppDomain: no AppDomain registered for job " + id);
        }
        // Remove unconditionally so no proxy to an unloaded (or unknown) domain stays behind.
        appDomains.Remove(id);
        engines.Remove(id);
        jobs.Remove(id);
      }
      GC.Collect();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.