Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/LifecycleManager.cs @ 5404

Last change on this file since 5404 was 5404, checked in by cneumuel, 13 years ago

#1233

  • changed the workflow of acquiring a new job from the server.
    • if a job is available for calculation, the slave receives the jobId already with the heartbeats. The job is then exclusively assigned to this slave.
  • extended the metainfo for a slave by OperatingSystem and CpuArchitecture
  • enhanced the way plugin-dependencies are discovered by using the types used by XmlGenerator. Now only the minimum number of plugins is transferred.
  • selection of waiting jobs now considers the assigned slave-group
  • more unit tests for service
  • added unit tests for experiment manager
File size: 8.7 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.Transactions;
6using HeuristicLab.Services.Hive.Common;
7using HeuristicLab.Services.Hive.Common.DataTransfer;
8using HeuristicLab.Tracing;
9using HeuristicLab.Core;
10
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// This class holds the state of all recent heartbeats and decides to reschedule jobs and set slaves offline
  /// </summary>
  public class LifecycleManager : ILifecycleManager {
    /// <summary>
    /// Interval (in seconds) between two timeout checks performed by <see cref="timer_Tick"/>.
    /// </summary>
    private const int TimerIntervalSeconds = 10;

    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// Guards every access to <see cref="heartbeats"/>.
    /// </summary>
    private static readonly object locker = new object();

    /// <summary>
    /// Maps a slave id to the (UTC) time at which its last heartbeat was registered.
    /// UTC is used so that daylight-saving or local clock changes cannot fake or hide a timeout.
    /// </summary>
    private readonly Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

    // Windows-Forms timer is single threaded, so callbacks will be synchronous
    // NOTE(review): System.Windows.Forms.Timer raises Tick via a message loop - confirm the hosting process has one.
    System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

    /// <summary>
    /// Contains a list of JobIds which have been sent to a slave, but the slave has not yet sent
    /// a jobProgress of the job with a heartbeat, because it has not finished downloading/deserializing it.
    /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
    /// When the number reaches zero, the job is assumed to be lost and is set Offline again.
    /// </summary>
    private readonly Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

    /// <summary>
    /// Counts how many jobs are currently being transferred.
    /// </summary>
    private int jobsCurrentlyTransfering = 0;

    /// <summary>
    /// Gets or sets the number of jobs currently being transferred. A change of the value is logged.
    /// </summary>
    public int JobsCurrentlyTransferring {
      get { return jobsCurrentlyTransfering; }
      set {
        if (jobsCurrentlyTransfering != value) {
          jobsCurrentlyTransfering = value;
          Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
        }
      }
    }

    /// <summary>
    /// Started while the timeout timer is enabled, Stopped otherwise.
    /// </summary>
    public ExecutionState ExecutionState {
      get { return timer.Enabled ? Core.ExecutionState.Started : Core.ExecutionState.Stopped; }
    }

    public LifecycleManager() { }

    /// <summary>
    /// Starts the periodic timeout check. Does nothing if it is already running.
    /// </summary>
    public void Start() {
      if (ExecutionState == Core.ExecutionState.Stopped) {
        // register the known slaves before the first tick can occur, so they get their grace period
        AddAllSlavesToHeartbeats();

        this.timer.Interval = (int)TimeSpan.FromSeconds(TimerIntervalSeconds).TotalMilliseconds;
        // unsubscribe first: Start() may be called again after Stop() and the handler must be attached only once
        this.timer.Tick -= timer_Tick;
        this.timer.Tick += timer_Tick;
        this.timer.Start();
      }
    }

    /// <summary>
    /// Stops the periodic timeout check. Does nothing if it is not running.
    /// </summary>
    public void Stop() {
      if (ExecutionState == Core.ExecutionState.Started) {
        timer.Stop();
      }
    }

    /// <summary>
    /// Adds all slaves to the heartbeats-collection and gives them some time to say hello (HEARTBEAT_TIMEOUT).
    /// Otherwise all the slaves' jobs would be aborted immediately, which is not desirable if the server has just been restarted.
    /// Slaves which already have an entry keep their existing timestamp.
    /// </summary>
    private void AddAllSlavesToHeartbeats() {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          DateTime now = DateTime.UtcNow;
          foreach (Guid slaveId in slaveIds) {
            if (!heartbeats.ContainsKey(slaveId)) {
              heartbeats.Add(slaveId, now);
            }
          }
        }
      }
    }

    /// <summary>
    /// Checks for every slave whether it is still online.
    /// If not -> sets it offline and puts the jobs it was calculating back to waiting.
    /// </summary>
    private void timer_Tick(object sender, EventArgs e) {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          foreach (Guid slaveId in slaveIds) {
            if (!SlaveTimedOut(slaveId)) {
              continue;
            }

            var slave = dao.GetSlave(slaveId);
            // the slave may have been removed since the id list was read
            if (slave != null && slave.SlaveState != SlaveState.Offline) {
              AbortJobs(slaveId);
              slave.SlaveState = SlaveState.Offline;
              dao.UpdateSlave(slave);
            }
            heartbeats.Remove(slaveId);
          }
        }
      }
    }

    /// <summary>
    /// Determines whether a slave has not sent a heartbeat within ApplicationConstants.HeartbeatTimeout seconds.
    /// A slave without any registered heartbeat counts as timed out.
    /// Must be called while holding <see cref="locker"/>.
    /// </summary>
    /// <param name="slaveId">id of the slave to check</param>
    /// <returns>true if the slave is considered timed out</returns>
    private bool SlaveTimedOut(Guid slaveId) {
      DateTime lastHeartbeat;
      if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat)) {
        return true;
      }
      return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
    }

    /// <summary>
    /// Sets the jobs that are linked to the given slave back to Waiting, so they can be rescheduled.
    /// </summary>
    /// <param name="slaveId">id of the slave which timed out</param>
    private void AbortJobs(Guid slaveId) {
      // NOTE(review): this resets every job returned for the slave, regardless of its current state - confirm
      // that jobs which are no longer being calculated do not keep their slave reference.
      var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
      foreach (var j in jobs) {
        j.JobState = JobState.Waiting;
        dao.UpdateJob(j);
      }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// It registers the heartbeat, updates the progress of the reported jobs, optionally assigns a waiting job
    /// and persists changed slave information.
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // unknown slave -> ask it to register itself
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      // the dictionary is also accessed by the timer callback; Dictionary is not safe for concurrent writes.
      // the lock is held for this single write only.
      lock (locker) {
        heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
      }
      actions.AddRange(UpdateJobs(heartbeat));

      if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        // fetch at most one job and enumerate the result only once
        var job = dao.GetWaitingJobs(slave, 1).FirstOrDefault();
        if (job != null) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
          AssignJob(slave, job);
        }
      }

      // JobProgress may be null (see UpdateJobs); a slave that reports no job progress is treated as idle
      bool reportsJobs = heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0;
      SlaveState reportedState = reportsJobs ? SlaveState.Calculating : SlaveState.Idle;

      if (slave.FreeCores != heartbeat.FreeCores ||
          slave.FreeMemory != heartbeat.FreeMemory ||
          slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
          slave.SlaveState != reportedState) { // only update slave when something changed, to avoid unnecessary updates
        slave.FreeCores = heartbeat.FreeCores;
        slave.FreeMemory = heartbeat.FreeMemory;
        slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
        slave.SlaveState = reportedState;
        dao.UpdateSlave(slave);
      }
      return actions;
    }

    /// <summary>
    /// Marks the job as being calculated by the given slave and persists job and slave.
    /// </summary>
    /// <param name="slave">the slave which receives the job</param>
    /// <param name="job">the job to assign</param>
    private void AssignJob(Slave slave, Job job) {
      job.SlaveId = slave.Id;
      job.JobState = JobState.Calculating; // Todo: Maybe use State = Transferring (?)
      job.DateCalculated = DateTime.Now; // Todo: use statelog instead
      dao.UpdateJob(job);
      dao.UpdateSlave(slave);
    }

    /// <summary>
    /// Update the progress of each job
    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="heartbeat">the heartbeat containing the job progresses (may contain none)</param>
    /// <returns>AbortJob messages for jobs which are unknown, not assigned to this slave or requested to abort</returns>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null) {
        return actions;
      }

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        } else if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
        } else {
          // save job execution time
          curJob.ExecutionTime = jobProgress.Value;

          if (curJob.JobState == JobState.Aborted) {
            // a request to abort the job has been set
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          }
          dao.UpdateJob(curJob);
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if there are enough resources to send a job
    /// There should not be too many jobs sent simultaneously
    /// </summary>
    private bool IsAllowedToSendJobs() {
      return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.