Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5105

Last change on this file since 5105 was 5105, checked in by ascheibe, 13 years ago

ported most of the slave code to 3.4 #1233

File size: 12.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Services.Hive.Common.DataTransfer;
29using HeuristicLab.Services.Hive.Common;
30using HeuristicLab.Clients.Hive.Slave;
31
32
namespace HeuristicLab.Clients.Hive.Salve {
  // NOTE(review): "Salve" looks like a typo of "Slave" (this file also imports
  // HeuristicLab.Clients.Hive.Slave). It is left unchanged because renaming the namespace
  // would break every reference to HeuristicLab.Clients.Hive.Salve.Core.

  /// <summary>
  /// The core component of the Hive slave: it dispatches messages taken from the
  /// <c>MessageQueue</c>, acquires jobs from the server, runs every job in its own
  /// sandbox AppDomain and sends finished jobs back to the server.
  /// </summary>
  /// <remarks>
  /// <c>engines</c>, <c>appDomains</c> and <c>jobs</c> are all keyed by job id; inside this
  /// class they are read and modified while holding <c>lock (engines)</c>.
  /// </remarks>
  public class Core : MarshalByRefObject {
    /// <summary>
    /// When set to <c>true</c>, the dispatch loop started by <see cref="Start"/> stops after
    /// the message it is currently handling.
    /// </summary>
    public static bool abortRequested { get; set; }

    /// <summary>
    /// Static log sink. This class itself writes through the static <c>Logger</c> helper.
    /// </summary>
    public static ILog Log { get; set; }

    private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    private HeartbeatManager heartbeatManager;

    private bool currentlyFetching;

    /// <summary>
    /// True while a job is being acquired from the server. Every change is logged at debug level.
    /// </summary>
    private bool CurrentlyFetching {
      get {
        return currentlyFetching;
      }
      set {
        currentlyFetching = value;
        Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
      }
    }

    /// <summary>
    /// The executors of the currently running jobs, keyed by job id.
    /// Returns the internal dictionary itself, which this class mutates under <c>lock (engines)</c>.
    /// </summary>
    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    /// <summary>
    /// The currently running jobs, keyed by job id (the internal dictionary itself).
    /// </summary>
    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    /// <summary>
    /// Main method of the slave. It registers this instance with the <c>ConfigManager</c>,
    /// starts the heartbeats and then handles queued messages until
    /// <see cref="abortRequested"/> becomes true. It returns only after the dispatch loop has ended.
    /// </summary>
    public void Start() {
      abortRequested = false;
      Logger.Info("Hive Slave started");
      //TODO: implement slave console server
      //SlaveConsoleServer server = new SlaveConsoleServer();
      //server.Start();

      ConfigManager manager = ConfigManager.Instance;
      manager.Core = this;

      wcfService = WcfService.Instance;
      RegisterServiceEvents();
      try {
        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      finally {
        // Unhook the service events even if dispatching ended with an exception.
        DeRegisterServiceEvents();
      }

      //server.Close();
      Logger.Info("Program shutdown");
    }

    /// <summary>
    /// Creates the heartbeat manager with a 10 second interval and starts it.
    /// </summary>
    private void StartHeartbeats() {
      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
      heartbeatManager.StartHeartbeat();
    }

    /// <summary>
    /// Takes messages from the <c>MessageQueue</c> one at a time and handles each of them
    /// until <see cref="abortRequested"/> is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      //TODO
    }

