Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5158

Last change on this file since 5158 was 5158, checked in by ascheibe, 14 years ago

Adapt test cases to slave - client communication. #1233

File size: 13.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.Threading;
27using HeuristicLab.Clients.Hive.Slave;
28using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Services.Hive.Common;
32using HeuristicLab.Services.Hive.Common.DataTransfer;
33
34
// NOTE(review): "Salve" looks like a typo for "Slave"; it is kept unchanged because
// renaming the namespace would break every external reference to this type.
namespace HeuristicLab.Clients.Hive.Salve {
  /// <summary>
  /// The core component of the Hive slave. It dispatches the messages of the
  /// <see cref="MessageQueue"/> and owns the per-job sandbox AppDomains and executors.
  /// </summary>
  public class Core : MarshalByRefObject {
    /// <summary>Maximum number of attempts to unload a job's AppDomain.</summary>
    private const int UnloadRetryCount = 5;
    /// <summary>Pause between two unload attempts, in milliseconds.</summary>
    private const int UnloadRetryDelayMs = 1000;

    public static bool abortRequested { get; set; }
    public static ILog Log { get; set; }

    // All three dictionaries are keyed by job id; this class guards them with lock (engines).
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    // Managed id of the thread that constructed this instance; see EnqueueExecutorMessage.
    private int coreThreadId;

    private ISlaveCommunication ClientCom;

    /// <summary>The executors of the currently registered jobs, keyed by job id.</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The currently registered jobs, keyed by job id.</summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    /// <summary>
    /// Creates the core and remembers the constructing thread as the "core thread".
    /// </summary>
    public Core() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
    }

    /// <summary>
    /// Main method of the slave: wires up the service events, starts the heartbeat and
    /// then dispatches queued messages on the calling thread until
    /// <see cref="abortRequested"/> is set.
    /// </summary>
    public void Start() {
      abortRequested = false;

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();

      StartHeartbeats(); // Start heartbeats thread
      DispatchMessageQueue(); // dispatch messages until abortRequested

      DeRegisterServiceEvents();
    }

