1 | using System;
|
---|
2 | using System.Collections.Generic;
|
---|
3 | using System.Linq;
|
---|
4 | using System.Text;
|
---|
5 | using System.Transactions;
|
---|
6 | using HeuristicLab.Services.Hive.Common;
|
---|
7 | using HeuristicLab.Services.Hive.Common.DataTransfer;
|
---|
8 | using HeuristicLab.Tracing;
|
---|
9 | using HeuristicLab.Core;
|
---|
10 |
|
---|
11 | namespace HeuristicLab.Services.Hive {
|
---|
/// <summary>
/// Remembers when each slave sent its most recent heartbeat. A periodic timer sets slaves
/// whose heartbeat timed out offline and puts their jobs back into the waiting state;
/// incoming heartbeats are answered with a list of actions for the slave.
/// </summary>
public class LifecycleManager : ILifecycleManager {
  private DataAccess.IHiveDao dao {
    get { return ServiceLocator.Instance.HiveDao; }
  }
  private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
    get { return ServiceLocator.Instance.TransactionManager; }
  }
  private IAuthorizationManager auth {
    get { return ServiceLocator.Instance.AuthorizationManager; }
  }

  /// <summary>
  /// Interval at which the timer checks for timed-out slaves.
  /// </summary>
  private static readonly TimeSpan CleanupInterval = TimeSpan.FromSeconds(10);

  // Guards every access to the heartbeats dictionary (Dictionary is not safe for concurrent writes).
  private static readonly object locker = new object();

  // Maps a slave id to the UTC time of its last heartbeat (or of its registration in Start()).
  private readonly Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

  // Windows-Forms timer is single threaded, so callbacks will be synchronous.
  // NOTE(review): a Windows Forms timer only raises Tick on a thread that pumps messages -
  // confirm that the hosting process provides one.
  private readonly System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

  /// <summary>
  /// Contains a list of JobIds which have been sent to a slave, but the slave has not yet sent
  /// a jobProgress of the job with a heartbeat, because it has not finished downloading/deserializing it.
  /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
  /// When the number reaches zero, the job is assumed to be lost.
  /// Nothing in this class reads or writes the dictionary at the moment.
  /// </summary>
  private readonly Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

  /// <summary>
  /// Counts how many jobs are currently being transferred.
  /// </summary>
  private int jobsCurrentlyTransfering = 0;

  /// <summary>
  /// Number of jobs that are currently being transferred. Every change of the value is logged.
  /// </summary>
  public int JobsCurrentlyTransferring {
    get { return jobsCurrentlyTransfering; }
    set {
      if (jobsCurrentlyTransfering != value) {
        jobsCurrentlyTransfering = value;
        Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
      }
    }
  }

  /// <summary>
  /// Started while the cleanup timer is enabled, Stopped otherwise.
  /// </summary>
  public ExecutionState ExecutionState {
    get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped; }
  }

  /// <summary>
  /// Configures the cleanup timer. The Tick handler is attached exactly once here, so that
  /// repeated Start()/Stop() cycles cannot register it multiple times.
  /// </summary>
  public LifecycleManager() {
    timer.Interval = (int)CleanupInterval.TotalMilliseconds;
    timer.Tick += new EventHandler(timer_Tick);
  }

  /// <summary>
  /// Starts the periodic timeout check. Does nothing if it is already running.
  /// </summary>
  public void Start() {
    if (ExecutionState == Core.ExecutionState.Stopped) {
      // Register the slaves before the timer runs: a slave without an entry counts as timed out.
      // If the registration fails, the timer is not started at all.
      AddAllSlavesToHeartbeats();
      timer.Start();
    }
  }

  /// <summary>
  /// Stops the periodic timeout check. Does nothing if it is not running.
  /// </summary>
  public void Stop() {
    if (ExecutionState == Core.ExecutionState.Started) {
      timer.Stop();
    }
  }

  /// <summary>
  /// Adds all slaves to the heartbeats collection and gives them some time to say hello
  /// (ApplicationConstants.HeartbeatTimeout). Otherwise all jobs of the slaves would be reset
  /// immediately, which is not desirable if the server has just been restarted.
  /// Slaves that already have an entry keep their timestamp.
  /// </summary>
  private void AddAllSlavesToHeartbeats() {
    lock (locker) {
      using (trans.OpenTransaction()) {
        Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
        DateTime now = DateTime.UtcNow;
        foreach (Guid slaveId in slaveIds) {
          if (!heartbeats.ContainsKey(slaveId)) {
            heartbeats.Add(slaveId, now);
          }
        }
      }
    }
  }

  /// <summary>
  /// Timer callback: checks every slave known to the database. A slave whose heartbeat timed out
  /// is set offline (unless it already is), its jobs are set back to waiting and its
  /// heartbeat entry is removed.
  /// </summary>
  private void timer_Tick(object sender, EventArgs e) {
    lock (locker) {
      using (trans.OpenTransaction()) {
        Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
        foreach (Guid slaveId in slaveIds) {
          if (!SlaveTimedOut(slaveId)) {
            continue;
          }

          var slave = dao.GetSlave(slaveId);
          // GetSlave may return null (see ProcessHeartbeat), e.g. when the slave was deleted meanwhile
          if (slave != null && slave.SlaveState != SlaveState.Offline) {
            AbortJobs(slaveId);
            slave.SlaveState = SlaveState.Offline;
            dao.UpdateSlave(slave);
          }
          heartbeats.Remove(slaveId);
        }
      }
    }
  }

  /// <summary>
  /// Returns true if no heartbeat is recorded for the slave or if the last one is older than
  /// ApplicationConstants.HeartbeatTimeout seconds. Callers must hold the locker.
  /// </summary>
  private bool SlaveTimedOut(Guid slaveId) {
    DateTime lastHeartbeat;
    if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat)) {
      return true;
    }
    return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
  }

  /// <summary>
  /// Sets all jobs that are assigned to the given slave back to the Waiting state.
  /// </summary>
  private void AbortJobs(Guid slaveId) {
    var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
    foreach (var j in jobs) {
      j.JobState = JobState.Waiting;
      dao.UpdateJob(j);
    }
  }

  /// <summary>
  /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
  /// </summary>
  /// <param name="heartbeat">the heartbeat received from a slave</param>
  /// <returns>a list of actions the slave should do</returns>
  public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();
    Slave slave = dao.GetSlave(heartbeat.SlaveId);
    if (slave == null) {
      // unknown slave: ask it to say hello first
      actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
      return actions;
    }

    // the timer callback modifies the dictionary under the same lock
    lock (locker) {
      heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
    }
    actions.AddRange(UpdateJobs(heartbeat));

    if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
      var availableJobs = dao.GetWaitingJobs(slave);
      if (availableJobs.Any()) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
      }
    }

    // JobProgress may be null (UpdateJobs handles that case as well), so evaluate it null-safe once
    SlaveState reportedState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
      ? SlaveState.Calculating
      : SlaveState.Idle;

    // only update slave when something changed, to avoid unnecessary updates
    if (slave.FreeCores != heartbeat.FreeCores ||
        slave.FreeMemory != heartbeat.FreeMemory ||
        slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
        slave.SlaveState != reportedState) {
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
      slave.SlaveState = reportedState;
      dao.UpdateSlave(slave);
    }
    return actions;
  }

  /// <summary>
  /// Updates the execution time of each job reported by the heartbeat.
  /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave.
  /// </summary>
  /// <returns>
  /// an AbortJob message for every reported job that does not exist, is assigned to another slave
  /// (or to none), or is in the Aborted state
  /// </returns>
  private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();

    if (heartbeat.JobProgress == null) {
      return actions;
    }

    // process the jobProgresses
    foreach (var jobProgress in heartbeat.JobProgress) {
      Job curJob = dao.GetJob(jobProgress.Key);
      if (curJob == null) {
        // job does not exist in db
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        continue;
      }

      if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
        // assigned slave does not match heartbeat
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
        continue;
      }

      // save job execution time
      curJob.ExecutionTime = jobProgress.Value;

      if (curJob.JobState == JobState.Aborted) {
        // a request to abort the job has been set
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
      }
      dao.UpdateJob(curJob);
    }
    return actions;
  }

  /// <summary>
  /// Returns true if there are enough resources to send a job.
  /// There should not be too many jobs sent simultaneously, so the number of jobs in transfer
  /// must be below ApplicationConstants.MaxJobTransferCount.
  /// </summary>
  private bool IsAllowedToSendJobs() {
    return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
  }
}
|
---|
213 | }
|
---|