Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5314

Last change to this file (as of revision 5314) was made in revision 5314, checked in by ascheibe, 14 years ago

#1233

  • added ItemView and Item for the Slave
  • added a Tray Icon App for data visualization and control of the slave Windows service
  • added control methods to SlaveCommunication for controlling the slave core
  • fixed typo in namespace
File size: 17.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.CompilerServices;
26using System.ServiceModel;
27using System.Threading;
28using HeuristicLab.Clients.Hive.Slave.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31using HeuristicLab.Services.Hive.Common;
32using HeuristicLab.Services.Hive.Common.DataTransfer;
33
34
namespace HeuristicLab.Clients.Hive.Slave {
  /// <summary>
  /// The core component of the Hive Client.
  /// </summary>
  /// <remarks>
  /// <see cref="Start"/> runs the message loop on the calling thread (the "core thread").
  /// The public control methods <see cref="Restart"/>, <see cref="SoftPause"/>,
  /// <see cref="HardPause"/> and <see cref="Shutdown"/> only enqueue a message; the work itself
  /// is done by the core thread when it dequeues that message in <see cref="DetermineAction"/>.
  /// </remarks>
  public class Core : MarshalByRefObject {
    /// <summary>When set to true, the message loop started by <see cref="Start"/> terminates.</summary>
    public static bool abortRequested { get; set; }
    // Released by Start() once the message loop has ended; Shutdown() waits on it.
    private Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // Per-job bookkeeping keyed by job id. Entries are added and removed while holding lock (engines).
    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;
    // Managed thread id of the thread running Start(); used by EnqueueExecutorMessage.
    private int coreThreadId;

    private ISlaveCommunication ClientCom;
    private ServiceHost slaveComm;

    /// <summary>The executors of the currently running jobs, keyed by job id.</summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>The currently running jobs, keyed by job id.</summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() {
    }

    /// <summary>
    /// Main Method for the client. Opens the slave communication service, subscribes to the
    /// <see cref="WcfService"/> events, starts the heartbeats and then processes messages on the
    /// calling thread until <see cref="abortRequested"/> is set. Does not return before that.
    /// </summary>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      //start the client communication service (pipe between slave and slave gui)
      slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
      slaveComm.Open();

      ClientCom = SlaveClientCom.Instance.ClientCom;
      ClientCom.LogMessage("Hive Slave started");

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      try {
        RegisterServiceEvents();
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // Always release waiters in Shutdown(), even if the message loop died with an exception;
        // otherwise Shutdown() would block forever.
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>Creates a new heartbeat manager with a 10 second interval and starts it.</summary>
    private void StartHeartbeats() {
      //Initialize the heartbeat
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>Takes messages from the message queue and handles them until abortRequested is set.</summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(wcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(wcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= wcfService_Connected;
      WcfService.Instance.ExceptionOccured -= wcfService_ExceptionOccured;
    }

    void wcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      ClientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void wcfService_Connected(object sender, EventArgs e) {
      ClientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions.
    /// Runs on the core thread.
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      ClientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      //TODO: find a better solution
      // Executor callbacks re-posted by EnqueueExecutorMessage (only Guid payloads are used here).
      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
        return;
      }

      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          Executor engineToAbort;
          if (engines.TryGetValue(container.JobId, out engineToAbort)) {
            try {
              engineToAbort.Abort();
            }
            catch (AppDomainUnloadedException) {
              // appdomain already unloaded. Finishing job probably ongoing
            }
          } else {
            ClientCom.LogMessage("AbortJob: Engine doesn't exist");
          }
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.AquireJob:
          Job myJob = wcfService.AquireJob();
          if (myJob == null) {
            ClientCom.LogMessage("AquireJob: no job received from the server");
            break;
          }
          //TODO: handle in own thread!!
          JobData jobData = wcfService.GetJobData(myJob.Id);
          if (jobData == null) {
            ClientCom.LogMessage("AquireJob: no job data received for job " + myJob.Id);
            break;
          }
          StartJobInAppDomain(myJob, jobData);
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.HardPause:
          doHardPause();
          break;
        case MessageContainer.MessageType.SoftPause:
          doSoftPause();
          break;
        case MessageContainer.MessageType.Restart:
          doRestart();
          break;
      }
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after SoftPause() or HardPause()
    /// </summary>
    public void Restart() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.Restart);
      MessageQueue.GetInstance().AddMessage(mc);
    }

