Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Client.Core/3.2/Core.cs @ 1861

Last change on this file since 1861 was 1830, checked in by kgrading, 16 years ago

removed handle on appdomains before unloading, made the whole core more stable for concurrency errors (#467)

File size: 14.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Client.ExecutionEngine;
27using HeuristicLab.Hive.Client.Common;
28using System.Threading;
29using System.Reflection;
30using System.Diagnostics;
31using System.Security.Permissions;
32using System.Security.Policy;
33using System.Security;
34using HeuristicLab.Hive.Client.Communication;
35using HeuristicLab.Hive.Contracts.BusinessObjects;
36using HeuristicLab.Hive.Contracts;
37using System.Runtime.Remoting.Messaging;
38using HeuristicLab.PluginInfrastructure;
39using System.ServiceModel;
40using System.ServiceModel.Description;
41using HeuristicLab.Hive.Client.Core.ClientConsoleService;
42using HeuristicLab.Hive.Client.Core.ConfigurationManager;
43using HeuristicLab.Hive.Client.Communication.ServerService;
44using HeuristicLab.Hive.JobBase;
45using HeuristicLab.Hive.Client.Core.JobStorage;
46
namespace HeuristicLab.Hive.Client.Core {
  /// <summary>
  /// The core component of the Hive Client.
  /// It registers for the events of the WCF layer, starts the heartbeat and then processes the
  /// messages of the <see cref="MessageQueue"/> until a shutdown is requested. Every job runs in
  /// an <see cref="Executor"/> that lives in its own <see cref="AppDomain"/>.
  /// </summary>
  public class Core : MarshalByRefObject {
    /// <summary>
    /// Checked by the main loop in <see cref="Start"/> before each message is fetched;
    /// the loop ends once this is true.
    /// </summary>
    public static bool abortRequested { get; set; }

    // Read by the main loop and reset by the SendJob completion handler, which may run on
    // another thread - hence volatile.
    private volatile bool currentlyFetching = false;

    // The three maps are keyed by the job id and are only modified while holding lock (engines).
    // They are readonly on purpose: the instance used as lock object must never be replaced,
    // otherwise two threads could synchronize on two different objects.
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private Heartbeat beat;

    /// <summary>
    /// Main Method for the client. Blocks in the message loop until
    /// <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      abortRequested = false;
      PluginManager.Manager.Initialize();
      Logging.Instance.Info(this.ToString(), "Hive Client started");
      ClientConsoleServer server = new ClientConsoleServer();
      server.StartClientConsoleServer(new Uri("net.tcp://127.0.0.1:8000/ClientConsole/"));

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      //Register all Wcf Service references
      wcfService = WcfService.Instance;
      wcfService.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(wcfService_LoginCompleted);
      wcfService.SendJobCompleted += new EventHandler<SendJobCompletedEventArgs>(wcfService_SendJobCompleted);
      wcfService.StoreFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted);
      wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted);
      wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);
      wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);
      wcfService.Connected += new EventHandler(wcfService_Connected);

      //Recover Server IP and Port from the Settings Framework
      ConnectionContainer cc = ConfigManager.Instance.GetServerIPAndPort();
      // IsNullOrEmpty instead of a comparison with String.Empty: a null address must not be
      // handed to Connect either.
      if (!String.IsNullOrEmpty(cc.IPAdress) && cc.Port != 0) {
        wcfService.Connect(cc.IPAdress, cc.Port);
      }

      //Initialize the heartbeat
      beat = new Heartbeat { Interval = 10000 };
      beat.StartHeartbeat();

      MessageQueue queue = MessageQueue.GetInstance();

      //Main processing loop
      //Todo: own thread for message handling
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        Debug.WriteLine("Main loop received this message: " + container.Message.ToString());
        Logging.Instance.Info(this.ToString(), container.Message.ToString());
        DetermineAction(container);
      }
      Console.WriteLine("ended!");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      Executor engine;
      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          if (TryGetEngine(container.JobId, out engine)) {
            engine.Abort();
          } else {
            Logging.Instance.Error(this.ToString(), "AbortJob: Engine doesn't exist");
          }
          break;
        //Job has been successfully aborted
        case MessageContainer.MessageType.JobAborted:
          //todo: thread this
          Debug.WriteLine("Job aborted, he's dead");
          if (RemoveJob(container.JobId)) {
            // Forced collection kept from the original implementation (presumably to reclaim the
            // memory of the unloaded domain promptly); it now runs without holding the lock.
            GC.Collect();
          } else {
            Logging.Instance.Error(this.ToString(), "JobAbort: Engine doesn't exist");
          }
          break;
        //Request a Snapshot from the Execution Engine
        case MessageContainer.MessageType.RequestSnapshot:
          if (TryGetEngine(container.JobId, out engine)) {
            engine.RequestSnapshot();
          } else {
            Logging.Instance.Error(this.ToString(), "RequestSnapshot: Engine doesn't exist");
          }
          break;
        //Snapshot is ready and can be sent back to the Server
        case MessageContainer.MessageType.SnapshotReady:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot), container.JobId);
          break;
        //Pull a Job from the Server
        case MessageContainer.MessageType.FetchJob:
          if (!currentlyFetching) {
            // Raise the flag BEFORE issuing the request: the completion handler resets it and
            // could otherwise run first, leaving the flag stuck at true forever.
            currentlyFetching = true;
            try {
              wcfService.SendJobAsync(ConfigManager.Instance.GetClientInfo().Id);
            }
            catch {
              // The request was never sent, so no completion handler will reset the flag.
              currentlyFetching = false;
              throw;
            }
          }
          break;
        //A Job has finished and can be sent back to the server
        case MessageContainer.MessageType.FinishedJob:
          ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId);
          break;
        //Hard shutdown of the client
        case MessageContainer.MessageType.Shutdown:
          UnloadAllJobs();
          abortRequested = true;
          beat.StopHeartBeat();
          WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id);
          break;
      }
    }

    //Asynchronous Threads for interaction with the Execution Engine
    #region Async Threads for the EE

    /// <summary>
    /// Fetches the result of a finished job from its engine. While logged in, the result is sent
    /// to the server asynchronously (the job is cleaned up in the completion handler); otherwise
    /// it is persisted to disc and the job is cleaned up right away.
    /// </summary>
    /// <param name="jobId">The boxed <see cref="Guid"/> of the job</param>
    private void GetFinishedJob(object jobId) {
      Guid jId = (Guid)jobId;
      try {
        Executor engine;
        if (!TryGetEngine(jId, out engine)) {
          Logging.Instance.Error(this.ToString(), "GetFinishedJob: Engine doesn't exist");
          return;
        }

        byte[] sJob = engine.GetFinishedJob();

        if (WcfService.Instance.ConnState == NetworkEnum.WcfConnState.Loggedin) {
          wcfService.StoreFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id,
            jId,
            sJob,
            1,
            null,
            true);
        } else {
          JobStorageManager.PersistObjectToDisc(wcfService.ServerIP, wcfService.ServerPort, jId, sJob);
          RemoveJob(jId);
        }
      }
      catch (InvalidStateException ise) {
        Logging.Instance.Error(this.ToString(), "Exception: ", ise);
      }
    }

    /// <summary>
    /// Fetches a snapshot from the engine of the given job, transmits it to the server and lets
    /// the engine continue with the job.
    /// </summary>
    /// <param name="jobId">The boxed <see cref="Guid"/> of the job</param>
    private void GetSnapshot(object jobId) {
      Guid jId = (Guid)jobId;
      Executor engine;
      // The job may have been removed between the SnapshotReady message and this work item;
      // indexing the dictionary blindly would throw on this worker thread.
      if (!TryGetEngine(jId, out engine)) {
        Logging.Instance.Error(this.ToString(), "GetSnapshot: Engine doesn't exist");
        return;
      }

      byte[] obj = engine.GetSnapshot();
      wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id,
        jId,
        obj,
        engine.Progress,
        null);
      engine.StartOnlyJob();
    }

    #endregion

    //Eventhandlers for the communication with the wcf Layer
    #region wcfService Events

    /// <summary>
    /// Logs the outcome of the login.
    /// </summary>
    void wcfService_LoginCompleted(object sender, LoginCompletedEventArgs e) {
      if (e.Result.Success) {
        Logging.Instance.Info(this.ToString(), "Login completed to Hive Server @ " + DateTime.Now);
      } else {
        Logging.Instance.Error(this.ToString(), e.Result.StatusMessage);
      }
    }

    /// <summary>
    /// Called when the server answered a job request. Creates an AppDomain and an engine for a
    /// job that is not known yet and starts it. The fetching flag is reset in any case so that
    /// the next FetchJob message can issue a new request.
    /// </summary>
    void wcfService_SendJobCompleted(object sender, SendJobCompletedEventArgs e) {
      try {
        if (e.Result.StatusMessage != ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT) {
          Job job = e.Result.Job;
          if (job == null) {
            Logging.Instance.Error(this.ToString(), "SendJobCompleted: no job received. Message: " + e.Result.StatusMessage);
            return;
          }

          bool sandboxed = false;
          //Todo: make a set & override the equals method
          List<byte[]> files = new List<byte[]>();
          //foreach (CachedHivePluginInfo plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))
          //  files.AddRange(plugininfo.PluginFiles);

          lock (engines) {
            // Create the AppDomain only for a job that is not known yet; creating it before
            // this check would leak a loaded domain for every duplicate.
            if (!jobs.ContainsKey(job.Id)) {
              AppDomain appDomain = PluginManager.Manager.CreateAndInitAppDomainWithSandbox(job.Id.ToString(), sandboxed, null, files);
              appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
              jobs.Add(job.Id, job);
              appDomains.Add(job.Id, appDomain);

              try {
                Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
                engine.JobId = job.Id;
                engine.Queue = MessageQueue.GetInstance();
                engine.Start(job.SerializedJob);
                engines.Add(job.Id, engine);
              }
              catch {
                // Do not leave a half registered job (and its loaded AppDomain) behind.
                RemoveJob(job.Id);
                throw;
              }

              ClientStatusInfo.JobsFetched++;

              Debug.WriteLine("Increment FetchedJobs to:" + ClientStatusInfo.JobsFetched);
            }
          }
        }
      }
      finally {
        // Reset even when something failed above, otherwise no job would ever be fetched again.
        currentlyFetching = false;
      }
    }

    /// <summary>
    /// Called when a job result has been transmitted. The job is cleaned up regardless of the
    /// outcome; a failed transmission is logged.
    /// </summary>
    void wcfService_StoreFinishedJobResultCompleted(object sender, StoreFinishedJobResultCompletedEventArgs e) {
      if (!RemoveJob(e.Result.JobId)) {
        Logging.Instance.Error(this.ToString(), "StoreFinishedJobResultCompleted: Engine doesn't exist");
      }

      if (e.Result.Success) {
        //Todo (carried over): the event args do not tell a snapshot from a finished job;
        //better expand them with a boolean "snapshot?" flag
        ClientStatusInfo.JobsProcessed++;
        Debug.WriteLine("ProcessedJobs to:" + ClientStatusInfo.JobsProcessed);
      } else {
        Logging.Instance.Error(this.ToString(), "Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage);
      }
    }

    /// <summary>
    /// Logs that a snapshot has been transmitted.
    /// </summary>
    void wcfService_ProcessSnapshotCompleted(object sender, ProcessSnapshotCompletedEventArgs e) {
      Logging.Instance.Info(this.ToString(), "Snapshot " + e.Result.JobId + " has been transmitted according to plan.");
    }

    /// <summary>
    /// Unloads all jobs when the server has been changed.
    /// </summary>
    //Todo: First stop all threads, then terminate
    void wcfService_ServerChanged(object sender, EventArgs e) {
      Logging.Instance.Info(this.ToString(), "ServerChanged has been called");
      UnloadAllJobs();
    }

    /// <summary>
    /// Logs in at the server and submits the job results that have been stored on disc.
    /// </summary>
    void wcfService_Connected(object sender, EventArgs e) {
      wcfService.LoginSync(ConfigManager.Instance.GetClientInfo());
      JobStorageManager.CheckAndSubmitJobsFromDisc();
    }

    /// <summary>
    /// Called when the connection to the old server is back. Every engine that is neither
    /// running nor has a message pending gets its result fetched on a separate thread.
    /// </summary>
    void wcfService_ConnectionRestored(object sender, EventArgs e) {
      Logging.Instance.Info(this.ToString(), "Reconnected to old server - checking currently running appdomains");

      // Collect the candidates under the lock; enumerating the dictionary while another thread
      // adds or removes a job would throw.
      List<Guid> idleJobIds = new List<Guid>();
      lock (engines) {
        foreach (Executor executor in engines.Values) {
          if (!executor.Running && executor.CurrentMessage == MessageContainer.MessageType.NoMessage) {
            idleJobIds.Add(executor.JobId);
          }
        }
      }

      foreach (Guid idleJobId in idleJobIds) {
        Logging.Instance.Info(this.ToString(), "Checking for JobId: " + idleJobId);
        Thread finThread = new Thread(new ParameterizedThreadStart(GetFinishedJob));
        finThread.Start(idleJobId);
      }
    }

    #endregion

    /// <summary>
    /// Returns the engines of all jobs, keyed by job id.
    /// This is the live internal dictionary, not a copy.
    /// </summary>
    public Dictionary<Guid, Executor> GetExecutionEngines() {
      return engines;
    }

    /// <summary>
    /// Logs an exception that was not handled inside of a job AppDomain.
    /// </summary>
    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logging.Instance.Error(this.ToString(), "Exception in AppDomain: " + e.ExceptionObject.ToString());
    }

    /// <summary>
    /// Returns all jobs, keyed by job id.
    /// This is the live internal dictionary, not a copy.
    /// </summary>
    internal Dictionary<Guid, Job> GetJobs() {
      return jobs;
    }

    #region Job bookkeeping helpers

    /// <summary>
    /// Looks up the engine of a job while holding the lock.
    /// </summary>
    /// <param name="jobId">The id of the job</param>
    /// <param name="engine">The engine of the job, or null if there is none</param>
    /// <returns>true if an engine is registered for the job</returns>
    private bool TryGetEngine(Guid jobId, out Executor engine) {
      lock (engines) {
        return engines.TryGetValue(jobId, out engine);
      }
    }

    /// <summary>
    /// Removes a job from all three maps and unloads its AppDomain.
    /// </summary>
    /// <param name="jobId">The id of the job</param>
    /// <returns>true if an AppDomain was registered for the job</returns>
    private bool RemoveJob(Guid jobId) {
      lock (engines) {
        AppDomain appDomain;
        bool known = appDomains.TryGetValue(jobId, out appDomain);
        // Drop the bookkeeping first so that the maps stay consistent even if unloading fails.
        appDomains.Remove(jobId);
        engines.Remove(jobId);
        jobs.Remove(jobId);
        if (known) {
          UnloadAppDomain(appDomain);
        }
        return known;
      }
    }

    /// <summary>
    /// Unloads the AppDomains of all jobs and empties all three maps.
    /// </summary>
    private void UnloadAllJobs() {
      lock (engines) {
        foreach (AppDomain appDomain in appDomains.Values) {
          UnloadAppDomain(appDomain);
        }
        // Clear instead of assigning new dictionaries: the engines instance is the lock object.
        appDomains.Clear();
        engines.Clear();
        jobs.Clear();
      }
    }

    /// <summary>
    /// Detaches the exception handler from an AppDomain and unloads it. A failure is logged
    /// instead of propagated, so that one domain cannot take down the caller.
    /// </summary>
    /// <param name="appDomain">The AppDomain to unload</param>
    private void UnloadAppDomain(AppDomain appDomain) {
      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(appDomain);
      }
      catch (Exception ex) {
        Logging.Instance.Error(this.ToString(), "Exception when unloading the appdomain: ", ex);
      }
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.