Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5156

Last change on this file since 5156 was 5156, checked in by ascheibe, 13 years ago

#1233

  • Implemented communication interface between Slave and a Slave Client using Named Pipes and callbacks
  • Added new project for testing Slave - Client communication
  • Added some copyright info headers
File size: 14.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.Threading;
27using HeuristicLab.Clients.Hive.Slave;
28using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Services.Hive.Common;
32using HeuristicLab.Services.Hive.Common.DataTransfer;
33
34
namespace HeuristicLab.Clients.Hive.Salve {
  // NOTE(review): "Salve" looks like a typo of "Slave" (this file also imports HeuristicLab.Clients.Hive.Slave),
  // but renaming the namespace changes the public name of Core, so it is left unchanged here.

  /// <summary>
  /// The core component of the Hive Slave. It dispatches the messages of the <see cref="MessageQueue"/>
  /// and manages one sandbox AppDomain with one <see cref="Executor"/> per job.
  /// </summary>
  /// <remarks>
  /// The dictionaries <c>engines</c>, <c>appDomains</c> and <c>jobs</c> are keyed by job id and guarded by
  /// <c>lock (engines)</c>. AppDomains are unloaded on the core thread (the thread that constructed this
  /// instance); other threads enqueue their kill requests (see <see cref="KillAppDomain"/>).
  /// NOTE(review): this assumes <see cref="Start"/> is called on the constructing thread — confirm with callers.
  /// </remarks>
  public class Core : MarshalByRefObject {
    /// <summary>
    /// When set to true, the message loop in <see cref="Start"/> exits after the message it is currently handling.
    /// </summary>
    public static bool abortRequested { get; set; }
    public static ILog Log { get; set; }

    // All keyed by job id and guarded by lock (engines).
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    // Id of the thread that created this instance; see EnqueueExecutorMessage.
    private readonly int coreThreadId;

    private ISlaveCommunication ClientCom;

    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    /// <summary>
    /// Creates the core and remembers the current thread as the core thread.
    /// </summary>
    public Core() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
    }

    /// <summary>
    /// Main method of the slave. It obtains the client communication, registers for WcfService events,
    /// starts the heartbeats and then dispatches queued messages until <see cref="abortRequested"/> is set.
    /// Blocks until then.
    /// </summary>
    public void Start() {
      abortRequested = false;

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      StartHeartbeats(); // Start heartbeats thread
      try {
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // Unsubscribe even if handling a message threw, so the WcfService events no longer reference this instance.
        DeRegisterServiceEvents();
      }

      // NOTE(review): ShutdownCore has normally shut down ClientCom before this point — confirm logging is still safe.
      ClientCom.LogMessage("Program shutdown");
    }

