Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5402

Last change on this file since 5402 was 5402, checked in by cneumuel, 13 years ago

#1233

  • single sign on with HL
  • local plugins are uploaded if not available online (user can force the usage of local plugins)
  • changed plugin and plugindata db-schema
  • plugin dao tests
File size: 17.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Services.Hive.Common;
32using HeuristicLab.Services.Hive.Common.DataTransfer;
33
34
35namespace HeuristicLab.Clients.Hive.Slave {
36  /// <summary>
37  /// The core component of the Hive Client
38  /// </summary>
39  public class Core : MarshalByRefObject {
40
//TODO: this class should be a singleton; there is only one instance, and its reference is meanwhile saved in theCore
/// <summary>Reference to the most recently constructed instance; assigned in the constructor.</summary>
public static Core theCore;

/// <summary>Cleared by Start and set by ShutdownCore; the message dispatch loop runs while this is false.</summary>
public static bool abortRequested { get; set; }
// Released at the end of Start; Shutdown waits on it after enqueueing the shutdown message.
private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
public static ILog Log { get; set; }

// Per-job bookkeeping, keyed by job id. These dictionaries are never reassigned;
// 'engines' is additionally used as the lock object around AppDomain bookkeeping,
// so it must always refer to the same instance.
private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

private WcfService wcfService;
private HeartbeatManager heartbeatManager;
// Managed thread id of the thread that called Start; compared in EnqueueExecutorMessage.
private int coreThreadId;

private ISlaveCommunication ClientCom;
// WCF host for SlaveCommunicationService, opened in Start and closed in ShutdownCore.
private ServiceHost slaveComm;
/// <summary>
/// The executors of the jobs currently known to this core, keyed by job id.
/// Returns the internal dictionary itself, not a copy.
/// </summary>
public Dictionary<Guid, Executor> ExecutionEngines {
  get {
    return this.engines;
  }
}
62
/// <summary>
/// The jobs currently known to this core, keyed by job id.
/// Returns the internal dictionary itself, not a copy.
/// </summary>
internal Dictionary<Guid, Job> Jobs {
  get {
    return this.jobs;
  }
}
66
/// <summary>
/// Creates the core and publishes the new instance through the static theCore field.
/// </summary>
public Core() {
  Core.theCore = this;
}
70
/// <summary>
/// Main method of the slave. Opens the slave communication service host, connects the
/// service events, starts the heartbeats and then dispatches queued messages on the
/// calling thread until abortRequested is set.
/// </summary>
/// <remarks>
/// The calling thread is remembered as the core thread. When dispatching ends, whether
/// normally or because of an exception, the service events are deregistered and the
/// shutdown semaphore is released so that a caller blocked in Shutdown can continue.
/// </remarks>
public void Start() {
  coreThreadId = Thread.CurrentThread.ManagedThreadId;
  abortRequested = false;

  //start the client communication service (pipe between slave and slave gui)
  slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
  slaveComm.Open();

  ClientCom = SlaveClientCom.Instance.ClientCom;
  ClientCom.LogMessage("Hive Slave started");

  ConfigManager manager = ConfigManager.Instance;
  manager.Core = this;

  wcfService = WcfService.Instance;
  RegisterServiceEvents();

  try {
    StartHeartbeats(); // Start heartbeats thread
    DispatchMessageQueue(); // dispatch messages until abortRequested
  }
  finally {
    // Always undo the registration and wake a waiting Shutdown() call; without this an
    // exception escaping the dispatch loop would leave Shutdown() blocked forever.
    DeRegisterServiceEvents();
    waitShutdownSem.Release();
  }
}
97
/// <summary>
/// Creates a fresh heartbeat manager with a 10 second interval, stores it in the
/// heartbeatManager field and starts it.
/// </summary>
private void StartHeartbeats() {
  HeartbeatManager manager = new HeartbeatManager();
  manager.Interval = new TimeSpan(0, 0, 10);

  heartbeatManager = manager;
  heartbeatManager.StartHeartbeat();
}
103
/// <summary>
/// Takes messages from the message queue one at a time and hands each to
/// DetermineAction. The abort flag is checked before every fetch.
/// </summary>
private void DispatchMessageQueue() {
  MessageQueue pending = MessageQueue.GetInstance();
  while (true) {
    if (abortRequested) {
      break;
    }
    MessageContainer next = pending.GetMessage();
    DetermineAction(next);
  }
}
111
/// <summary>
/// Subscribes this core to the Connected and ExceptionOccured events of the WCF service singleton.
/// </summary>
private void RegisterServiceEvents() {
  WcfService.Instance.Connected += wcfService_Connected;
  WcfService.Instance.ExceptionOccured += wcfService_ExceptionOccured;
}
116
/// <summary>
/// Removes the subscriptions to the Connected and ExceptionOccured events of the WCF service singleton.
/// </summary>
private void DeRegisterServiceEvents() {
  WcfService.Instance.Connected -= new EventHandler(wcfService_Connected);
  WcfService.Instance.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(wcfService_ExceptionOccured);
}
121
/// <summary>
/// Logs the message of an exception reported by the WCF service.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Carries the reported exception in its Value.</param>
void wcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
  string logLine = "Connection to server interruped with exception: " + e.Value.Message;
  ClientCom.LogMessage(logLine);
}
125
/// <summary>
/// Logs that the WCF service reported a successful connection.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Event data (not used).</param>
void wcfService_Connected(object sender, EventArgs e) {
  const string logLine = "Connected successfully to Hive server";
  ClientCom.LogMessage(logLine);
}
129
/// <summary>
/// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
/// </summary>
/// <remarks>
/// Executor callbacks (ExecutorMessageContainer&lt;Guid&gt;) are executed directly; every
/// other container is dispatched on its message type. Message types without a case
/// are ignored.
/// </remarks>
/// <param name="container">The container holding the message; a null container is logged and ignored.</param>
private void DetermineAction(MessageContainer container) {
  if (container == null) {
    // Every non-null argument is a MessageContainer by its static type, so null is the
    // only "unknown" input that can arrive here.
    ClientCom.LogMessage("Unknown MessageContainer: " + container);
    return;
  }

  ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

  //TODO: find a better solution
  ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
  if (executorMessage != null) {
    executorMessage.execute();
    return;
  }

  switch (container.Message) {
    //Server requests to abort a job
    case MessageContainer.MessageType.AbortJob:
      Executor engine;
      if (engines.TryGetValue(container.JobId, out engine)) {
        try {
          engine.Abort();
        }
        catch (AppDomainUnloadedException) {
          // appdomain already unloaded. Finishing job probably ongoing
        }
      } else {
        ClientCom.LogMessage("AbortJob: Engine doesn't exist");
      }
      break;

    //Pull a Job from the Server
    case MessageContainer.MessageType.AquireJob:
      Job myJob = wcfService.AquireJob();
      if (myJob == null) {
        // NOTE(review): assumes the service may hand back null when it has no job to
        // give out; guarding here avoids a NullReferenceException on myJob.Id.
        ClientCom.LogMessage("AquireJob: server returned no job");
        break;
      }
      //TODO: handle in own thread!!
      JobData jobData = wcfService.GetJobData(myJob.Id);
      StartJobInAppDomain(myJob, jobData);
      break;

    //Hard shutdown of the client
    case MessageContainer.MessageType.ShutdownSlave:
      ShutdownCore();
      break;
    case MessageContainer.MessageType.HardPause:
      doHardPause();
      break;
    case MessageContainer.MessageType.SoftPause:
      doSoftPause();
      break;
    case MessageContainer.MessageType.Restart:
      doRestart();
      break;
  }
}
180
/// <summary>
/// Requests a restart by putting a Restart message into the message queue;
/// intended to be called after SoftPause() or HardPause().
/// </summary>
public void Restart() {
  MessageContainer request = new MessageContainer(MessageContainer.MessageType.Restart);
  MessageQueue queue = MessageQueue.GetInstance();
  queue.AddMessage(request);
}
189
/// <summary>
/// Handles a Restart message: starts the heartbeats again and logs before and after doing so.
/// </summary>
private void doRestart() {
  ClientCom.LogMessage("Restart received");

  // creates and starts a new heartbeat manager
  this.StartHeartbeats();

  ClientCom.LogMessage("Restart done");
}
195
/// <summary>
/// Requests a soft pause by putting a SoftPause message into the message queue.
/// </summary>
public void SoftPause() {
  MessageContainer request = new MessageContainer(MessageContainer.MessageType.SoftPause);
  MessageQueue queue = MessageQueue.GetInstance();
  queue.AddMessage(request);
}
203
/// <summary>
/// Handles a SoftPause message: pauses every known job, sends its current state to the
/// server, kills its AppDomain, then stops the heartbeats and disconnects from the server.
/// </summary>
/// <remarks>
/// A failed transmission is only logged; the AppDomain of the job is killed in every case.
/// </remarks>
private void doSoftPause() {
  ClientCom.LogMessage("Soft pause received");

  // KillAppDomain removes the job from the Jobs dictionary, so iterate over a snapshot;
  // enumerating Jobs.Values directly fails as soon as the first job has been removed.
  List<Job> jobsToPause = new List<Job>(Jobs.Values);
  foreach (Job job in jobsToPause) {
    Executor engine;
    if (!engines.TryGetValue(job.Id, out engine)) {
      // No executor to pause or query: just clean up whatever is left of the job.
      ClientCom.LogMessage("SoftPause: Engine doesn't exist for job " + job.Id);
      KillAppDomain(job.Id);
      continue;
    }

    engine.Pause();
    JobData sJob = engine.GetFinishedJob();
    job.Exception = engine.CurrentException;
    job.ExecutionTime = engine.ExecutionTime;

    try {
      ClientCom.LogMessage("Sending the paused job with id: " + job.Id);
      wcfService.UpdateJob(job, sJob);
      SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
    }
    catch (Exception e) {
      ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
    }
    finally {
      KillAppDomain(job.Id); // kill app-domain in every case
    }
  }

  heartbeatManager.StopHeartBeat();
  WcfService.Instance.Disconnect();
  ClientCom.LogMessage("Soft pause done");
}
231
/// <summary>
/// Requests an immediate pause by putting a HardPause message into the message queue.
/// </summary>
public void HardPause() {
  MessageContainer request = new MessageContainer(MessageContainer.MessageType.HardPause);
  MessageQueue queue = MessageQueue.GetInstance();
  queue.AddMessage(request);
}
239
/// <summary>
/// Handles a HardPause message: stops the heartbeats, unloads every job AppDomain without
/// sending anything to the server, forgets the unloaded jobs and disconnects from the server.
/// </summary>
private void doHardPause() {
  ClientCom.LogMessage("Hard pause received");
  heartbeatManager.StopHeartBeat();

  lock (engines) {
    ClientCom.LogMessage("engines locked");
    foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
      ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
      kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
      AppDomain.Unload(kvp.Value);
    }

    // The domains are gone, so drop the bookkeeping that still points into them.
    // Otherwise a later pause, shutdown or kill would try to unload or dispose
    // objects living in an AppDomain that no longer exists.
    appDomains.Clear();
    engines.Clear();
    jobs.Clear();
  }
  WcfService.Instance.Disconnect();
  ClientCom.LogMessage("Hard pause done");
}
255
/// <summary>
/// Requests a shutdown by putting a ShutdownSlave message into the message queue and
/// then blocks until the shutdown semaphore is released.
/// </summary>
public void Shutdown() {
  MessageContainer request = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
  MessageQueue queue = MessageQueue.GetInstance();
  queue.AddMessage(request);

  // released at the end of Start, once message dispatching has ended
  waitShutdownSem.WaitOne();
}
261
/// <summary>
/// Hard shutdown; should be called before the application is exited.
/// Stops the heartbeats, requests the dispatch loop to end, unloads all job AppDomains,
/// disconnects from the server and closes the slave communication channels.
/// </summary>
private void ShutdownCore() {
  ClientCom.LogMessage("Shutdown Signal received");
  ClientCom.LogMessage("Stopping heartbeat");
  heartbeatManager.StopHeartBeat();
  abortRequested = true;
  ClientCom.LogMessage("Logging out");

  lock (engines) {
    ClientCom.LogMessage("engines locked");
    foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
      ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
      kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
      AppDomain.Unload(kvp.Value);
    }
  }
  WcfService.Instance.Disconnect();
  ClientCom.Shutdown();
  SlaveClientCom.Close();

  // Close() throws on a faulted communication object; Abort() is the way to
  // tear one down, so choose based on the host's current state.
  if (slaveComm.State == CommunicationState.Faulted) {
    slaveComm.Abort();
  } else if (slaveComm.State != CommunicationState.Closed) {
    slaveComm.Close();
  }
}
288
/// <summary>
/// Pauses a job, which means sending it to the server and killing it locally;
/// atm only used when executor is waiting for child jobs
/// </summary>
/// <remarks>
/// The AppDomain of the job is killed in every case, also when the job is unknown or
/// when the update to the server throws; such an exception still reaches the caller.
/// </remarks>
/// <param name="data">Job data to send; its JobId identifies the job to pause.</param>
[MethodImpl(MethodImplOptions.Synchronized)]
public void PauseJob(JobData data) {
  Guid jobId = data.JobId;
  try {
    Job job;
    if (Jobs.TryGetValue(jobId, out job)) {
      job.JobState = JobState.WaitingForChildJobs;
      wcfService.UpdateJob(job, data);
    } else {
      ClientCom.LogMessage("Can't find job with id " + jobId);
    }
  }
  finally {
    // kill app-domain in every case, matching how finished and soft-paused jobs are handled
    KillAppDomain(jobId);
  }
}
305
/// <summary>
/// Collects the result of a finished job from its executor and submits it to the server.
/// A failed transmission is logged. Afterwards the AppDomain of the job is killed and the
/// heartbeat thread is woken up, regardless of whether the transmission succeeded.
/// </summary>
/// <remarks>
/// Nothing is sent when no executor or no job is registered for the id. Any other
/// exception is passed to OnExceptionOccured instead of being thrown to the caller.
/// </remarks>
/// <param name="jobId">The id of the finished job; must be a boxed Guid.</param>
[MethodImpl(MethodImplOptions.Synchronized)]
public void SendFinishedJob(object jobId) {
  try {
    Guid finishedId = (Guid)jobId;
    ClientCom.LogMessage("Getting the finished job with id: " + finishedId);

    Executor engine;
    if (!engines.TryGetValue(finishedId, out engine)) {
      ClientCom.LogMessage("Engine doesn't exist");
      return;
    }

    Job finishedJob;
    if (!jobs.TryGetValue(finishedId, out finishedJob)) {
      ClientCom.LogMessage("Job doesn't exist");
      return;
    }

    JobData result = engine.GetFinishedJob();
    finishedJob.Exception = engine.CurrentException;
    finishedJob.ExecutionTime = engine.ExecutionTime;

    try {
      ClientCom.LogMessage("Sending the finished job with id: " + finishedId);
      wcfService.UpdateJob(finishedJob, result);
      SlaveStatusInfo.JobsProcessed++;
    }
    catch (Exception sendError) {
      ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + finishedId + " to hdd (" + sendError.ToString() + ")");
    }
    finally {
      // clean up locally in every case
      KillAppDomain(finishedId);
      heartbeatManager.AwakeHeartBeatThread();
    }
  }
  catch (Exception unexpected) {
    OnExceptionOccured(unexpected);
  }
}
347
/// <summary>
/// Prepares the plugins for a newly received job, creates a sandbox AppDomain for it and
/// starts an Executor for the job inside that AppDomain.
/// </summary>
/// <remarks>
/// Failures are logged, not thrown. If preparing the plugins fails, nothing else happens.
/// If creating the AppDomain or starting the executor fails, KillAppDomain is called for
/// the job. If a job with the same id is already registered, the job is not started a
/// second time and the AppDomain created for it is unloaded again.
/// </remarks>
/// <param name="myJob">The job to start; its Id names the plugin directory and the sandbox.</param>
/// <param name="jobData">The data of the job; its Data is passed to the executor.</param>
private void StartJobInAppDomain(Job myJob, JobData jobData) {
  ClientCom.LogMessage("Received new job with id " + myJob.Id);
  String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());

  try {
    PluginCache.Instance.PreparePlugins(myJob, jobData);
    ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
  }
  catch (Exception exception) {
    ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
    return;
  }

  try {
    AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
    appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);

    bool registered = false;
    lock (engines) {
      if (!jobs.ContainsKey(myJob.Id)) {
        jobs.Add(myJob.Id, myJob);
        appDomains.Add(myJob.Id, appDomain);
        registered = true;
        ClientCom.LogMessage("Creating AppDomain");
        Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
        ClientCom.LogMessage("Created AppDomain");
        engine.JobId = myJob.Id;
        engine.core = this;
        ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
        engines.Add(myJob.Id, engine);
        engine.Start(jobData.Data);
        SlaveStatusInfo.JobsFetched++;
        ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
      }
    }

    if (!registered) {
      // The new AppDomain is not tracked in any dictionary, so nothing else would ever
      // unload it. This has its own try/catch because the outer handler calls
      // KillAppDomain for this id, which would take down the job already running.
      ClientCom.LogMessage("Job " + myJob.Id + " is already registered; unloading the AppDomain created for the duplicate");
      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(appDomain);
      }
      catch (Exception unloadException) {
        ClientCom.LogMessage("Unloading the duplicate AppDomain failed: " + unloadException.ToString());
      }
    }

    heartbeatManager.AwakeHeartBeatThread();
  }
  catch (Exception exception) {
    ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
    ClientCom.LogMessage("Error thrown is: " + exception.ToString());
    KillAppDomain(myJob.Id);
  }
}
396
/// <summary>
/// Raised by OnExceptionOccured with the exception that was reported.
/// </summary>
public event EventHandler<EventArgs<Exception>> ExceptionOccured;