    /// <summary>
    /// Creates the heartbeat manager with a 10 second interval and starts it.
    /// </summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the queue and handles them one by one until
    /// <see cref="abortRequested"/> is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(wcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(wcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= wcfService_Connected;
      WcfService.Instance.ExceptionOccured -= wcfService_ExceptionOccured;
    }

    /// <summary>
    /// Logs a connection failure reported by the WCF service and shuts the core down.
    /// </summary>
    void wcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
      ShutdownCore();
    }

    void wcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// </summary>
    /// <param name="container">The container holding the message; a null container is logged and ignored.</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        ClientCom.LogMessage("Received an empty MessageContainer, ignoring it");
        return;
      }
      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      //TODO: find a better solution
      // Executor callbacks (see EnqueueExecutorMessage) carry their own action and are simply run here.
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          AbortJob(container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.AquireJob:
          AcquireAndStartJob();
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
      }
    }

    /// <summary>
    /// Aborts the executor of the given job, if one is registered.
    /// </summary>
    /// <param name="jobId">Id of the job to abort.</param>
    private void AbortJob(Guid jobId) {
      Executor engine;
      if (!engines.TryGetValue(jobId, out engine)) {
        ClientCom.LogMessage("AbortJob: Engine doesn't exist");
        return;
      }
      try {
        engine.Abort();
      }
      catch (AppDomainUnloadedException) {
        // appdomain already unloaded. Finishing job probably ongoing
      }
    }

    /// <summary>
    /// Fetches a job and its data from the server and starts it in a sandbox AppDomain.
    /// </summary>
    private void AcquireAndStartJob() {
      Job myJob = wcfService.AquireJob();
      if (myJob == null) {
        // Without this guard the dereference of myJob.Id below would throw.
        ClientCom.LogMessage("AquireJob: Server did not return a job");
        return;
      }
      //TODO: handle in own thread!!
      JobData jobData = wcfService.GetJobData(myJob.Id);
      StartJobInAppDomain(myJob, jobData);
    }

    /// <summary>
    /// Stops the heartbeat, requests the dispatch loop to end, unloads all job AppDomains
    /// and closes the connections to the server and to the client UI.
    /// </summary>
    public void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      // The WCF exception handler is registered before StartHeartbeats() runs, so a very
      // early connection error can arrive while the heartbeat manager does not exist yet.
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (AppDomainUnloadedException) {
            // Already unloaded; keep going so the remaining domains are still shut down.
            ClientCom.LogMessage("Appdomain for " + kvp.Key + " was already unloaded");
          }
        }
        // Everything registered here referred to the domains unloaded above. Dropping the
        // entries keeps a repeated shutdown from touching already-unloaded domains.
        appDomains.Clear();
        engines.Clear();
        jobs.Clear();
      }
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">The current data of the job; its JobId selects the job to pause.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseJob(JobData data) {
      Job job;
      if (Jobs.TryGetValue(data.JobId, out job)) {
        job.JobState = JobState.WaitingForChildJobs;
        wcfService.UpdateJob(job, data);
      } else {
        ClientCom.LogMessage("Can't find job with id " + data.JobId);
      }
      // The local AppDomain is torn down whether or not the job was known.
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Collects the result of a finished job from its executor and submits it to the server.
    /// The job's AppDomain is killed afterwards, whether or not the transmission succeeded.
    /// </summary>
    /// <param name="jobId">The id of the finished job, boxed as a <see cref="Guid"/>.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        ClientCom.LogMessage("Getting the finished job with id: " + jId);

        Executor engine;
        if (!engines.TryGetValue(jId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }

        JobData sJob = engine.GetFinishedJob();
        cJob.Exception = engine.CurrentException;
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jId);
          wcfService.UpdateJob(cJob, sJob);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          // NOTE(review): the message announces storing the job on disk, but no such code
          // is visible in this method - confirm whether the result is persisted elsewhere.
          ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// A job whose id is already registered is ignored.
    /// </summary>
    /// <param name="myJob">The job to start.</param>
    /// <param name="jobData">The data of the job; its Data is handed to the executor.</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);

      // Reject duplicates before any work is done: otherwise a failure further down would
      // end in KillAppDomain(myJob.Id) and tear down the job that is already running.
      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          ClientCom.LogMessage("Job " + myJob.Id + " is already registered, ignoring it");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());

      try {
        PluginCache.Instance.PreparePlugins(myJob, jobData);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        return;
      }

      try {
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);

        bool registered = false;
        lock (engines) {
          if (!jobs.ContainsKey(myJob.Id)) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            registered = true;
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        if (!registered) {
          // The job got registered between the check at the top and this point; the sandbox
          // created above is not stored anywhere, so it has to be unloaded here.
          DiscardAppDomain(appDomain, myJob.Id);
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        ClientCom.LogMessage("Error thrown is: " + exception.ToString());
        KillAppDomain(myJob.Id);
      }
    }

    /// <summary>
    /// Unloads a sandbox AppDomain that was created but never registered for a job.
    /// Failures are logged and not propagated.
    /// </summary>
    /// <param name="appDomain">The AppDomain to unload.</param>
    /// <param name="jobId">Id of the job the domain was created for; used for logging only.</param>
    private void DiscardAppDomain(AppDomain appDomain, Guid jobId) {
      ClientCom.LogMessage("Discarding unused AppDomain for job " + jobId);
      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(appDomain);
      }
      catch (Exception ex) {
        ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
      }
    }

    /// <summary>Raised when sending a finished job fails unexpectedly.</summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception of a job AppDomain by killing the domain of that job.
    /// </summary>
    /// <param name="sender">Expected to be the AppDomain that raised the event.</param>
    /// <param name="e">Carries the unhandled exception object.</param>
    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject);

      // The exception text is not a job id, so parsing it as a Guid always failed.
      // The job is resolved from the AppDomain that raised the event instead.
      Guid jobId;
      if (TryGetJobIdForAppDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        ClientCom.LogMessage("Could not determine the job of the failing AppDomain");
      }
    }

    /// <summary>
    /// Looks up the id of the job a registered AppDomain belongs to.
    /// </summary>
    /// <param name="appDomain">The AppDomain to look for; may be null.</param>
    /// <param name="jobId">Receives the job id, or <see cref="Guid.Empty"/> if none was found.</param>
    /// <returns>true if the AppDomain is registered for a job, else false</returns>
    private bool TryGetJobIdForAppDomain(AppDomain appDomain, out Guid jobId) {
      jobId = Guid.Empty;
      if (appDomain == null) {
        return false;
      }
      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          // NOTE(review): relies on the event sender being the same reference that was
          // stored in StartJobInAppDomain - confirm for cross-domain event delivery.
          if (ReferenceEquals(kvp.Value, appDomain)) {
            jobId = kvp.Key;
            return true;
          }
        }
      }
      return false;
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">Type of the callback parameter.</typeparam>
    /// <param name="action">The action the core thread should execute.</param>
    /// <param name="parameter">The argument passed to <paramref name="action"/>.</param>
    /// <returns>true if the calling method can continue execution, else false</returns>
    private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      if (Thread.CurrentThread.ManagedThreadId == this.coreThreadId) {
        // Already on the core thread: the caller does the work itself.
        return true;
      }
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
      return false;
    }

    /// <summary>
    /// Kill a appdomain with a specific id. When called from another thread than the core
    /// thread the request is only enqueued and executed later by the core thread.
    /// Failures during the cleanup are logged and not propagated.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (!EnqueueExecutorMessage<Guid>(KillAppDomain, id)) {
        return;
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          DisposeEngine(id);
          UnloadAppDomain(id);
          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      // Kept from the original implementation: force a collection after the unload.
      GC.Collect();
    }

    /// <summary>
    /// Removes the executor of a job and disposes it. A failing Dispose is logged only,
    /// so that it cannot prevent the AppDomain from being unloaded afterwards.
    /// Must be called while holding the lock on engines.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    private void DisposeEngine(Guid id) {
      Executor engine;
      if (!engines.TryGetValue(id, out engine)) {
        return;
      }
      engines.Remove(id);
      try {
        engine.Dispose();
      }
      catch (Exception ex) {
        ClientCom.LogMessage("Exception when disposing the engine: " + ex.ToString());
      }
    }

    /// <summary>
    /// Unloads the AppDomain of a job, retrying up to <see cref="UnloadRetryCount"/> times.
    /// If the last attempt fails the exception is rethrown to the caller and the domain
    /// stays registered. Must be called while holding the lock on engines.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    private void UnloadAppDomain(Guid id) {
      AppDomain appDomain;
      if (!appDomains.TryGetValue(id, out appDomain)) {
        return;
      }

      try {
        appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);

        for (int attempt = 1; ; attempt++) {
          try {
            AppDomain.Unload(appDomain);
            break;
          }
          catch (CannotUnloadAppDomainException) {
            if (attempt >= UnloadRetryCount) {
              throw; // give up; KillAppDomain logs the failure
            }
            ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
            Thread.Sleep(UnloadRetryDelayMs);
          }
        }
      }
      catch (AppDomainUnloadedException) {
        // The domain is already gone, which is the state we want; just drop the entry.
      }
      appDomains.Remove(id);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.