    /// <summary>
    /// Creates the heartbeat manager with a 10 second interval and starts it.
    /// </summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until <see cref="abortRequested"/> is set.
    /// NOTE(review): presumably GetMessage blocks until a message is available.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += wcfService_Connected;
      WcfService.Instance.ExceptionOccured += wcfService_ExceptionOccured;
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= wcfService_Connected;
      WcfService.Instance.ExceptionOccured -= wcfService_ExceptionOccured;
    }

    /// <summary>
    /// Any communication error with the server shuts the whole slave down.
    /// </summary>
    void wcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
      ShutdownCore();
    }

    void wcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// Message types other than AbortJob, AquireJob and ShutdownSlave are ignored.
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      // Requests enqueued via EnqueueExecutorMessage<Guid> carry their own callback.
      //TODO: find a better solution
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          AbortExecutor(container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.AquireJob:
          AcquireAndStartJob();
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
      }
    }

    /// <summary>
    /// Aborts the executor of the given job, if one is registered.
    /// </summary>
    private void AbortExecutor(Guid jobId) {
      Executor engine;
      bool found;
      lock (engines) {
        found = engines.TryGetValue(jobId, out engine);
      }
      if (!found) {
        ClientCom.LogMessage("AbortJob: Engine doesn't exist");
        return;
      }
      try {
        engine.Abort();
      }
      catch (AppDomainUnloadedException) {
        // appdomain already unloaded. Finishing job probably ongoing
      }
    }

    /// <summary>
    /// Pulls the next job and its data from the server and starts it in its own sandbox.
    /// A null job is logged and ignored, so it does not break the message loop with a NullReferenceException.
    /// NOTE(review): presumably AquireJob returns null when no job is available — confirm.
    /// </summary>
    private void AcquireAndStartJob() {
      Job myJob = wcfService.AquireJob();
      if (myJob == null) {
        ClientCom.LogMessage("AquireJob: no job received from server");
        return;
      }
      //TODO: handle in own thread!!
      JobData jobData = wcfService.GetJobData(myJob.Id);
      StartJobInAppDomain(myJob, jobData);
    }

    /// <summary>
    /// Shuts the slave down. It stops the heartbeats, makes the message loop exit, unloads all job
    /// AppDomains and closes the server and client connections.
    /// </summary>
    /// <remarks>Also called from the WcfService ExceptionOccured event, so it may run on a thread other than the core thread.</remarks>
    public void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      // Still null if the exception event fires before StartHeartbeats has run.
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= appDomain_UnhandledException;
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // Best effort: continue, so the remaining domains and the connections below are still shut down.
            ClientCom.LogMessage("Exception when unloading the appdomain for " + kvp.Key + ": " + ex.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();
    }

    /// <summary>
    /// Pauses a job by marking it as waiting for child jobs, sending it to the server and killing it locally.
    /// At the moment this is only used when the executor is waiting for child jobs.
    /// </summary>
    /// <param name="data">the current data of the job to pause</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseJob(JobData data) {
      Job job;
      bool found;
      lock (engines) {
        found = jobs.TryGetValue(data.JobId, out job);
      }
      if (found) {
        job.JobState = JobState.WaitingForChildJobs;
        wcfService.UpdateJob(job, data);
      } else {
        ClientCom.LogMessage("Can't find job with id " + data.JobId);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Collects the result of a finished job from its executor and submits it to the server.
    /// The job's AppDomain is killed afterwards in every case, including when the transmission fails.
    /// A failed transmission is only logged; the result is not stored locally.
    /// Any other error is reported through <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="jobId">the id of the finished job (a boxed <see cref="Guid"/>)</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        ClientCom.LogMessage("Getting the finished job with id: " + jId);

        Executor executor;
        Job cJob;
        lock (engines) {
          if (!engines.TryGetValue(jId, out executor)) {
            ClientCom.LogMessage("Engine doesn't exist");
            return;
          }
          if (!jobs.TryGetValue(jId, out cJob)) {
            ClientCom.LogMessage("Job doesn't exist");
            return;
          }
        }

        JobData sJob = executor.GetFinishedJob();
        cJob.Exception = executor.CurrentException;
        cJob.ExecutionTime = executor.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jId);
          wcfService.UpdateJob(cJob, sJob);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          ClientCom.LogMessage("Transmitting to server failed for the finished job with id: " + jId + " (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Starts a job received from the server. It prepares the job's plugins in a per-job directory, creates a
    /// sandbox AppDomain and starts an <see cref="Executor"/> inside it. If anything fails after the plugins
    /// were prepared, the partially created state is removed via <see cref="KillAppDomain"/>.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the job data handed to the executor</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);

      // Reject duplicates before any side effect. The plugin directory is per job id, so preparing plugins
      // again would interfere with the registered job, and the new sandbox would never be unloaded.
      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          ClientCom.LogMessage("Job with id " + myJob.Id + " is already registered, ignoring it");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      try {
        PluginCache.Instance.PreparePlugins(myJob, jobData);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        return;
      }

      try {
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
        appDomain.UnhandledException += appDomain_UnhandledException;
        lock (engines) {
          // Defensive only: the check above already rejects duplicates, and jobs are presumably only
          // registered here, on the core thread.
          if (!jobs.ContainsKey(myJob.Id)) {
            // Register first, so KillAppDomain can clean up if creating or starting the executor fails.
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        ClientCom.LogMessage("Error thrown is: " + exception.ToString());
        KillAppDomain(myJob.Id);
      }
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception in a job's sandbox by killing that sandbox.
    /// The job is identified from <paramref name="sender"/>, which is expected to be the AppDomain that raised the event.
    /// This handler never throws.
    /// </summary>
    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      try {
        ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
        Guid jobId;
        if (TryGetJobIdOfAppDomain(sender as AppDomain, out jobId)) {
          KillAppDomain(jobId);
        } else {
          ClientCom.LogMessage("Could not determine the job of the AppDomain that raised the exception");
        }
      }
      catch (Exception ex) {
        // Exceptions must not escape from an UnhandledException handler.
        ClientCom.LogMessage("Error while handling an AppDomain exception: " + ex.ToString());
      }
    }

    /// <summary>
    /// Looks up the id of the registered job whose sandbox is <paramref name="appDomain"/>.
    /// </summary>
    /// <param name="appDomain">the sandbox to look up; may be null</param>
    /// <param name="jobId">the job id if found, otherwise <see cref="Guid.Empty"/></param>
    /// <returns>true if a registered job was found</returns>
    private bool TryGetJobIdOfAppDomain(AppDomain appDomain, out Guid jobId) {
      jobId = Guid.Empty;
      if (appDomain == null) return false;

      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          if (object.ReferenceEquals(kvp.Value, appDomain)) {
            jobId = kvp.Key;
            return true;
          }
        }

        // Fallback in case the reference differs (e.g. another remoting proxy for the same domain). StartJobInAppDomain
        // passes the job id as the sandbox name; presumably that becomes the FriendlyName. Only known ids are accepted.
        try {
          Guid candidate = new Guid(appDomain.FriendlyName);
          if (appDomains.ContainsKey(candidate)) {
            jobId = candidate;
            return true;
          }
        }
        catch (FormatException) {
          // the name is not a job id
        }
        catch (OverflowException) {
          // the name is not a job id
        }
        catch (AppDomainUnloadedException) {
          // the domain is already gone, so there is nothing left to kill
        }
      }
      return false;
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the callback parameter</typeparam>
    /// <param name="action">the action the core thread should execute</param>
    /// <param name="parameter">the parameter passed to <paramref name="action"/></param>
    /// <returns>true if the calling method can continue execution (it runs on the core thread), else false</returns>
    private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
        container.Callback = action;
        container.CallbackParameter = parameter;
        MessageQueue.GetInstance().AddMessage(container);
        return false;
      } else {
        return true;
      }
    }

    /// <summary>
    /// Kills the sandbox of a job. It disposes the job's executor, unloads its AppDomain, and removes the job
    /// and its plugin directory. When called from a thread other than the core thread, the request is only
    /// enqueued, and the core thread performs it later.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (!EnqueueExecutorMessage<Guid>(KillAppDomain, id)) {
        return; // not on the core thread: the request has been enqueued
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engines.Remove(id);
            try {
              engine.Dispose();
            }
            catch (Exception ex) {
              // Best effort (cross-domain call): the domain is unloaded below in any case.
              ClientCom.LogMessage("Exception when disposing the engine of job " + id + ": " + ex.ToString());
            }
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= appDomain_UnhandledException;
            // Throws if the domain cannot be unloaded. The catch below then logs it, and the domain and job
            // stay registered, so ShutdownCore will try to unload the domain again.
            UnloadAppDomainWithRetry(appDomain);
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      // NOTE(review): presumably forces collection of objects released by the unloaded domain.
      GC.Collect();
    }

    /// <summary>
    /// Unloads an AppDomain. If <see cref="AppDomain.Unload"/> fails with a
    /// <see cref="CannotUnloadAppDomainException"/>, it waits one second and retries, up to five attempts in total.
    /// </summary>
    /// <param name="appDomain">the domain to unload</param>
    /// <exception cref="CannotUnloadAppDomainException">if the last attempt fails as well</exception>
    private void UnloadAppDomainWithRetry(AppDomain appDomain) {
      const int maxAttempts = 5;
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(appDomain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= maxAttempts) {
            throw;
          }
          ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(1000);
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.