Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs @ 6258

Last change on this file since 6258 was 6258, checked in by cneumuel, 13 years ago

#1233

  • small cosmetic changes
File size: 13.1 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Diagnostics;
25using System.ServiceModel;
26using System.Threading;
27using System.Threading.Tasks;
28using HeuristicLab.Clients.Hive.SlaveCore.ServiceContracts;
29using HeuristicLab.Common;
30using HeuristicLab.Core;
31
32
namespace HeuristicLab.Clients.Hive.SlaveCore {
  /// <summary>
  /// The core component of the Hive Slave.
  /// Handles commands sent from the Hive Server.
  /// </summary>
  public class Core : MarshalByRefObject {
    public static ILog Log { get; set; }

    /// <summary>
    /// When true, the message dispatch loop in <see cref="Start"/> ends after the current message.
    /// </summary>
    public static bool AbortRequested { get; set; }

    private static HeartbeatManager heartbeatManager;
    public static HeartbeatManager HeartbeatManager {
      get { return heartbeatManager; }
    }

    // Channel to the slave GUI; null until Start() has opened the communication service.
    private ISlaveCommunication clientCom;
    // Host of the slave <-> slave GUI communication service; null until Start() has created it.
    private ServiceHost slaveComm;
    // Released at the end of Start(); Shutdown() waits on it.
    private readonly Semaphore waitShutdownSem = new Semaphore(0, 1);
    // All jobs known to this core, keyed by job id. Always lock on this instance when accessing it.
    private readonly Dictionary<Guid, SlaveJob> slaveJobs = new Dictionary<Guid, SlaveJob>();
    private WcfService wcfService;

    public EventLog ServiceEventLog { get; set; }
    public Dictionary<Guid, SlaveJob> SlaveJobs { get { return slaveJobs; } }

    public Core() { }

    /// <summary>
    /// Main method of the slave: opens the GUI communication service, cleans up left-over
    /// plugin directories, starts the heartbeats and then dispatches queued messages until
    /// <see cref="AbortRequested"/> is set.
    /// </summary>
    /// <remarks>
    /// Exceptions are reported to <see cref="ServiceEventLog"/> if it is set, otherwise to the
    /// slave GUI. If neither is available the exception is rethrown after the core was shut down.
    /// The shutdown semaphore is released in every case so that <see cref="Shutdown"/> returns.
    /// </remarks>
    public void Start() {
      AbortRequested = false;

      try {
        ConfigManager manager = ConfigManager.Instance;
        manager.Core = this;

        // start the client communication service (pipe between slave and slave gui)
        slaveComm = new ServiceHost(typeof(SlaveCommunicationService));
        slaveComm.Open();
        clientCom = SlaveClientCom.Instance.ClientCom;

        // delete all left over job directories
        PluginCache.Instance.CleanPluginTemp();
        clientCom.LogMessage("Hive Slave started");

        wcfService = WcfService.Instance;
        RegisterServiceEvents();

        StartHeartbeats(); // start heartbeats thread
        DispatchMessageQueue(); // dispatch messages until AbortRequested
      }
      catch (Exception ex) {
        bool reported = true;
        if (ServiceEventLog != null) {
          try {
            ServiceEventLog.WriteEntry(string.Format("Hive Slave threw exception: {0} with stack trace: {1}", ex.ToString(), ex.StackTrace));
          }
          catch (Exception) {
            // deliberately ignored: writing to the event log is best effort and
            // must not prevent the shutdown below
          }
        } else if (clientCom != null) {
          // try to log with clientCom so that the user sees at least a message
          clientCom.LogMessage(string.Format("Uncaught exception: {0} {1} Core is going to shutdown.", ex.ToString(), Environment.NewLine));
        } else {
          // startup failed before any log sink existed
          reported = false;
        }

        ShutdownCore();

        if (!reported) {
          // nobody has seen the exception yet, so hand the original one to the caller
          throw;
        }
      }
      finally {
        DeregisterServiceEvents();
        waitShutdownSem.Release();
      }
    }

