1 | using System;
2 | using System.Collections.Generic;
3 | using System.Linq;
4 | using System.Text;
5 | using System.Transactions;
6 | using HeuristicLab.Services.Hive.Common;
7 | using HeuristicLab.Services.Hive.Common.DataTransfer;
8 | using HeuristicLab.Tracing;
9 | using HeuristicLab.Core;
10 |
11 | namespace HeuristicLab.Services.Hive {
/// <summary>
/// Keeps track of the most recent heartbeat of every slave and periodically decides whether
/// slaves have to be set offline and their jobs rescheduled.
/// </summary>
public class LifecycleManager : ILifecycleManager {
  private DataAccess.IHiveDao dao {
    get { return ServiceLocator.Instance.HiveDao; }
  }
  private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
    get { return ServiceLocator.Instance.TransactionManager; }
  }
  private IAuthorizationManager auth {
    get { return ServiceLocator.Instance.AuthorizationManager; }
  }

  // Guards every access to the heartbeats dictionary (timer callback and heartbeat processing).
  private static readonly object locker = new object();

  // Maps a slave id to the UTC time of the last heartbeat received from that slave.
  private Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

  // Windows-Forms timer is single threaded, so callbacks will be synchronous
  System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

  /// <summary>
  /// Contains the ids of jobs which have been sent to a slave, but for which the slave has not yet sent
  /// a jobProgress with a heartbeat, because it has not finished downloading/deserializing the job.
  /// The number value indicates a time-to-live count that is decremented with each server heartbeat.
  /// When the number reaches zero, the job is assumed to be lost and is set Offline again.
  /// </summary>
  private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

  /// <summary>
  /// Counts how many jobs are currently being transferred.
  /// </summary>
  private int jobsCurrentlyTransfering = 0;

  /// <summary>
  /// Gets or sets the number of jobs currently being transferred; a change of the value is logged.
  /// </summary>
  public int JobsCurrentlyTransferring {
    get { return jobsCurrentlyTransfering; }
    set {
      if (jobsCurrentlyTransfering != value) {
        jobsCurrentlyTransfering = value;
        Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
      }
    }
  }

  /// <summary>
  /// Started while the internal timer is enabled, Stopped otherwise.
  /// </summary>
  public ExecutionState ExecutionState {
    get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped; }
  }

  public LifecycleManager() {
    // Subscribe exactly once. Subscribing inside Start() registered the handler again on every
    // Stop()/Start() cycle, so each tick executed the timeout check multiple times.
    this.timer.Tick += new EventHandler(timer_Tick);
  }

  /// <summary>
  /// Starts the periodic timeout check (every 10 seconds) if it is not already running.
  /// </summary>
  public void Start() {
    if (ExecutionState == Core.ExecutionState.Stopped) {
      // Seed the heartbeat map before the timer runs: a tick on an empty map treats every
      // slave as timed out. If seeding throws, the timer is left stopped.
      AddAllSlavesToHeartbeats();
      this.timer.Interval = (int)TimeSpan.FromSeconds(10).TotalMilliseconds;
      this.timer.Start();
    }
  }

  /// <summary>
  /// Stops the periodic timeout check if it is running.
  /// </summary>
  public void Stop() {
    if (ExecutionState == Core.ExecutionState.Started) {
      timer.Stop();
    }
  }

  /// <summary>
  /// Adds all slaves to the heartbeats collection and gives them some time to say hello (HeartbeatTimeout).
  /// Otherwise all the slaves' jobs would be aborted immediately, which is not desirable if the server
  /// has just been restarted.
  /// </summary>
  private void AddAllSlavesToHeartbeats() {
    lock (locker) {
      using (trans.OpenTransaction()) {
        Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
        DateTime now = DateTime.UtcNow;
        foreach (Guid slaveId in slaveIds) {
          if (!heartbeats.ContainsKey(slaveId)) {
            heartbeats.Add(slaveId, now);
          }
        }
      }
    }
  }

  /// <summary>
  /// Checks for every slave whether it is still sending heartbeats.
  /// If not, the slave is set offline and its jobs are set back to waiting.
  /// </summary>
  void timer_Tick(object sender, EventArgs e) {
    lock (locker) {
      using (trans.OpenTransaction()) {
        Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
        foreach (Guid slaveId in slaveIds) {
          if (!SlaveTimedOut(slaveId)) {
            continue;
          }

          var slave = dao.GetSlave(slaveId);
          // GetSlave may return null (ProcessHeartbeat handles that case as well).
          if (slave != null && slave.SlaveState != SlaveState.Offline) {
            AbortJobs(slaveId);
            slave.SlaveState = SlaveState.Offline;
            dao.UpdateSlave(slave);
          }
          heartbeats.Remove(slaveId);
        }
      }
    }
  }

  /// <summary>
  /// Returns true if no heartbeat is recorded for the slave or the last one is older than
  /// ApplicationConstants.HeartbeatTimeout seconds. Must be called while holding the lock.
  /// </summary>
  private bool SlaveTimedOut(Guid slaveId) {
    DateTime lastHeartbeat;
    if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat)) {
      return true;
    }
    // UTC is used for all timestamps so that DST or local-time shifts cannot fake a timeout.
    return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
  }

  /// <summary>
  /// Sets all jobs that are assigned to the given slave back to the Waiting state.
  /// </summary>
  private void AbortJobs(Guid slaveId) {
    var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
    foreach (var j in jobs) {
      j.JobState = JobState.Waiting;
      dao.UpdateJob(j);
    }
  }

  /// <summary>
  /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
  /// </summary>
  /// <param name="heartbeat">the heartbeat sent by the slave; its JobProgress may be null</param>
  /// <returns>a list of actions the slave should do</returns>
  public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();
    Slave slave = dao.GetSlave(heartbeat.SlaveId);
    if (slave == null) {
      // unknown slave: ask it to register itself first
      actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
      return actions;
    }

    // The dictionary is also read and modified by the timer callback under the same lock;
    // Dictionary is not safe for concurrent writes, so the write has to be guarded too.
    lock (locker) {
      heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
    }
    actions.AddRange(UpdateJobs(heartbeat));

    if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
      var availableJobs = dao.GetWaitingJobs(slave);
      if (availableJobs.Any()) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
      }
    }

    // JobProgress may be null (UpdateJobs handles that too), so it must be checked before Count.
    SlaveState reportedState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
      ? SlaveState.Calculating
      : SlaveState.Idle;

    // only update slave when something changed, to avoid unnecessary updates
    if (slave.FreeCores != heartbeat.FreeCores ||
        slave.FreeMemory != heartbeat.FreeMemory ||
        slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
        slave.SlaveState != reportedState) {
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
      slave.SlaveState = reportedState;
      dao.UpdateSlave(slave);
    }
    return actions;
  }

  /// <summary>
  /// Updates the execution time of each job reported by the heartbeat.
  /// Checks whether all the jobs sent by the heartbeat are supposed to be calculated by this slave
  /// and returns an AbortJob message for every job that is unknown, assigned to a different slave
  /// or marked as aborted.
  /// </summary>
  private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();

    if (heartbeat.JobProgress == null) {
      return actions;
    }

    // process the jobProgresses
    foreach (var jobProgress in heartbeat.JobProgress) {
      Job curJob = dao.GetJob(jobProgress.Key);
      if (curJob == null) {
        // job does not exist in db
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        continue;
      }

      if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
        // assigned slave does not match heartbeat
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
        continue;
      }

      // save job execution time
      curJob.ExecutionTime = jobProgress.Value;

      if (curJob.JobState == JobState.Aborted) {
        // a request to abort the job has been set
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
      }
      dao.UpdateJob(curJob);
    }
    return actions;
  }

  /// <summary>
  /// Returns true if there are enough resources to send a job.
  /// There should not be too many jobs sent simultaneously.
  /// </summary>
  private bool IsAllowedToSendJobs() {
    return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
  }
}
213 | }