Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/LifecycleManager.cs @ 5095

Last change on this file since 5095 was 5095, checked in by cneumuel, 14 years ago

#1233

  • fixed config merge process
  • worked on hive server test setup
File size: 8.3 KB
RevLine 
[5095]1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.Transactions;
6using HeuristicLab.Services.Hive.Common;
7using HeuristicLab.Services.Hive.Common.DataTransfer;
8using HeuristicLab.Tracing;
9using HeuristicLab.Core;
10
11namespace HeuristicLab.Services.Hive {
12  /// <summary>
13  /// This class holds the state of all recent heartbeats and decides to reschedule jobs and set slaves offline
14  /// </summary>
15  public class LifecycleManager : ILifecycleManager {
16    private DataAccess.IHiveDao dao {
17      get { return ServiceLocator.Instance.HiveDao; }
18    }
19    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
20      get { return ServiceLocator.Instance.TransactionManager; }
21    }
22    private IAuthorizationManager auth {
23      get { return ServiceLocator.Instance.AuthorizationManager; }
24    }
25
26    private static object locker = new object();
27    private Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();
28
29    // Windows-Forms timer is single threaded, so callbacks will be synchron
30    System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();
31
32    /// <summary>
33    /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent
34    /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it.
35    /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
36    /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again.
37    /// </summary>
38    private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();
39
40    /// <summary>
41    /// Counts how many jobs are currently beeing transferred.
42    /// </summary>
43    private int jobsCurrentlyTransfering = 0;
44    public int JobsCurrentlyTransferring {
45      get { return jobsCurrentlyTransfering; }
46      set {
47        if (jobsCurrentlyTransfering != value) {
48          jobsCurrentlyTransfering = value;
49          Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
50        }
51      }
52    }
53
54    public ExecutionState ExecutionState {
55      get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped;  }
56    }
57
58    public LifecycleManager() { }
59
60    public void Start() {
61      if (ExecutionState == Core.ExecutionState.Stopped) {
62        this.timer.Interval = (int)new TimeSpan(0, 0, 10).TotalMilliseconds;
63        this.timer.Tick += new EventHandler(timer_Tick);
64        this.timer.Start();
65        AddAllSlavesToHeartbeats();
66      }
67    }
68
69    public void Stop() {
70      if (ExecutionState == Core.ExecutionState.Started) {
71        timer.Stop();
72      }
73    }
74
75    /// <summary>
76    // add all slaves to hearbeats-collection and give them some time to say hello (HEARTBEAT_TIMEOUT)
77    // otherwise alls the slaves jobs would be aborted immediately, which is not desirable if the server has just been restarted
78    /// </summary>
79    private void AddAllSlavesToHeartbeats() {
80      lock (locker) {
81        using (trans.OpenTransaction()) {
82          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
83          foreach (Guid slaveId in slaveIds) {
84            if (!heartbeats.ContainsKey(slaveId)) {
85              heartbeats.Add(slaveId, DateTime.Now);
86            }
87          }
88        }
89      }
90    }
91
92    /// <summary>
93    /// This method is supposed to check if slaves are online
94    /// if not -> set them offline and check if they where calculating a job
95    /// </summary>
96    void timer_Tick(object sender, EventArgs e) {
97      lock (locker) {
98        using (trans.OpenTransaction()) {
99          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
100          foreach (Guid slaveId in slaveIds) {
101            if (SlaveTimedOut(slaveId)) {
102              var slave = dao.GetSlave(slaveId);
103              if (slave.SlaveState != SlaveState.Offline) {
104                AbortJobs(slaveId);
105                slave.SlaveState = SlaveState.Offline;
106                dao.UpdateSlave(slave);
107              }
108              heartbeats.Remove(slaveId);
109            }
110          }
111        }
112      }
113    }
114
115    private bool SlaveTimedOut(Guid slaveId) {
116      if (!heartbeats.ContainsKey(slaveId))
117        return true;
118
119      if ((DateTime.Now - heartbeats[slaveId]).TotalSeconds > ApplicationConstants.HeartbeatTimeout) {
120        return true;
121      }
122
123      return false;
124    }
125
126    private void AbortJobs(Guid slaveId) {
127      var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
128      foreach (var j in jobs) {
129        j.JobState = JobState.Waiting;
130        dao.UpdateJob(j);
131      }
132    }
133
134    /// <summary>
135    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
136    /// </summary>
137    /// <returns>a list of actions the slave should do</returns>
138    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
139      List<MessageContainer> actions = new List<MessageContainer>();
140      Slave slave = dao.GetSlave(heartbeat.SlaveId);
141      if (slave == null) {
142        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
143      } else {
144        heartbeats[heartbeat.SlaveId] = DateTime.Now;
145        actions.AddRange(UpdateJobs(heartbeat));
146
147        if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
148          var availableJobs = dao.GetWaitingJobs(slave);
149          if (availableJobs.Count() > 0) {
150            actions.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
151          }
152        }
153
154        if (slave.FreeCores != heartbeat.FreeCores ||
155            slave.FreeMemory != heartbeat.FreeMemory ||
156            slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
157            slave.SlaveState != (heartbeat.JobProgress.Count > 0 ? SlaveState.Calculating : SlaveState.Idle)) { // only update slave when something changed, to avoid unnecessary updates
158          slave.FreeCores = heartbeat.FreeCores;
159          slave.FreeMemory = heartbeat.FreeMemory;
160          slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
161          slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
162          dao.UpdateSlave(slave);
163        }
164      }
165      return actions;
166    }
167
168    /// <summary>
169    /// Update the progress of each job
170    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
171    /// </summary>
172    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
173      List<MessageContainer> actions = new List<MessageContainer>();
174
175      if (heartbeat.JobProgress == null)
176        return actions;
177
178      // process the jobProgresses
179      foreach (var jobProgress in heartbeat.JobProgress) {
180        Job curJob = dao.GetJob(jobProgress.Key);
181        if (curJob == null) {
182          // job does not exist in db
183          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
184          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
185        } else {
186          if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
187            // assigned slave does not match heartbeat
188            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
189            Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
190          } else {
191            // save job execution time
192            curJob.ExecutionTime = jobProgress.Value;
193
194            if (curJob.JobState == JobState.Aborted) {
195              // a request to abort the job has been set
196              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
197            }
198            dao.UpdateJob(curJob);
199          }
200        }
201      }
202      return actions;
203    }
204
205    /// <summary>
206    /// Returns true if there are enough resources to send a job
207    /// There should not be too many jobs sent simultaniously
208    /// </summary>
209    private bool IsAllowedToSendJobs() {
210      return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
211    }
212  }
213}
Note: See TracBrowser for help on using the repository browser.