    /// <summary>
    /// Creates the heartbeat manager with a 10 second interval and starts it,
    /// unless a heartbeat manager already exists.
    /// </summary>
    private void StartHeartbeats() {
      if (heartbeatManager == null) {
        heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
        heartbeatManager.StartHeartbeat();
      }
    }

    /// <summary>
    /// Takes messages from the message queue and handles them one after the other until
    /// <see cref="AbortRequested"/> is set. After each message the current status is sent to the slave GUI.
    /// </summary>
    private void DispatchMessageQueue() {
      MessageQueue queue = MessageQueue.GetInstance();
      while (!AbortRequested) {
        MessageContainer container = queue.GetMessage();
        DetermineAction(container);
        if (!AbortRequested) {
          clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
        }
      }
    }

    private void RegisterServiceEvents() {
      WcfService.Instance.Connected += new EventHandler(WcfService_Connected);
      WcfService.Instance.ExceptionOccured += new EventHandler<EventArgs<Exception>>(WcfService_ExceptionOccured);
    }

    private void DeregisterServiceEvents() {
      WcfService.Instance.Connected -= WcfService_Connected;
      WcfService.Instance.ExceptionOccured -= WcfService_ExceptionOccured;
    }

    private void WcfService_ExceptionOccured(object sender, EventArgs<Exception> e) {
      clientCom.LogMessage(string.Format("Connection to server interruped with exception: {0}", e.Value.Message));
    }

    private void WcfService_Connected(object sender, EventArgs e) {
      clientCom.LogMessage("Connected successfully to Hive server");
    }

    /// <summary>
    /// Reads and analyzes a message from the MessageQueue and starts the corresponding action.
    /// Job related messages are handled asynchronously in their own task,
    /// all other messages are handled on the calling thread.
    /// </summary>
    /// <param name="container">The container, containing the message</param>
    private void DetermineAction(MessageContainer container) {
      if (container == null) {
        // must be checked before the container is dereferenced for logging
        clientCom.LogMessage("Unknown MessageContainer: " + container);
        return;
      }

      clientCom.LogMessage(string.Format("Message: {0} for job: {1} ", container.Message.ToString(), container.JobId));

      ExecutorMessageContainer<Guid> executorContainer = container as ExecutorMessageContainer<Guid>;
      if (executorContainer != null) {
        executorContainer.execute();
        return;
      }

      switch (container.Message) {
        case MessageContainer.MessageType.CalculateJob:
          Task.Factory.StartNew(HandleCalculateJob, container.JobId)
          .ContinueWith((t) => {
            // handle exception of task
            clientCom.LogMessage(t.Exception.ToString());
            // t.Exception is always an AggregateException, the real cause is one of its inner exceptions
            if (!ContainsJobNotFoundException(t.Exception)) {
              wcfService.UpdateJobState(container.JobId, JobState.Waiting, t.Exception.ToString());
              SlaveStatusInfo.IncrementJobsFailed();
            }
          }, TaskContinuationOptions.OnlyOnFaulted);
          break;
        case MessageContainer.MessageType.ShutdownSlave:
          ShutdownCore();
          break;
        case MessageContainer.MessageType.StopAll:
          DoStopAll();
          break;
        case MessageContainer.MessageType.PauseAll:
          DoPauseAll();
          break;
        case MessageContainer.MessageType.AbortAll:
          DoAbortAll();
          break;
        case MessageContainer.MessageType.AbortJob:
          SlaveStatusInfo.IncrementJobsAborted(); //TODO: move to a sane place
          RunJobHandler(HandleAbortJob, container.JobId);
          break;
        case MessageContainer.MessageType.StopJob:
          RunJobHandler(HandleStopJob, container.JobId);
          break;
        case MessageContainer.MessageType.PauseJob:
          RunJobHandler(HandlePauseJob, container.JobId);
          break;
        case MessageContainer.MessageType.Restart:
          DoStartSlave();
          break;
        case MessageContainer.MessageType.Sleep:
          Sleep();
          break;
        case MessageContainer.MessageType.SayHello:
          wcfService.Connect(ConfigManager.Instance.GetClientInfo());
          break;
      }
    }