    private void DeRegisterServiceEvents() {
      //TODO
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// </summary>
    /// <param name="container">The container holding the message and the job id it refers to.</param>
    private void DetermineAction(MessageContainer container) {
      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
      switch (container.Message) {
        //Server requests to abort a job
        case MessageContainer.MessageType.AbortJob:
          AbortJobExecution(container.JobId);
          break;

        //Pull a Job from the Server
        case MessageContainer.MessageType.AquireJob:
          FetchAndStartJob();
          break;

        //A Job has finished (or failed) and can be sent back to the server
        case MessageContainer.MessageType.JobStopped:
        case MessageContainer.MessageType.JobFailed:
          SendFinishedJob(container.JobId);
          break;

        //Hard shutdown of the client
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
      }
    }

    /// <summary>
    /// Asks the executor of the given job to abort. Logs an error if no executor is known.
    /// </summary>
    private void AbortJobExecution(Guid jobId) {
      Executor engine;
      lock (engines) {
        engines.TryGetValue(jobId, out engine);
      }
      if (engine == null) {
        Logger.Error("AbortJob: Engine doesn't exist");
        return;
      }
      try {
        engine.Abort();
      }
      catch (AppDomainUnloadedException) {
        // appdomain already unloaded. Finishing job probably ongoing
      }
    }

    /// <summary>
    /// Acquires a job and its data from the server and starts it, unless a fetch is already in progress.
    /// </summary>
    private void FetchAndStartJob() {
      if (CurrentlyFetching) {
        Logger.Info("Currently fetching, won't fetch this time!");
        return;
      }
      // Raise the flag before the fetch and always clear it afterwards. Raising it after the
      // fetch (as before) left it stuck at true, so no further job was ever acquired.
      CurrentlyFetching = true;
      try {
        //TODO: handle in own thread!!
        Job myJob = wcfService.AquireJob();
        if (myJob == null) {
          Logger.Info("AquireJob: the server returned no job");
          return;
        }
        JobData jobData = wcfService.GetJobData(myJob.Id);
        wcfService_GetJobCompleted(myJob, jobData);
      }
      finally {
        CurrentlyFetching = false;
      }
    }

    /// <summary>
    /// Stops the heartbeats, requests the dispatch loop to end, unloads all job AppDomains
    /// and disconnects from the server.
    /// </summary>
    public void ShutdownCore() {
      Logger.Info("Shutdown Signal received");

      Logger.Debug("Stopping heartbeat");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      abortRequested = true;
      Logger.Debug("Logging out");

      lock (engines) {
        Logger.Debug("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          Logger.Debug("Shutting down Appdomain for " + kvp.Key);
          try {
            kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception ex) {
            // A single domain that cannot be unloaded must not prevent the remaining
            // domains from being unloaded or the service from being disconnected.
            Logger.Error("Exception when unloading the appdomain: ", ex);
          }
        }
      }

      WcfService.Instance.Disconnect();
    }

    //TODO: start own thread?
    //only called when waiting for child jobs?
    /// <summary>
    /// Marks the job as waiting for child jobs, sends its current data to the server and
    /// unloads its AppDomain. Logs an error and does nothing if the job is unknown.
    /// </summary>
    /// <param name="data">The job's current data. <c>data.JobId</c> identifies the job.</param>
    public void PauseJob(JobData data) {
      Job job;
      lock (engines) {
        if (!jobs.TryGetValue(data.JobId, out job)) {
          Logger.Error("PauseJob: Job doesn't exist: " + data.JobId);
          return;
        }
        job.JobState = JobState.WaitingForChildJobs;
      }

      wcfService.UpdateJob(job, data);
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Collects the result of a finished job from its executor and submits it to the server.
    /// The job's AppDomain is unloaded afterwards whether or not the submission succeeded.
    /// </summary>
    /// <remarks>
    /// NOTE(review): if the submission fails, the failure is only logged. The result is not
    /// persisted anywhere and is not resubmitted later, even though the log message says
    /// "Storing ... to hdd". TODO implement persistence.
    /// Other failures are reported through <see cref="ExceptionOccured"/>.
    /// </remarks>
    /// <param name="jobId">The job id, boxed as <see cref="Guid"/>.</param>
    public void SendFinishedJob(object jobId) {
      try {
        Guid jId = (Guid)jobId;
        Logger.Info("Getting the finished job with id: " + jId);

        Executor engine;
        Job cJob;
        lock (engines) {
          if (!engines.TryGetValue(jId, out engine)) {
            Logger.Info("Engine doesn't exist");
            return;
          }
          if (!jobs.TryGetValue(jId, out cJob)) {
            Logger.Info("Job doesn't exist");
            return;
          }
        }

        JobData sJob = engine.GetFinishedJob();
        cJob.Exception = engine.CurrentException;
        cJob.ExecutionTime = engine.ExecutionTime;

        try {
          Logger.Info("Sending the finished job with id: " + jId);
          wcfService.UpdateJob(cJob, sJob);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jId); // kill app-domain in every case
          heartbeatManager.AwakeHeartBeatThread();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Starts a job received from the server inside a new sandbox AppDomain.
    /// It prepares the job's plugins, creates the sandbox, creates an <see cref="Executor"/> in it
    /// and starts the executor with the job data. Failures are logged, and a partially created
    /// AppDomain is unloaded again.
    /// </summary>
    /// <param name="myJob">The job acquired from the server.</param>
    /// <param name="jobData">The job's data, passed to <c>PreparePlugins</c> and to <c>Executor.Start</c>.</param>
    void wcfService_GetJobCompleted(Job myJob, JobData jobData) {
      Logger.Info("Received new job with id " + myJob.Id);

      lock (engines) {
        if (jobs.ContainsKey(myJob.Id)) {
          // Do not prepare plugins or create a second sandbox for a job that is already running.
          Logger.Info("Job with id " + myJob.Id + " is already running, ignoring it");
          return;
        }
      }

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      try {
        PluginCache.Instance.PreparePlugins(myJob, jobData);
        Logger.Debug("Plugins fetched for job " + myJob.Id);
      }
      catch (Exception exception) {
        Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
        return;
      }

      AppDomain appDomain = null;
      bool registered = false; // true once appDomain is tracked in appDomains (then KillAppDomain can clean it up)
      try {
        appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
        appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        lock (engines) {
          if (!jobs.ContainsKey(myJob.Id)) {
            jobs.Add(myJob.Id, myJob);
            appDomains.Add(myJob.Id, appDomain);
            registered = true;
            Logger.Debug("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            Logger.Debug("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.core = this;
            Logger.Debug("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        if (!registered) {
          // The job appeared in the meantime. Discard the sandbox created for it here,
          // otherwise that AppDomain would never be unloaded.
          UnloadUntrackedDomain(appDomain);
        }
        heartbeatManager.AwakeHeartBeatThread();
      }
      catch (Exception exception) {
        Logger.Error("Creating the Appdomain and loading the job failed for job " + myJob.Id);
        Logger.Error("Error thrown is: ", exception);
        if (registered) {
          KillAppDomain(myJob.Id);
        } else if (appDomain != null) {
          // The domain is not tracked, so KillAppDomain(myJob.Id) would not find it.
          UnloadUntrackedDomain(appDomain);
        }
      }
    }

    /// <summary>
    /// Unloads an AppDomain that was never registered in <c>appDomains</c>. Failures are logged only.
    /// </summary>
    private void UnloadUntrackedDomain(AppDomain domain) {
      try {
        domain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
        AppDomain.Unload(domain);
      }
      catch (Exception ex) {
        Logger.Error("Exception when unloading the appdomain: ", ex);
      }
    }

    /// <summary>
    /// Raised (after logging) when <see cref="SendFinishedJob"/> fails with an unexpected exception.
    /// </summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      Logger.Error("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Handles an unhandled exception raised in a job's AppDomain. It logs the exception and
    /// unloads the AppDomain of the job it belongs to.
    /// </summary>
    void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
      // The exception text is not a job id, so map the raising AppDomain back to its job.
      Guid jobId;
      if (TryFindJobIdOfDomain(sender as AppDomain, out jobId)) {
        KillAppDomain(jobId);
      } else {
        Logger.Error("Could not determine the job of the failing AppDomain; it was not unloaded");
      }
    }

    /// <summary>
    /// Looks up the job id whose AppDomain is <paramref name="domain"/>.
    /// </summary>
    /// <returns><c>true</c> and the id in <paramref name="jobId"/> if found; otherwise <c>false</c>.</returns>
    private bool TryFindJobIdOfDomain(AppDomain domain, out Guid jobId) {
      jobId = Guid.Empty;
      if (domain == null) return false;

      // NOTE(review): the friendly-name fallback assumes CreateAndInitSandbox uses its first
      // argument (the job id string) as the AppDomain's friendly name. The reference comparison
      // assumes remoting hands back the same proxy instance. Verify both.
      string friendlyName = null;
      try {
        friendlyName = domain.FriendlyName;
      }
      catch (AppDomainUnloadedException) {
        // Domain already gone; only the reference comparison below can still match.
      }

      lock (engines) {
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          if (ReferenceEquals(kvp.Value, domain) || kvp.Key.ToString() == friendlyName) {
            jobId = kvp.Key;
            return true;
          }
        }
      }
      return false;
    }

    /// <summary>
    /// Disposes the executor of the given job, unloads the job's AppDomain, forgets the job
    /// and deletes its plugins. Failures are logged, not thrown.
    /// </summary>
    /// <remarks>
    /// If the AppDomain still cannot be unloaded after all retries, the remaining steps are
    /// skipped and the job's entries stay in place (ShutdownCore will attempt another unload).
    /// The retries sleep while holding <c>lock (engines)</c>.
    /// </remarks>
    /// <param name="id">the GUID of the job</param>
    public void KillAppDomain(Guid id) {
      Logger.Debug("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            try {
              engine.Dispose();
            }
            catch (AppDomainUnloadedException) {
              // The executor lived in an AppDomain that is already gone; nothing left to dispose.
            }
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            try {
              appDomain.UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
              UnloadWithRetries(appDomain);
            }
            catch (AppDomainUnloadedException) {
              // Already unloaded: treat as success so the bookkeeping below still runs.
            }
            appDomains.Remove(id);
          }

          engines.Remove(id);
          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          Logger.Error("Exception when unloading the appdomain: ", ex);
        }
      }
      GC.Collect();
    }

    /// <summary>
    /// Calls <see cref="AppDomain.Unload"/> up to 5 times, waiting 1 second after each
    /// <see cref="CannotUnloadAppDomainException"/>. The exception from the last attempt is rethrown.
    /// </summary>
    private static void UnloadWithRetries(AppDomain domain) {
      const int maxAttempts = 5;
      for (int attempt = 1; ; attempt++) {
        try {
          AppDomain.Unload(domain);
          return;
        }
        catch (CannotUnloadAppDomainException) {
          if (attempt >= maxAttempts) {
            throw;
          }
          Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
          Thread.Sleep(1000);
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.