/// <summary>
/// Logs the exception and then raises ExceptionOccured if anyone is subscribed.
/// </summary>
/// <param name="e">The exception to report.</param>
private void OnExceptionOccured(Exception e) {
  ClientCom.LogMessage("Error: " + e.ToString());

  // copy the delegate first so the null check and the invocation see the same value
  EventHandler<EventArgs<Exception>> subscribers = ExceptionOccured;
  if (subscribers == null) {
    return;
  }
  subscribers(this, new EventArgs<Exception>(e));
}
403
/// <summary>
/// Handles an unhandled exception reported by a job AppDomain: logs it and kills the
/// AppDomain of the job it belongs to.
/// </summary>
/// <remarks>
/// The job id is taken from the friendly name of the reporting AppDomain. The exception
/// text is not a Guid, so it cannot be used to identify the job. If no job id can be
/// determined, this is logged and nothing is killed.
/// </remarks>
/// <param name="sender">Expected to be the AppDomain that raised the event.</param>
/// <param name="e">Carries the unhandled exception object.</param>
private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
  ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

  // NOTE(review): assumes the first argument of CreateAndInitSandbox (the job id string)
  // becomes the friendly name of the sandbox AppDomain - confirm against SandboxManager.
  Guid jobId = Guid.Empty;
  bool jobKnown = false;
  AppDomain faultedDomain = sender as AppDomain;
  if (faultedDomain != null) {
    try {
      jobKnown = Guid.TryParse(faultedDomain.FriendlyName, out jobId);
    }
    catch (AppDomainUnloadedException) {
      // the domain is already gone, so there is nothing left to identify or unload
      jobKnown = false;
    }
  }

  if (jobKnown) {
    KillAppDomain(jobId);
  } else {
    ClientCom.LogMessage("Could not determine the job of the faulted AppDomain; no AppDomain was killed");
  }
}
408
/// <summary>
/// Decides whether an action may run right away or has to be handed over to the core thread.
/// When called on a thread other than the core thread, the action and its parameter are
/// wrapped in an executor message and put into the message queue. This is needed for work
/// that the core thread has to do itself, e.g. killing an app domain.
/// </summary>
/// <typeparam name="T">Type of the parameter passed to the action.</typeparam>
/// <param name="action">The callback to enqueue when not on the core thread.</param>
/// <param name="parameter">The argument stored alongside the callback.</param>
/// <returns>true if the calling method can continue execution (already on the core thread), else false</returns>
private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
  bool onCoreThread = Thread.CurrentThread.ManagedThreadId == this.coreThreadId;
  if (onCoreThread) {
    return true;
  }

  ExecutorMessageContainer<T> deferred = new ExecutorMessageContainer<T> {
    Callback = action,
    CallbackParameter = parameter
  };
  MessageQueue.GetInstance().AddMessage(deferred);
  return false;
}
429
/// <summary>
/// Disposes the executor of a job, unloads its AppDomain and removes the job and its
/// plugins from the local bookkeeping.
/// </summary>
/// <remarks>
/// When called on a thread other than the core thread, the call is only enqueued as an
/// executor message and this method returns immediately. Unloading is attempted up to
/// five times with a one second pause after each failure. Any exception raised during
/// cleanup, including the final failed unload attempt, is logged and not propagated.
/// </remarks>
/// <param name="id">the GUID of the job</param>
[MethodImpl(MethodImplOptions.Synchronized)]
public void KillAppDomain(Guid id) {
  if (!EnqueueExecutorMessage<Guid>(KillAppDomain, id)) {
    return; // handed over to the core thread
  }

  ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
  lock (engines) {
    try {
      Executor engine;
      if (engines.TryGetValue(id, out engine)) {
        engine.Dispose();
        engines.Remove(id);
      }

      AppDomain domain;
      if (appDomains.TryGetValue(id, out domain)) {
        domain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);

        for (int attemptsLeft = 5; attemptsLeft > 0; ) {
          try {
            AppDomain.Unload(domain);
            attemptsLeft = 0; // unloaded, stop retrying
          }
          catch (CannotUnloadAppDomainException) {
            ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
            Thread.Sleep(1000);
            attemptsLeft--;
            if (attemptsLeft == 0) {
              // give up; the rethrown exception is caught and logged by the handler below
              throw;
            }
          }
        }
        appDomains.Remove(id);
      }

      jobs.Remove(id);
      PluginCache.Instance.DeletePluginsForJob(id);
      GC.Collect();
    }
    catch (Exception cleanupError) {
      ClientCom.LogMessage("Exception when unloading the appdomain: " + cleanupError.ToString());
    }
  }
  GC.Collect();
}
477  }
478}
Note: See TracBrowser for help on using the repository browser.