    /// <summary>
    /// Runs a job handler in a new task and logs the exception to the slave GUI if the task faults.
    /// </summary>
    /// <param name="handler">The handler to run</param>
    /// <param name="jobId">The job id which is passed to the handler as task state</param>
    private void RunJobHandler(Action<object> handler, object jobId) {
      Task.Factory.StartNew(handler, jobId)
        .ContinueWith((t) => {
          clientCom.LogMessage(t.Exception.ToString());
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Checks if one of the exceptions wrapped by the given AggregateException is a JobNotFoundException.
    /// </summary>
    private static bool ContainsJobNotFoundException(AggregateException aggregate) {
      foreach (Exception inner in aggregate.Flatten().InnerExceptions) {
        if (inner is JobNotFoundException) {
          return true;
        }
      }
      return false;
    }

    /// <summary>
    /// Registers, prepares and calculates the job with the given id.
    /// The job is set to waiting on the server if no core is free,
    /// and it is ignored if a job with the same id is already registered.
    /// </summary>
    /// <param name="jobIdObj">The boxed Guid of the job</param>
    private void HandleCalculateJob(object jobIdObj) {
      Guid jobId = (Guid)jobIdObj;
      SlaveJob newJob = new SlaveJob(this);

      lock (slaveJobs) {
        if (ConfigManager.Instance.GetFreeCores() < 1) {
          wcfService.UpdateJobState(jobId, JobState.Waiting, "Slave set status waiting because no cores were available");
          clientCom.LogMessage(string.Format("Setting job with id {0} to waiting, all cores are used", jobId));
          return;
        }

        if (slaveJobs.ContainsKey(jobId)) {
          clientCom.LogMessage(string.Format("Job with id {0} already exists. Start aborted.", jobId));
          return;
        }

        slaveJobs.Add(jobId, newJob);
        // NOTE(review): PrepareJob runs while the lock is held and the job stays registered
        // if it throws - confirm that SlaveJob removes itself in that case
        newJob.PrepareJob(jobId);
      }

      newJob.CalculateJob();
    }

    /// <summary>
    /// Kills the AppDomain of the job with the given id if the job exists and has not finished.
    /// </summary>
    /// <param name="jobIdObj">The boxed Guid of the job</param>
    private void HandleAbortJob(object jobIdObj) {
      SlaveJob job = FindJob((Guid)jobIdObj, "Abort");
      if (job != null && !job.Finished) {
        job.KillAppDomain();
      }
    }

    /// <summary>
    /// Stops the job with the given id if the job exists and has not finished.
    /// </summary>
    /// <param name="jobIdObj">The boxed Guid of the job</param>
    private void HandleStopJob(object jobIdObj) {
      SlaveJob job = FindJob((Guid)jobIdObj, "Stop");
      if (job != null && !job.Finished) {
        job.StopJob();
      }
    }

    /// <summary>
    /// Pauses the job with the given id if the job exists and has not finished.
    /// </summary>
    /// <param name="jobIdObj">The boxed Guid of the job</param>
    private void HandlePauseJob(object jobIdObj) {
      SlaveJob job = FindJob((Guid)jobIdObj, "Pause");
      if (job != null && !job.Finished) {
        job.PauseJob();
      }
    }

    /// <summary>
    /// Looks up a registered job. If it does not exist a message is logged to the slave GUI.
    /// </summary>
    /// <param name="jobId">The id of the job</param>
    /// <param name="action">Name of the requested action, only used for the log message</param>
    /// <returns>The job or null if no job with the given id is registered</returns>
    private SlaveJob FindJob(Guid jobId, string action) {
      SlaveJob job;
      lock (slaveJobs) {
        if (slaveJobs.TryGetValue(jobId, out job)) {
          return job;
        }
      }
      clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. {1} aborted.", jobId, action));
      return null;
    }

    /// <summary>
    /// Copies the currently registered jobs, so that they can be processed without holding the lock.
    /// </summary>
    /// <remarks>
    /// Calling into a job while enumerating the dictionary under the lock would fail if the job
    /// removes itself via <see cref="RemoveSlaveJobFromList"/> on the same thread, and would block
    /// every other thread that needs the job list in the meantime.
    /// </remarks>
    private List<SlaveJob> GetJobsSnapshot() {
      lock (slaveJobs) {
        return new List<SlaveJob>(slaveJobs.Values);
      }
    }

    /// <summary>
    /// Writes a message to the slave GUI if the connection to it has been established.
    /// Used on the shutdown path, which also runs if startup failed before the connection existed.
    /// </summary>
    private void LogIfClientConnected(string message) {
      if (clientCom != null) {
        clientCom.LogMessage(message);
      }
    }

    /// <summary>
    /// Kills the AppDomain of every job that has not finished.
    /// </summary>
    private void DoAbortAll() {
      foreach (SlaveJob job in GetJobsSnapshot()) {
        if (!job.Finished) {
          job.KillAppDomain();
        }
      }
      LogIfClientConnected("Aborted all jobs!");
    }

    /// <summary>
    /// Pauses every job that has not finished.
    /// </summary>
    private void DoPauseAll() {
      clientCom.LogMessage("Pause all received");

      foreach (SlaveJob job in GetJobsSnapshot()) {
        if (!job.Finished) {
          job.PauseJob();
        }
      }
    }

    /// <summary>
    /// Stops every job that has not finished.
    /// </summary>
    private void DoStopAll() {
      clientCom.LogMessage("Stop all received");

      foreach (SlaveJob job in GetJobsSnapshot()) {
        if (!job.Finished) {
          job.StopJob();
        }
      }
    }

    /// <summary>
    /// Completely shuts down the slave: enqueues a shutdown message and
    /// blocks until <see cref="Start"/> has finished.
    /// </summary>
    public void Shutdown() {
      MessageContainer mc = new MessageContainer(MessageContainer.MessageType.ShutdownSlave);
      MessageQueue.GetInstance().AddMessage(mc);
      waitShutdownSem.WaitOne();
    }

    /// <summary>
    /// Complete shutdown, should be called before the application is exited.
    /// Stops the heartbeat, aborts all jobs, disconnects from the server and closes
    /// the communication with the slave GUI.
    /// </summary>
    /// <remarks>
    /// Also called from the exception handler of <see cref="Start"/>, therefore everything
    /// that is created during startup is checked for null before it is used.
    /// </remarks>
    private void ShutdownCore() {
      LogIfClientConnected("Shutdown Signal received");
      LogIfClientConnected("Stopping heartbeat");
      if (heartbeatManager != null) {
        heartbeatManager.StopHeartBeat();
      }
      AbortRequested = true;
      LogIfClientConnected("Logging out");

      DoAbortAll();

      WcfService.Instance.Disconnect();
      if (clientCom != null) {
        clientCom.Shutdown();
      }
      SlaveClientCom.Close();

      if (slaveComm != null) {
        if (slaveComm.State == CommunicationState.Faulted) {
          // Close() throws on a faulted communication object, it has to be aborted instead
          slaveComm.Abort();
        } else if (slaveComm.State != CommunicationState.Closed) {
          slaveComm.Close();
        }
      }
    }

    /// <summary>
    /// Continues operation after <see cref="Sleep"/> by clearing the asleep flag of the configuration.
    /// </summary>
    private void DoStartSlave() {
      clientCom.LogMessage("Restart received");
      ConfigManager.Instance.Asleep = false;
      clientCom.LogMessage("Restart done");
    }

    /// <summary>
    /// Sets the asleep flag of the configuration and pauses all jobs.
    /// The communication with the slave GUI stays open.
    /// </summary>
    private void Sleep() {
      clientCom.LogMessage("Sleep received - not accepting any new jobs");
      ConfigManager.Instance.Asleep = true;
      DoPauseAll(); //TODO: or stop? can't decide...
    }

    /// <summary>
    /// Removes the job with the given id from the list of jobs. Does nothing if it is not registered.
    /// </summary>
    /// <param name="jobId">The id of the job to remove</param>
    public void RemoveSlaveJobFromList(Guid jobId) {
      lock (slaveJobs) {
        slaveJobs.Remove(jobId);
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.