Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 5797

Last change on this file since 5797 was 5793, checked in by cneumuel, 14 years ago

#1233

  • implemented correct downloading of paused jobs. It is now also possible to change parameters and resume an algorithm
  • removed Prepare() calls in ExperimentManager and in slave, as they prevent correct resuming of paused jobs
  • made events in ItemTreeView be invoked in the correct thread
  • reduced log output in ExperimentManager
File size: 20.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.IO;
26using System.Runtime.CompilerServices;
27using System.ServiceModel;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
31using HeuristicLab.Common;
32using HeuristicLab.Core;
33
34
35
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Client
  /// </summary>
  /// <remarks>
  /// Start() runs the message loop on the calling ("core") thread. Every job runs in its own sandbox
  /// AppDomain; AppDomains are only unloaded on the core thread (see KillAppDomain). Additions to and
  /// removals from the engines, appDomains and jobs dictionaries are done while holding lock(engines).
  /// </remarks>
  public class Core : MarshalByRefObject {

    //TODO: this class should be a singleton; there is only one instance, the reference is meanwhile saved in TheCore
    public static Core TheCore;

    public EventLog ServiceEventLog { get; set; }

    public static bool abortRequested { get; set; }
    // released when Start() returns; Shutdown() blocks on it
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // all three dictionaries are keyed by job id; lock(engines) is used as the common guard
    private readonly Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>();
    private readonly Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>();
    private readonly Dictionary<Guid, Job> jobs = new Dictionary<Guid, Job>();

    private WcfService wcfService;
    // null while the slave is sleeping (set by Sleep(), recreated by DoStartSlave())
    private HeartbeatManager heartbeatManager;
    // managed id of the thread that runs Start(); only this thread unloads AppDomains
    private int coreThreadId;

    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;

    public Dictionary<Guid, Executor> ExecutionEngines {
      get { return engines; }
    }

    internal Dictionary<Guid, Job> Jobs {
      get { return jobs; }
    }

    public Core() {
      TheCore = this;
    }

    /// <summary>
    /// Main method of the slave: opens the slave/GUI communication service, registers for WCF service
    /// events, starts the heartbeats and then dispatches queued messages on the calling thread until
    /// abortRequested is set.
    /// </summary>
    /// <remarks>
    /// If ServiceEventLog is set, an exception is written to it and not rethrown; otherwise it is rethrown.
    /// In every case the WCF events are deregistered and a pending Shutdown() call is released.
    /// </remarks>
    public void Start() {
      coreThreadId = Thread.CurrentThread.ManagedThreadId;
      abortRequested = false;

      try {
        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();

        clientCom = SlaveClientCom.Instance.ClientCom;
        clientCom.LogMessage("Hive Slave started");

        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          }
          catch (Exception) {
            // deliberately ignored: with a service event log, Start() is meant to end without throwing,
            // even if the event log itself cannot be written
          }
        } else {
          throw; // rethrow without resetting the stack trace
        }
      }
      finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat manager (10 second interval) unless one already exists.
    /// </summary>
    private void StartHeartbeats() {
      //Initialize the heartbeat     
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Stops the heartbeats if a heartbeat manager exists (it is null after Sleep()).
    /// </summary>
    private void StopHeartbeats() {
      HeartbeatManager manager = heartbeatManager; // local copy: Sleep() may reset the field
      if (manager != null) {
        manager.StopHeartBeat();
      }
    }

    /// <summary>
    /// Wakes the heartbeat thread if a heartbeat manager exists (it is null after Sleep()).
    /// </summary>
    private void AwakeHeartbeat() {
      HeartbeatManager manager = heartbeatManager; // local copy: Sleep() may reset the field concurrently
      if (manager != null) {
        manager.AwakeHeartBeatThread();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them until abortRequested is set.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage("Connection to server interrupted with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The Container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      ExecutorMessageContainer<Guid> executorMessage = container as ExecutorMessageContainer<Guid>;
      if (executorMessage != null) {
        executorMessage.execute();
      } else if (container is MessageContainer) {
        switch (container.Message) {
          case MessageContainer.MessageType.CalculateJob:
            // fetching and starting a job involves server round trips, so it runs on a task
            Task.Factory.StartNew((jobIdObj) => FetchAndStartJob((Guid)jobIdObj), container.JobId)
            .ContinueWith((t) => {
              // handle exception of task
              clientCom.LogMessage(t.Exception.ToString());
            }, TaskContinuationOptions.OnlyOnFaulted);
            break;
          case MessageContainer.MessageType.ShutdownSlave:
            ShutdownCore();
            break;
          case MessageContainer.MessageType.StopAll:
            DoStopAll();
            break;
          case MessageContainer.MessageType.PauseAll:
            DoPauseAll();
            break;
          case MessageContainer.MessageType.AbortAll:
            DoAbortAll();
            break;
          case MessageContainer.MessageType.AbortJob:
            KillAppDomain(container.JobId);
            break;
          case MessageContainer.MessageType.StopJob:
            DoStopJob(container.JobId);
            break;
          case MessageContainer.MessageType.PauseJob:
            DoPauseJob(container.JobId);
            break;
          case MessageContainer.MessageType.Restart:
            DoStartSlave();
            break;
          case MessageContainer.MessageType.Sleep:
            Sleep();
            break;
          case MessageContainer.MessageType.SayHello:
            wcfService.Connect(ConfigManager.Instance.GetClientInfo());
            break;
        }
      } else {
        clientCom.LogMessage("Unknown MessageContainer: " + container);
      }
    }

    /// <summary>
    /// Downloads a job and its data, registers the job locally, reports it as calculating to the server
    /// and starts it in a new AppDomain. Called on a task started by DetermineAction.
    /// </summary>
    /// <param name="jobId">id of the job to calculate</param>
    /// <exception cref="JobNotFoundException">the server returned no job for <paramref name="jobId"/></exception>
    /// <exception cref="JobDataNotFoundException">the server returned no job data for <paramref name="jobId"/></exception>
    private void FetchAndStartJob(Guid jobId) {
      Job job = wcfService.GetJob(jobId);
      if (job == null) throw new JobNotFoundException(jobId);
      lock (engines) {
        if (!jobs.ContainsKey(job.Id)) {
          jobs.Add(job.Id, job);
        }
      }
      JobData jobData = wcfService.GetJobData(job.Id);
      if (jobData == null) throw new JobDataNotFoundException(jobId);
      job = wcfService.UpdateJobState(job.Id, JobState.Calculating, null);
      StartJobInAppDomain(job, jobData);
    }

    /// <summary>
    /// Pauses a running job, sends its paused state to the server and unloads its AppDomain.
    /// </summary>
    private void DoPauseJob(Guid jobId) {
      HaltAndSendJob(jobId, "DoPauseJob", "paused", JobState.Paused, engine => {
        engine.Pause();
        return engine.GetPausedJob();
      });
    }

    /// <summary>
    /// Stops a running job, sends it to the server as finished and unloads its AppDomain.
    /// </summary>
    private void DoStopJob(Guid jobId) {
      HaltAndSendJob(jobId, "DoStopJob", "stopped", JobState.Finished, engine => {
        engine.Stop();
        return engine.GetFinishedJob();
      });
    }

    /// <summary>
    /// Shared implementation of DoPauseJob and DoStopJob: halts the job's engine, sends the resulting job
    /// data to the server with the given state and finally unloads the job's AppDomain.
    /// </summary>
    /// <param name="jobId">id of the job</param>
    /// <param name="caller">name of the calling method; prefix of the log messages</param>
    /// <param name="description">"paused" or "stopped"; used in the log messages</param>
    /// <param name="newState">state sent to the server together with the job data</param>
    /// <param name="haltAndGetData">halts the engine and returns the job data to send</param>
    private void HaltAndSendJob(Guid jobId, string caller, string description, JobState newState, Func<Executor, JobData> haltAndGetData) {
      Job job;
      Executor engine;
      lock (engines) {
        jobs.TryGetValue(jobId, out job);
        engines.TryGetValue(jobId, out engine);
      }

      if (job == null) {
        clientCom.LogMessage(caller + ": Can't find job with id " + jobId);
        return;
      }
      if (engine == null) {
        // the job is registered but has no engine, e.g. while FetchAndStartJob is still starting it
        clientCom.LogMessage(caller + ": No engine running for job with id " + jobId);
        return;
      }

      try {
        JobData jobData = haltAndGetData(engine);
        job.ExecutionTime = engine.ExecutionTime;

        try {
          clientCom.LogMessage("Sending the " + description + " job with id: " + job.Id);
          wcfService.UpdateJobData(job, jobData, ConfigManager.Instance.GetClientInfo().Id, newState);
          SlaveStatusInfo.JobsProcessed++;    //TODO: count or not count, thats the question
        }
        catch (Exception e) {
          clientCom.LogMessage("Transmitting to server failed. Storing the " + description + " job with id: " + job.Id + " to hdd (" + e.ToString() + ")");
        }
      }
      catch (Exception e) {
        // halting the engine failed; log instead of letting the exception end the core message loop
        clientCom.LogMessage(caller + ": Halting the job with id " + job.Id + " failed (" + e.ToString() + ")");
      }
      finally {
        KillAppDomain(job.Id); // kill app-domain in every case
      }
    }

    /// <summary>
    /// Returns a copy of the ids of all registered jobs, taken under lock(engines) so that concurrent
    /// additions (see FetchAndStartJob) cannot invalidate the enumeration; callers may remove jobs while
    /// iterating the copy.
    /// </summary>
    private List<Guid> GetJobIdsSnapshot() {
      lock (engines) {
        return new List<Guid>(jobs.Keys);
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      foreach (Guid g in GetJobIdsSnapshot()) {
        KillAppDomain(g);
      }

      clientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every registered job immediately and sends it to the server in paused state
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");

      foreach (Guid g in GetJobIdsSnapshot()) {
        DoPauseJob(g);
      }
    }

    /// <summary>
    /// stops every registered job immediately and sends it to the server as finished
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");

      foreach (Guid g in GetJobIdsSnapshot()) {
        DoStopJob(g);
      }
    }

    /// <summary>
    /// completely shuts down the slave: enqueues a ShutdownSlave message and blocks until Start() returns
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the the application is exited
    /// </summary>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown Signal received");
      clientCom.LogMessage("Stopping heartbeat");
      StopHeartbeats();
      abortRequested = true;
      clientCom.LogMessage("Logging out");

      lock (engines) {
        clientCom.LogMessage("engines locked");
        foreach (KeyValuePair<Guid, AppDomain> kvp in appDomains) {
          clientCom.LogMessage("Shutting down Appdomain for " + kvp.Key);
          kvp.Value.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          try {
            AppDomain.Unload(kvp.Value);
          }
          catch (Exception e) {
            // keep going so the remaining AppDomains and the connections below are still shut down
            clientCom.LogMessage("Could not unload AppDomain for " + kvp.Key + ": " + e.ToString());
          }
        }
      }
      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed)
        slaveComm.Close();
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after Sleep()
    /// </summary> 
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      StartHeartbeats();
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    //TODO: do we need an AbortSleep?
    private void Sleep() {
      clientCom.LogMessage("Sleep received");
      StopHeartbeats();
      heartbeatManager = null;
      DoStopAll();
      WcfService.Instance.Disconnect();
      clientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Pauses a job, which means sending it to the server and killing it locally;
    /// atm only used when executor is waiting for child jobs
    /// </summary>
    /// <param name="data">the job data to send; data.JobId identifies the job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void PauseWaitJob(JobData data) {
      if (!Jobs.ContainsKey(data.JobId)) {
        clientCom.LogMessage("Can't find job with id " + data.JobId);
      } else {
        Job job = Jobs[data.JobId];
        // the data is uploaded in Paused state, then the job is marked as Waiting
        wcfService.UpdateJobData(job, data, ConfigManager.Instance.GetClientInfo().Id, JobState.Paused);
        wcfService.UpdateJobState(job.Id, JobState.Waiting, null);
      }
      KillAppDomain(data.JobId);
    }

    /// <summary>
    /// Serializes the finished job and submits it to the server with state Finished, then unloads the
    /// job's AppDomain and wakes the heartbeat thread.
    /// </summary>
    /// <remarks>
    /// If the upload fails, this method only logs the failure; it does not persist the job itself
    /// (NOTE(review): the log message mentions "hdd" — confirm whether the WCF layer stores it).
    /// Any other exception is reported via ExceptionOccured.
    /// </remarks>
    /// <param name="jobId">id of the finished job</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public void SendFinishedJob(Guid jobId) {
      try {
        clientCom.LogMessage("Getting the finished job with id: " + jobId);
        Executor engine;
        if (!engines.TryGetValue(jobId, out engine)) {
          clientCom.LogMessage("Engine doesn't exist");
          return;
        }
        Job cJob;
        if (!jobs.TryGetValue(jobId, out cJob)) {
          clientCom.LogMessage("Job doesn't exist");
          return;
        }

        JobData sJob = engine.GetFinishedJob();
        // cJob.Exception = engine.CurrentException; // can there be an exception if the job is sent normally. the exception should be entered in the statelog with the corresponding state (Failed)
        cJob.ExecutionTime = engine.ExecutionTime; // read after GetFinishedJob()

        try {
          clientCom.LogMessage("Sending the finished job with id: " + jobId);
          wcfService.UpdateJobData(cJob, sJob, ConfigManager.Instance.GetClientInfo().Id, JobState.Finished);
          SlaveStatusInfo.JobsProcessed++;
        }
        catch (Exception e) {
          clientCom.LogMessage("Transmitting to server failed. Storing the finished job with id: " + jobId + " to hdd (" + e.ToString() + ")");
        }
        finally {
          KillAppDomain(jobId); // kill app-domain in every case
          AwakeHeartbeat();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// A new Job from the wcfService has been received and will be started within a AppDomain.
    /// </summary>
    /// <param name="myJob">the job to start</param>
    /// <param name="jobData">the job's data; jobData.Data is passed to Executor.Start</param>
    /// <exception cref="InvalidOperationException">an engine for the job already exists</exception>
    private void StartJobInAppDomain(Job myJob, JobData jobData) {
      clientCom.LogMessage("Received new job with id " + myJob.Id);
      if (engines.ContainsKey(myJob.Id))
        throw new InvalidOperationException("Job with key " + myJob.Id + " already exists");

      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
      bool pluginsPrepared = false;
      string configFileName = string.Empty;

      try {
        PluginCache.Instance.PreparePlugins(myJob, out configFileName);
        clientCom.LogMessage("Plugins fetched for job " + myJob.Id);
        pluginsPrepared = true;
      }
      catch (Exception exception) {
        clientCom.LogMessage(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
      }

      if (pluginsPrepared) {
        try {
          // the job id is used as the sandbox name (AppDomain_UnhandledException relies on this)
          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, configFileName));
          appDomain.UnhandledException += new UnhandledExceptionEventHandler(AppDomain_UnhandledException);
          lock (engines) {
            appDomains.Add(myJob.Id, appDomain);
            clientCom.LogMessage("Creating AppDomain");
            Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
            clientCom.LogMessage("Created AppDomain");
            engine.JobId = myJob.Id;
            engine.Core = this;
            clientCom.LogMessage("Starting Engine for job " + myJob.Id);
            engines.Add(myJob.Id, engine);
            engine.Start(jobData.Data);
            SlaveStatusInfo.JobsFetched++;
            clientCom.LogMessage("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
          }
        }
        catch (Exception exception) {
          clientCom.LogMessage("Creating the Appdomain and loading the job failed for job " + myJob.Id);
          clientCom.LogMessage("Error thrown is: " + exception.ToString());
          KillAppDomain(myJob.Id);
        }
      } else {
        // nothing was started: drop the job's local registration and any partially copied plugins,
        // as the AppDomain-creation failure path above does
        KillAppDomain(myJob.Id);
      }
      AwakeHeartbeat();
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    private void OnExceptionOccured(Exception e) {
      clientCom.LogMessage("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Logs an unhandled exception of a job's AppDomain and requests that AppDomain to be killed.
    /// </summary>
    /// <remarks>
    /// The job id is recovered from the AppDomain's friendly name. This assumes that
    /// SandboxManager.CreateAndInitSandbox uses its first argument (the job id string passed in
    /// StartJobInAppDomain) as that name, and that sender is the failing AppDomain — TODO confirm.
    /// </remarks>
    private void AppDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
      clientCom.LogMessage("Exception in AppDomain: " + e.ExceptionObject.ToString());
      AppDomain domain = sender as AppDomain;
      Guid jobId;
      if (domain != null && Guid.TryParse(domain.FriendlyName, out jobId)) {
        KillAppDomain(jobId);
      } else {
        clientCom.LogMessage("Could not determine the job of the AppDomain that threw the exception.");
      }
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <typeparam name="T">type of the callback parameter</typeparam>
    /// <param name="action">callback executed later by the core thread</param>
    /// <param name="parameter">argument passed to <paramref name="action"/></param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Kill a appdomain with a specific id: disposes the job's engine, unloads its AppDomain, forgets
    /// the job and deletes its plugins. Calls from other threads are forwarded to the core thread.
    /// </summary>
    /// <param name="id">the GUID of the job</param>
    //[MethodImpl(MethodImplOptions.Synchronized)]
    public void KillAppDomain(Guid id) {
      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
        EnqueueExecutorMessage<Guid>(KillAppDomain, id);
        return;
      }

      clientCom.LogMessage("Shutting down Appdomain for Job " + id);
      lock (engines) {
        try {
          Executor engine;
          if (engines.TryGetValue(id, out engine)) {
            engine.Dispose();
            engines.Remove(id);
          }

          AppDomain appDomain;
          if (appDomains.TryGetValue(id, out appDomain)) {
            appDomain.UnhandledException -= new UnhandledExceptionEventHandler(AppDomain_UnhandledException);

            const int maxUnloadAttempts = 5;
            for (int attempt = 1; ; attempt++) {
              try {
                AppDomain.Unload(appDomain);
                break;
              }
              catch (CannotUnloadAppDomainException) {
                if (attempt >= maxUnloadAttempts) {
                  // caught and logged by the handler below; the AppDomain and job stay registered
                  throw;
                }
                clientCom.LogMessage("Could not unload AppDomain, will try again in 1 sec.");
                Thread.Sleep(1000);
              }
            }
            appDomains.Remove(id);
          }

          jobs.Remove(id);
          PluginCache.Instance.DeletePluginsForJob(id);
        }
        catch (Exception ex) {
          clientCom.LogMessage("Exception when unloading the appdomain: " + ex.ToString());
        }
      }
      GC.Collect();
    }
  }
}
Note: See TracBrowser for help on using the repository browser.