Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive/sources/HeuristicLab.Hive.New/HeuristicLab.Services.Hive/3.3/Hive.cs @ 5028

Last change on this file since 5028 was 5028, checked in by cneumuel, 13 years ago

#1233 cleanup

File size: 8.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.Transactions;
6using HeuristicLab.Services.Hive.Common;
7using HeuristicLab.Services.Hive.Common.DataTransfer;
8using HeuristicLab.Tracing;
9
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Holds the state of all recent slave heartbeats and decides when to reschedule jobs
  /// and set slaves offline.
  /// </summary>
  public class Hive {
    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// Guards <see cref="heartbeats"/>. Held by the timer callback for its whole offline-check
    /// pass and by <see cref="ProcessHeartbeat"/> while it records a heartbeat.
    /// </summary>
    private static readonly object locker = new object();

    /// <summary>
    /// Time (UTC) of the last heartbeat received per slave id.
    /// Only access this while holding <see cref="locker"/>.
    /// </summary>
    private readonly Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

    // The Windows Forms timer raises Tick on the thread that owns the message loop, so callbacks
    // never overlap each other.
    // NOTE(review): this timer only fires when that thread pumps Windows messages — confirm the
    // hosting process runs a message loop; otherwise timer_Tick is never invoked.
    System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

    /// <summary>
    /// Ids of jobs that have been sent to a slave, but for which the slave has not yet reported
    /// progress in a heartbeat (e.g. because it has not finished downloading/deserializing them).
    /// The value is a time-to-live count that is meant to be decremented with each server
    /// heartbeat; when it reaches zero the job is assumed lost and is set offline again.
    /// NOTE(review): this field is not read or written anywhere in this class.
    /// </summary>
    private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

    /// <summary>
    /// Number of jobs currently being transferred.
    /// </summary>
    private int jobsCurrentlyTransfering = 0;

    /// <summary>
    /// Number of jobs currently being transferred to slaves; changes are logged.
    /// NOTE(review): the setter is not synchronized, so concurrent read-modify-write updates
    /// from callers (e.g. <c>++</c>) can be lost — confirm how callers update it.
    /// </summary>
    public int JobsCurrentlyTransferring {
      get { return jobsCurrentlyTransfering; }
      set {
        if (jobsCurrentlyTransfering != value) {
          jobsCurrentlyTransfering = value;
          Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
        }
      }
    }

    /// <summary>
    /// Seeds the heartbeat table with all known slaves, then starts the 10-second
    /// offline-check timer.
    /// </summary>
    public Hive() {
      // Seed first: if reading the slaves fails, the constructor throws before a running
      // timer has been created and started.
      AddAllSlavesToHeartbeats();
      this.timer.Interval = (int)new TimeSpan(0, 0, 10).TotalMilliseconds;
      this.timer.Tick += new EventHandler(timer_Tick);
      this.timer.Start();
    }

    /// <summary>
    /// Stops the offline-check timer.
    /// </summary>
    public void Shutdown() {
      timer.Stop();
    }

    /// <summary>
    /// Adds all slaves to the heartbeat collection, which gives them some time (HeartbeatTimeout)
    /// to say hello. Otherwise all of the slaves' jobs would be aborted immediately, which is not
    /// desirable if the server has just been restarted.
    /// </summary>
    private void AddAllSlavesToHeartbeats() {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          DateTime now = DateTime.UtcNow;
          foreach (Guid slaveId in slaveIds) {
            if (!heartbeats.ContainsKey(slaveId)) {
              heartbeats.Add(slaveId, now);
            }
          }
        }
      }
    }

    /// <summary>
    /// Timer callback: finds slaves whose heartbeat has timed out, puts their jobs back into the
    /// waiting state, marks them offline, and forgets their last heartbeat.
    /// </summary>
    void timer_Tick(object sender, EventArgs e) {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          foreach (Guid slaveId in slaveIds) {
            if (SlaveTimedOut(slaveId)) {
              var slave = dao.GetSlave(slaveId);
              if (slave.SlaveState != SlaveState.Offline) {
                AbortJobs(slaveId);
                slave.SlaveState = SlaveState.Offline;
                dao.UpdateSlave(slave);
              }
              heartbeats.Remove(slaveId);
            }
          }
        }
      }
    }

    /// <summary>
    /// Returns true if no heartbeat is recorded for the slave, or if its last heartbeat is older
    /// than <c>ApplicationConstants.HeartbeatTimeout</c> seconds. Caller must hold <see cref="locker"/>.
    /// </summary>
    private bool SlaveTimedOut(Guid slaveId) {
      DateTime lastHeartbeat;
      if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat))
        return true;

      return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
    }

    /// <summary>
    /// Sets every job whose slave is <paramref name="slaveId"/> back to <c>JobState.Waiting</c>.
    /// NOTE(review): the query filters only on the slave, not on the job state, so finished or
    /// aborted jobs of that slave would also be reset — confirm this is intended.
    /// </summary>
    private void AbortJobs(Guid slaveId) {
      var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
      foreach (var j in jobs) {
        j.JobState = JobState.Waiting;
        dao.UpdateJob(j);
      }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">The heartbeat sent by the slave.</param>
    /// <returns>A list of actions the slave should perform.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="heartbeat"/> is null.</exception>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      if (heartbeat == null) throw new ArgumentNullException("heartbeat");

      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.AddSlaveInfo));
      } else {
        // The heartbeat table is shared with the timer callback; Dictionary is not thread-safe.
        lock (locker) {
          heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
        }
        actions.AddRange(UpdateJobs(heartbeat));

        // check if a new calendar must be loaded
        if (slave.CalendarSyncState == CalendarState.Fetch || slave.CalendarSyncState == CalendarState.ForceFetch) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.UpdateCalendar));
        }

        if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
          var availableJobs = dao.GetWaitingJobs(slave);
          if (availableJobs.Any()) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
          }
        }

        // JobProgress may be null (UpdateJobs tolerates that too), so compute the state once, null-safely.
        bool isCalculating = heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0;
        SlaveState reportedState = isCalculating ? SlaveState.Calculating : SlaveState.Idle;

        if (slave.FreeCores != heartbeat.FreeCores ||
            slave.FreeMemory != heartbeat.FreeMemory ||
            slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
            slave.SlaveState != reportedState) { // only update slave when something changed, to avoid unnecessary updates
          slave.FreeCores = heartbeat.FreeCores;
          slave.FreeMemory = heartbeat.FreeMemory;
          slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
          slave.SlaveState = reportedState;
          dao.UpdateSlave(slave);
        }
      }
      return actions;
    }

    /// <summary>
    /// Updates the execution time of each job reported in the heartbeat, and checks that every
    /// reported job is actually assigned to the sending slave.
    /// </summary>
    /// <returns>
    /// AbortJob messages for jobs that do not exist, are assigned to another (or no) slave,
    /// or have been marked aborted; empty if the heartbeat carries no job progress.
    /// </returns>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      // process the job progresses (key: job id, value: execution time)
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        } else {
          if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
            // assigned slave does not match heartbeat
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
          } else {
            // save job execution time
            curJob.ExecutionTime = jobProgress.Value;

            if (curJob.JobState == JobState.Aborted) {
              // a request to abort the job has been set
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            }
            dao.UpdateJob(curJob);
          }
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if there are enough resources to send a job, i.e. fewer than
    /// <c>ApplicationConstants.MaxJobTransferCount</c> jobs are currently being transferred.
    /// Too many jobs should not be sent simultaneously.
    /// </summary>
    private bool IsAllowedToSendJobs() {
      return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.