Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6203

Last change on this file since 6203 was 6203, checked in by ascheibe, 13 years ago

#1233

  • dropped dependency of Core from Executor
  • enabled sandboxing
  • moved most parts of Job handling from Core to SlaveJob to simplify locking
  • optimized how UsedCores is handled
  • SlaveStatusInfo is now thread-safe and counts jobs more correctly
File size: 13.6 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.ServiceModel;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
using HeuristicLab.Common;
using HeuristicLab.Core;


namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server.
  /// </summary>
  public class Core : MarshalByRefObject {
    /// <summary>
    /// Optional event log. When it is set, <see cref="Start"/> writes an uncaught
    /// exception to it instead of reporting it through the client communication channel.
    /// </summary>
    public EventLog ServiceEventLog { get; set; }

    /// <summary>
    /// Ends the message dispatch loop of <see cref="Start"/> once it is true.
    /// Reset by <see cref="Start"/> and set during the shutdown sequence.
    /// </summary>
    public static bool abortRequested { get; set; }

    // Released when Start() leaves, so that Shutdown() can block until the core has terminated.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    public static ILog Log { get; set; }

    // Jobs known to this slave, keyed by job id.
    // Every access inside this class happens while holding a lock on the dictionary itself.
    private readonly Dictionary<Guid, SlaveJob> slaveJobs = new Dictionary<Guid, SlaveJob>();

    private WcfService wcfService;

    // Null until StartHeartbeats() ran and null again after Sleep(); always go through
    // StartHeartbeats()/StopHeartbeats() instead of dereferencing it directly.
    private static HeartbeatManager heartbeatManager;
    public static HeartbeatManager HBManager { get { return heartbeatManager; } }

    private ISlaveCommunication clientCom;
    private ServiceHost slaveComm;

    /// <summary>
    /// The jobs known to this slave, keyed by job id. This is the live dictionary that
    /// this class guards with <c>lock (slaveJobs)</c>, not a copy.
    /// </summary>
    public Dictionary<Guid, SlaveJob> SlaveJobs {
      get { return slaveJobs; }
    }

    public Core() { }

    /// <summary>
    /// Main method of the slave: opens the communication service for the slave GUI,
    /// cleans the plugin temp directory, starts the heartbeats and then dispatches
    /// messages from the message queue until <see cref="abortRequested"/> is set.
    /// </summary>
    /// <remarks>
    /// Any exception is reported (event log if available, otherwise the client
    /// communication channel) and followed by a shutdown of the core. In every case
    /// the service events are deregistered and the shutdown semaphore is released.
    /// </remarks>
    public void Start() {
      abortRequested = false;

      try {
        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        //start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over job directories
        PluginCache.Instance.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // Start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until abortRequested
      }
      catch (Exception ex) {
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry("Hive Slave threw exception: " + ex.ToString() + " with stack trace: " + ex.StackTrace);
          }
          catch (Exception) {
            // deliberate best effort: a failing event log must not prevent the shutdown below
          }
        } else {
          //try to log with clientCom. if this works the user sees at least a message,
          //else an exception will be thrown anyways.
          clientCom.LogMessage("Uncaught exception: " + ex.ToString() +
            Environment.NewLine + "Core is going to shutdown.");
        }
        ShutdownCore();
      }
      finally {
        DeRegisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates and starts the heartbeat manager (10 second interval) unless one already exists.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Stops the heartbeat manager if there is one. Safe to call when no heartbeat
    /// manager exists, which is the case after <see cref="Sleep"/>.
    /// </summary>
    private void StopHeartbeats() {
      HeartbeatManager manager = heartbeatManager;
      if (manager != null) {
        manager.StopHeartBeat();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them one after another
    /// until <see cref="abortRequested"/> is set. After every message the current
    /// status is pushed to the client console.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!abortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeRegisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage("Connection to server interruped with exception: " + e.Value.Message);
    }

    void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes the Messages from the MessageQueue and starts corresponding actions
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      clientCom.LogMessage("Message: " + container.Message.ToString() + " for job: " + container.JobId);

      // Callbacks enqueued by executors are simply run on this (the core) thread.
      ExecutorMessageContainer<Guid> executorContainer = container as ExecutorMessageContainer<Guid>;
      if (executorContainer != null) {
        executorContainer.execute();
        return;
      }

      // Message types without a case below are ignored.
      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          StartCalculateJob(container);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          SlaveStatusInfo.IncrementJobsAborted(); //TODO: move to a sane place
          StartJobCommand(container, "Abort", job => job.KillAppDomain());
          break;
        case MessageContainer.MessageType.StopJob:
          StartJobCommand(container, "Stop", job => job.StopJob());
          break;
        case MessageContainer.MessageType.PauseJob:
          StartJobCommand(container, "Pause", job => job.PauseJob());
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
      }
    }

    /// <summary>
    /// Starts a task that registers a new <see cref="SlaveJob"/> for the job id of the
    /// message and calculates it. If a job with that id is already registered, only a
    /// message is logged. If the task faults, the exception is logged, the job state is
    /// reported as failed and the failed-jobs counter is incremented.
    /// </summary>
    /// <param name="container">The CalculateJob message.</param>
    private void StartCalculateJob(MessageContainer container) {
      Task.Factory.StartNew((jobIdObj) => {
        Guid jobId = (Guid)jobIdObj;
        SlaveJob newJob = new SlaveJob(this);
        bool start = true;

        lock (slaveJobs) {
          if (slaveJobs.ContainsKey(jobId)) {
            start = false;
            clientCom.LogMessage(string.Format("Job with id {0} already exists. Start aborted.", jobId));
          } else {
            slaveJobs.Add(jobId, newJob);
          }
        }

        // the calculation itself runs outside of the lock
        if (start) {
          newJob.CalculateJob(jobId);
        }
      }, container.JobId)
      .ContinueWith((t) => {
        // handle exception of task
        clientCom.LogMessage(t.Exception.ToString());
        wcfService.UpdateJobState(container.JobId, JobState.Failed, t.Exception.ToString());
        SlaveStatusInfo.IncrementJobsFailed();
      }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Starts a task that looks up the job addressed by the message and, if it exists
    /// and is not finished, applies <paramref name="command"/> to it. A missing job
    /// only produces a log message. Exceptions of the task are logged.
    /// </summary>
    /// <param name="container">The message carrying the job id.</param>
    /// <param name="commandName">Name used in the log message, e.g. "Abort", "Stop" or "Pause".</param>
    /// <param name="command">The operation to execute on the job.</param>
    private void StartJobCommand(MessageContainer container, string commandName, Action<SlaveJob> command) {
      Task.Factory.StartNew((jobIdObj) => {
        Guid jobId = (Guid)jobIdObj;
        SlaveJob job;
        bool found;

        lock (slaveJobs) {
          found = slaveJobs.TryGetValue(jobId, out job);
          if (!found) {
            clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. {1} aborted.", jobId, commandName));
          }
        }

        // the command itself runs outside of the lock
        if (found && !job.Finished) {
          command(job);
        }
      }, container.JobId)
      .ContinueWith((t) => {
        // handle exception of task
        clientCom.LogMessage(t.Exception.ToString());
      }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Applies <paramref name="action"/> to every registered job that is not finished.
    /// The job list stays locked for the whole iteration.
    /// </summary>
    /// <param name="action">The operation to execute on each unfinished job.</param>
    private void ForEachUnfinishedJob(Action<SlaveJob> action) {
      // NOTE(review): the action runs while the lock is held and the dictionary is being
      // enumerated. If an action ever removes the job from the list synchronously on this
      // thread, the enumeration fails - confirm against SlaveJob.
      lock (slaveJobs) {
        foreach (SlaveJob job in slaveJobs.Values) {
          if (!job.Finished) {
            action(job);
          }
        }
      }
    }

    /// <summary>
    /// aborts all running jobs, no results are sent back
    /// </summary>
    private void DoAbortAll() {
      ForEachUnfinishedJob(job => job.KillAppDomain());
      clientCom.LogMessage("Aborted all jobs!");
    }

    /// <summary>
    /// pauses every job that is not finished yet
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");
      ForEachUnfinishedJob(job => job.PauseJob());
    }

    /// <summary>
    /// stops every job that is not finished yet
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");
      ForEachUnfinishedJob(job => job.StopJob());
    }

    /// <summary>
    /// Completely shuts down the slave: enqueues a ShutdownSlave message and blocks
    /// until <see cref="Start"/> has released the shutdown semaphore.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// complete shutdown, should be called before the application is exited
    /// </summary>
    private void ShutdownCore() {
      clientCom.LogMessage("Shutdown Signal received");
      clientCom.LogMessage("Stopping heartbeat");
      // there is no heartbeat manager if the slave was put to sleep before
      StopHeartbeats();
      abortRequested = true;
      clientCom.LogMessage("Logging out");

      DoAbortAll();

      WcfService.Instance.Disconnect();
      clientCom.Shutdown();
      SlaveClientCom.Close();

      if (slaveComm.State != CommunicationState.Closed) {
        slaveComm.Close();
      }
    }

    /// <summary>
    /// reinitializes everything and continues operation,
    /// can be called after Sleep()
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      StartHeartbeats();
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// stop slave, except for client gui communication,
    /// primarily used by gui if core is running as windows service
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received");
      // a second Sleep without a Restart in between finds no heartbeat manager
      StopHeartbeats();
      heartbeatManager = null;
      DoStopAll();
      WcfService.Instance.Disconnect();
      clientCom.LogMessage("Sleep done");
    }

    /// <summary>
    /// Enqueues messages from the executor to the message queue.
    /// This is necessary if the core thread has to execute certain actions, e.g.
    /// killing of an app domain.
    /// </summary>
    /// <remarks>
    /// The dispatcher of this class only executes containers whose type argument is
    /// <see cref="Guid"/>.
    /// </remarks>
    /// <typeparam name="T">Type of the callback parameter.</typeparam>
    /// <param name="action">The callback stored in the enqueued container.</param>
    /// <param name="parameter">The parameter stored alongside the callback.</param>
    public void EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
      ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
      container.Callback = action;
      container.CallbackParameter = parameter;
      MessageQueue.GetInstance().AddMessage(container);
    }

    /// <summary>
    /// Removes the job with the given id from the job list; does nothing if no such job is registered.
    /// </summary>
    /// <param name="jobId">Id of the job to remove.</param>
    public void RemoveSlaveJobFromList(Guid jobId) {
      lock (slaveJobs) {
        // Dictionary.Remove is a no-op for unknown keys, no ContainsKey check needed
        slaveJobs.Remove(jobId);
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.