    private void doRestart() {
      ClientCom.LogMessage("Restart received");
      StartHeartbeats();
      ClientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// wait for jobs to finish, then pause client
    /// </summary>
    public void SoftPause() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.SoftPause);
      MessageQueue.GetInstance().AddMessage(mc);
    }

    /// <summary>
    /// Pauses every running job, sends its current state to the server, unloads its AppDomain,
    /// then stops the heartbeats and disconnects from the server. Runs on the core thread.
    /// </summary>
    private void doSoftPause() {
      ClientCom.LogMessage("Soft pause received");

      // KillAppDomain runs synchronously here (we are on the core thread) and removes the job
      // from the jobs map, so iterate over a snapshot rather than over jobs.Values itself.
      List<Job> runningJobs;
      lock (engines) {
        runningJobs = new List<Job>(jobs.Values);
      }

      foreach (Job job in runningJobs) {
        Executor engine;
        if (!engines.TryGetValue(job.Id, out engine)) {
          ClientCom.LogMessage("SoftPause: Engine doesn't exist for job " + job.Id);
          KillAppDomain(job.Id); // still clean up whatever is left of this job
          continue;
        }

        JobData sJob;
        try {
          engine.Pause();
          sJob = engine.GetFinishedJob();
          job.Exception = engine.CurrentException;
          job.ExecutionTime = engine.ExecutionTime;
        }
        catch (Exception e) {
          ClientCom.LogMessage("Pausing the job with id " + job.Id + " failed (" + e.ToString() + ")");
          KillAppDomain(job.Id);
          continue;
        }

        try {
          ClientCom.LogMessage("Sending the paused job with id: " + job.Id);
          wcfService.UpdateJob(job, sJob);
          SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
        }
        catch (Exception e) {
          // NOTE(review): despite the message, nothing in this method stores the job to disk.
          ClientCom.LogMessage("Transmitting to server failed. Storing the paused job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(job.Id); // kill app-domain in every case
        }
      }

      heartbeatManager.StopHeartBeat();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Soft pause done");
    }

    /// <summary>
    /// pause slave immediately
    /// </summary>
    public void HardPause() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.HardPause);
      MessageQueue.GetInstance().AddMessage(mc);
    }

    /// <summary>
    /// Stops the heartbeats, unloads all job AppDomains without reporting to the server and
    /// disconnects. Runs on the core thread.
    /// </summary>
    private void doHardPause() {
      ClientCom.LogMessage("Hard pause received");
      heartbeatManager.StopHeartBeat();
      UnloadAllAppDomains();
      WcfService.Instance.Disconnect();
      ClientCom.LogMessage("Hard pause done");
    }

    /// <summary>
    /// Unloads the AppDomains of all jobs, deletes their plugins, and clears the engines,
    /// appDomains and jobs maps so that no entry refers to an unloaded AppDomain afterwards.
    /// A domain that fails to unload is logged and skipped so the caller can continue.
    /// </summary>
    private void UnloadAllAppDomains() {
      lock (engines) {
        ClientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          ClientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
            // Only delete plugins once their domain is gone, otherwise the files may still be in use.
            PluginCache.Instance.DeletePluginsForJob(kvp.Key);
          }
          catch (Exception ex) {
            ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
          }
        }
        appDomains.Clear();
        engines.Clear();
        jobs.Clear();
      }
    }

    /// <summary>
    /// Requests a shutdown of the core and blocks until the message loop in <see cref="Start"/> has ended.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// hard shutdown, should be called before the the application is exited
    /// </summary>
    private void ShutdownCore() {
      ClientCom.LogMessage("Shutdown Signal received");
      ClientCom.LogMessage("Stopping heartbeat");
      heartbeatManager.StopHeartBeat();
      abortRequested = true;
      ClientCom.LogMessage("Logging out");

      UnloadAllAppDomains();
      WcfService.Instance.Disconnect();
      ClientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed) {
        slaveComm.Close();
      }
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">The serialized job state to send to the server.</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseJob(JobData data) {
      Job job;
      if (!jobs.TryGetValue(data.JobId, out job)) {
        ClientCom.LogMessage("Can't find job with id " + data.JobId);
      } else {
        job.JobState = JobState.WaitingForChildJobs;
        wcfService.UpdateJob(job, data);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
    /// once the connection gets reestablished, the job gets submitted
    /// </summary>
    /// <param name="jobId">The id of the finished job, boxed as <see cref="object"/> (must be a <see cref="Guid"/>).</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        ClientCom.LogMessage("Getting the finished job with id: " + jId);
        Executor engine;
        if (!engines.TryGetValue(jId, out engine)) {
          ClientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jId, out cJob)) {
          ClientCom.LogMessage("Job doesn't exist");
          return;
        }

        JobData sJob = engine.GetFinishedJob();
        cJob.Exception = engine.CurrentException;
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          ClientCom.LogMessage("Sending the finished job with id: " + jId);
          wcfService.UpdateJob(cJob, sJob);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          // NOTE(review): despite the message, nothing in this method stores the job to disk.
          ClientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// Runs on the core thread.
    /// </summary>
    /// <param name="myJob">The job acquired from the server.</param>
    /// <param name="jobData">The job's data; <c>jobData.Data</c> is passed to the executor.</param>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      ClientCom.LogMessage("Received new job with id " + myJob.Id);

      // Jobs are only added by this method, which only runs on the core thread, so checking here
      // (before plugins are prepared and a sandbox is created) avoids clobbering the running job's
      // plugins and leaking an AppDomain that would never be registered or unloaded.
      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          ClientCom.LogMessage("Job " + myJob.Id + " is already running, ignoring it");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      bool pluginsPrepared = false;

      try {
        PluginCache.Instance.PreparePlugins(myJob, jobData);
        ClientCom.LogMessage("Plugins fetched for job " + myJob.Id);
        pluginsPrepared = true;
      }
      catch (Exception exception) {
        ClientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
      }

      if (!pluginsPrepared) {
        return;
      }

      try {
        AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        lock (engines) {
          if (!jobs.ContainsKey(myJob.Id)) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            ClientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            ClientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.core = this;
            ClientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            ClientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        ClientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        ClientCom.LogMessage("Error thrown is: " + exception.ToString());
        KillAppDomain(myJob.Id);
      }
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    /// <summary>Logs the exception and raises <see cref="ExceptionOccured"/>.</summary>
    private void OnExceptionOccured(Exception e) {
      ClientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception raised in a job's AppDomain by scheduling that AppDomain
    /// to be killed. Never throws.
    /// </summary>
    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      ClientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());

      // The exception text is not a job id, so it cannot be parsed as one. Derive the id from the
      // AppDomain that raised the event instead.
      Guid jobId;
      try {
        // NOTE(review): assumes the sender is the job's AppDomain and that its friendly name is the
        // job id (StartJobInAppDomain passes myJob.Id.ToString() as the first argument of
        // SandboxManager.CreateAndInitSandbox) — confirm against SandboxManager.
        jobId = new Guid(((AppDomain)sender).FriendlyName);
      }
      catch (Exception ex) {
        ClientCom.LogMessage("Could not determine the job of the failing AppDomain: " + ex.ToString());
        return;
      }
      KillAppDomain(jobId);
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">Type of the callback parameter.</typeparam>
    /// <param name="action">The callback to run on the core thread.</param>
    /// <param name="parameter">The argument passed to <paramref name="action"/>.</param>
    /// <returns>true if the calling method can continue execution, else false</returns>
    private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
        container.Callback = action;
        container.CallbackParameter = parameter;
        MessageQueue.GetInstance().AddMessage(container);
        return false;
      } else {
        return true;
      }
    }

    /// <summary>
    /// Kill a appdomain with a specific id. When called from a thread other than the core
    /// thread, the call is re-posted to the message queue and performed later by the core thread.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (!EnqueueExecutorMessage<Guid>(KillAppDomain, id)) {
        return; // re-posted to the core thread
      }

      ClientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          if (engines.ContainsKey(id)) {
            engines[id].Dispose();
            engines.Remove(id);
          }

          if (appDomains.ContainsKey(id)) {
            appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);

            int repeat = 5;
            while (repeat > 0) {
              try {
                AppDomain.Unload(appDomains[id]);
                repeat = 0;
              }
              catch (CannotUnloadAppDomainException) {
                ClientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
                repeat--;
                if (repeat == 0) {
                  // Give up. The rethrown exception is caught and only logged by the outer catch
                  // below, so the application keeps running and this job's entries in appDomains
                  // and jobs (and its plugins) are left in place.
                  throw;
                }
              }
            }
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          ClientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.