1 | using System;
|
---|
2 | using System.Collections.Generic;
|
---|
3 | using System.Linq;
|
---|
4 | using System.Text;
|
---|
5 | using System.Transactions;
|
---|
6 | using HeuristicLab.Services.Hive.Common;
|
---|
7 | using HeuristicLab.Services.Hive.Common.DataTransfer;
|
---|
8 | using HeuristicLab.Tracing;
|
---|
9 |
|
---|
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// This class holds the state of all recent heartbeats and decides to reschedule jobs and set slaves offline.
  /// </summary>
  public class Hive {
    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// Serializes the timeout check (timer_Tick) and the initial seeding of the heartbeat table.
    /// It is held across database work, which is why ProcessHeartbeat does not acquire it.
    /// </summary>
    private static readonly object locker = new object();

    /// <summary>
    /// Guards every access to <see cref="heartbeats"/>. It is only held for the dictionary operation itself
    /// (never across database calls), so concurrent heartbeats cannot deadlock against the timeout check.
    /// Lock order, where both are taken: <see cref="locker"/> first, then this one.
    /// </summary>
    private readonly object heartbeatsLock = new object();

    /// <summary>
    /// Time (UTC) of the most recent heartbeat received from each slave.
    /// UTC is used so that daylight-saving clock changes cannot cause spurious time-outs.
    /// Access only while holding <see cref="heartbeatsLock"/>.
    /// </summary>
    private readonly Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();

    // The Windows-Forms timer raises Tick on a single thread, so its callbacks never overlap.
    // NOTE(review): System.Windows.Forms.Timer only fires while the creating thread pumps Windows messages;
    // confirm the hosting process runs a message loop, otherwise timer_Tick is never called.
    private readonly System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();

    /// <summary>
    /// Intended to hold the JobIds of jobs which have been sent to a slave, but for which the slave has not yet
    /// reported a jobProgress in a heartbeat, because it has not finished downloading/deserializing them.
    /// The number value is meant as a TimeToLive count that is decremented with each server-heartbeat;
    /// when it reaches zero, the job is assumed to be lost and is set Offline again.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this field is not read or written anywhere in this class, so none of the bookkeeping
    /// described above currently happens.
    /// </remarks>
    private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();

    /// <summary>
    /// Counts how many jobs are currently being transferred.
    /// </summary>
    private int jobsCurrentlyTransfering = 0;

    /// <summary>
    /// Number of jobs currently being transferred. Every change of the value is logged.
    /// While it is at or above <c>ApplicationConstants.MaxJobTransferCount</c>, no FetchJob message is sent.
    /// </summary>
    public int JobsCurrentlyTransferring {
      get { return jobsCurrentlyTransfering; }
      set {
        if (jobsCurrentlyTransfering != value) {
          jobsCurrentlyTransfering = value;
          Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
        }
      }
    }

    /// <summary>
    /// Seeds the heartbeat table with all known slaves and starts the periodic (10 second) timeout check.
    /// </summary>
    public Hive() {
      // Seed before starting the timer: if seeding throws, no running timer is left
      // attached to a half-constructed instance.
      AddAllSlavesToHeartbeats();
      timer.Interval = (int)TimeSpan.FromSeconds(10).TotalMilliseconds;
      timer.Tick += new EventHandler(timer_Tick);
      timer.Start();
    }

    /// <summary>
    /// Stops the periodic timeout check.
    /// </summary>
    public void Shutdown() {
      timer.Stop();
    }

    /// <summary>
    /// Adds all slaves to the heartbeats collection and gives them some time to say hello
    /// (<c>ApplicationConstants.HeartbeatTimeout</c>). Otherwise all the slaves' jobs would be aborted
    /// immediately, which is not desirable if the server has just been restarted.
    /// Slaves that already have an entry keep their recorded time.
    /// </summary>
    private void AddAllSlavesToHeartbeats() {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          DateTime now = DateTime.UtcNow;
          lock (heartbeatsLock) {
            foreach (Guid slaveId in slaveIds) {
              if (!heartbeats.ContainsKey(slaveId)) {
                heartbeats.Add(slaveId, now);
              }
            }
          }
        }
      }
    }

    /// <summary>
    /// Periodic check whether slaves are still online. For every slave whose heartbeat has timed out:
    /// if it is not already Offline, its jobs are put back into the Waiting state and the slave is set Offline.
    /// In either case it is then removed from the heartbeats collection.
    /// </summary>
    private void timer_Tick(object sender, EventArgs e) {
      lock (locker) {
        using (trans.OpenTransaction()) {
          Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
          foreach (Guid slaveId in slaveIds) {
            if (!SlaveTimedOut(slaveId))
              continue;

            var slave = dao.GetSlave(slaveId);
            // The slave may have been deleted since the list above was read.
            if (slave != null && slave.SlaveState != SlaveState.Offline) {
              AbortJobs(slaveId);
              slave.SlaveState = SlaveState.Offline;
              dao.UpdateSlave(slave);
            }
            lock (heartbeatsLock) {
              heartbeats.Remove(slaveId);
            }
          }
        }
      }
    }

    /// <summary>
    /// Returns true if no heartbeat is recorded for the slave, or if the last one is older than
    /// <c>ApplicationConstants.HeartbeatTimeout</c> seconds.
    /// </summary>
    private bool SlaveTimedOut(Guid slaveId) {
      DateTime lastHeartbeat;
      lock (heartbeatsLock) {
        if (!heartbeats.TryGetValue(slaveId, out lastHeartbeat))
          return true;
      }
      return (DateTime.UtcNow - lastHeartbeat).TotalSeconds > ApplicationConstants.HeartbeatTimeout;
    }

    /// <summary>
    /// Puts every job whose slave's ResourceId matches <paramref name="slaveId"/> back into the Waiting state.
    /// NOTE(review): the job's slave assignment itself is not cleared here.
    /// </summary>
    private void AbortJobs(Guid slaveId) {
      var jobs = dao.GetJobs(x => x.Slave.ResourceId == slaveId);
      foreach (var j in jobs) {
        j.JobState = JobState.Waiting;
        dao.UpdateJob(j);
      }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// Unknown slaves are asked to send their info. Known slaves get:
    /// - their heartbeat time recorded,
    /// - their job progress processed,
    /// - calendar/fetch messages as needed,
    /// - their stored state updated if it changed.
    /// </summary>
    /// <param name="heartbeat">The heartbeat sent by the slave. Its JobProgress may be null.</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.AddSlaveInfo));
        return actions;
      }

      // The heartbeats dictionary is shared with the timer callback and Dictionary<,> is not safe for
      // concurrent writers. Only the short-lived heartbeatsLock is taken here (see its documentation).
      lock (heartbeatsLock) {
        heartbeats[heartbeat.SlaveId] = DateTime.UtcNow;
      }
      actions.AddRange(UpdateJobs(heartbeat));

      // check if a new calendar must be loaded
      if (slave.CalendarSyncState == CalendarState.Fetch || slave.CalendarSyncState == CalendarState.ForceFetch) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.UpdateCalendar));
      }

      if (IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        if (dao.GetWaitingJobs(slave).Any()) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
        }
      }

      // JobProgress may be null (UpdateJobs guards against that too), so never dereference it unchecked.
      SlaveState reportedState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
        ? SlaveState.Calculating
        : SlaveState.Idle;

      // only update the slave when something changed, to avoid unnecessary updates
      if (slave.FreeCores != heartbeat.FreeCores ||
          slave.FreeMemory != heartbeat.FreeMemory ||
          slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
          slave.SlaveState != reportedState) {
        slave.FreeCores = heartbeat.FreeCores;
        slave.FreeMemory = heartbeat.FreeMemory;
        slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
        slave.SlaveState = reportedState;
        dao.UpdateSlave(slave);
      }
      return actions;
    }

    /// <summary>
    /// Updates the progress (execution time) of each job reported in the heartbeat.
    /// Also checks that every reported job is supposed to be calculated by this slave.
    /// An AbortJob message is produced when:
    /// - a job does not exist,
    /// - a job is assigned to another (or no) slave,
    /// - a job has been marked Aborted.
    /// </summary>
    /// <returns>The messages to send back to the slave. Empty when the heartbeat carries no JobProgress.</returns>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        } else if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
        } else {
          // save job execution time
          curJob.ExecutionTime = jobProgress.Value;

          if (curJob.JobState == JobState.Aborted) {
            // a request to abort the job has been set
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          }
          dao.UpdateJob(curJob);
        }
      }
      return actions;
    }

    /// <summary>
    /// Returns true if there are enough resources to send a job.
    /// There should not be too many jobs sent simultaneously.
    /// </summary>
    private bool IsAllowedToSendJobs() {
      return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
    }
  }
}
|
---|