Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HivePerformance/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 9381

Last change on this file since 9381 was 9381, checked in by pfleck, 11 years ago

#2030
Activated delayed loading for binary data.
Added HiveOperationContext to store the HiveDataContext for the whole duration of a service operation.
Added HiveDao methods that query database objects instead of DTOs.
Changed HeartbeatManager to use only database objects from the new queries.

File size: 8.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataTransfer;
27using DA = HeuristicLab.Services.Hive.DataAccess;
28
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Processes slave heartbeats. For each heartbeat it:
  /// - updates the slave's database record;
  /// - reconciles the tasks the slave reports with the database;
  /// - assigns a waiting task if the slave asks for one.
  /// </summary>
  public class HeartbeatManager {
    // Name of the named mutex that serializes task scheduling between concurrent heartbeats.
    private const string MutexName = "HiveTaskSchedulingMutex";

    private IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    private ITaskScheduler taskScheduler {
      get { return ServiceLocator.Instance.TaskScheduler; }
    }
    private DA.ITransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">The heartbeat sent by the slave.</param>
    /// <returns>
    /// A list of actions the slave should do. If the slave is unknown, the list contains only SayHello.
    /// </returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      DA.Slave slave = null;
      trans.UseTransaction(() => {
        slave = dao.GetSlaveDA(heartbeat.SlaveId);

        if (slave == null) {
          actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        } else {
          if (heartbeat.HbInterval != slave.HbInterval) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
          }
          if (ShutdownSlaveComputer(slave.ResourceId)) {
            actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
          }

          // update slave data
          slave.FreeCores = heartbeat.FreeCores;
          slave.FreeMemory = heartbeat.FreeMemory;
          slave.CpuUtilization = heartbeat.CpuUtilization;
          slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.ResourceId);
          slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? DA.SlaveState.Calculating : DA.SlaveState.Idle;
          slave.LastHeartbeat = DateTime.Now;

          dao.UpdateSlaveDA(slave);
        }
      });

      // An unknown slave only gets SayHello. Without this early return, the task handling
      // below would dereference the null slave.
      if (slave == null) {
        return actions;
      }

      // update task data
      actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        bool mutexAcquired = false;
        // Dispose the mutex handle after use. ReleaseMutex (in finally) runs before Dispose.
        using (var mutex = new Mutex(false, MutexName)) {
          try {
            mutexAcquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
            if (!mutexAcquired) {
              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
            } else {
              trans.UseTransaction(() => {
                IEnumerable<TaskInfoForScheduler> availableTasks = taskScheduler.Schedule(dao.GetWaitingTasksDA(slave));
                if (availableTasks.Any()) {
                  var task = availableTasks.First();
                  AssignJob(slave, task.TaskId);
                  actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
                }
              });
            }
          }
          catch (AbandonedMutexException) {
            // When WaitOne throws this, the calling thread nevertheless owns the mutex.
            // Mark it as acquired so the finally block releases it; otherwise every later
            // scheduling attempt would block until this thread exits.
            mutexAcquired = true;
            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
          }
          catch (Exception ex) {
            // Best effort: a scheduling failure must not fail the whole heartbeat.
            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
          }
          finally {
            if (mutexAcquired) mutex.ReleaseMutex();
          }
        }
      }
      return actions;
    }

    /// <summary>
    /// Marks the task as Transferring to the given slave and refreshes the task's LastHeartbeat.
    /// </summary>
    /// <param name="slave">The slave the task is assigned to.</param>
    /// <param name="taskId">The id of the task to assign.</param>
    private void AssignJob(DA.Slave slave, Guid taskId) {
      var task = dao.UpdateTaskStateDA(taskId, DA.TaskState.Transferring, slave.ResourceId, null, null);

      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      task.LastHeartbeat = DateTime.Now;
      dao.UpdateTaskDA(task);
    }

    /// <summary>
    /// Updates the progress of each task reported in the heartbeat. Also checks that every
    /// reported task is supposed to be calculated by this slave.
    /// </summary>
    /// <param name="heartbeat">The heartbeat containing the task progress (JobProgress).</param>
    /// <param name="isAllowedToCalculate">Whether the slave may currently calculate at all.</param>
    /// <returns>
    /// The resulting actions: PauseAll, or per-task Abort/Pause/Stop messages.
    /// </returns>
    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool isAllowedToCalculate) {
      var actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      if (!isAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
        return actions;
      }

      trans.UseTransaction(() => {
        // process the jobProgresses
        foreach (var jobProgress in heartbeat.JobProgress) {
          var curTask = dao.GetTaskDA(jobProgress.Key);
          if (curTask == null) {
            // task does not exist in db
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
            continue;
          }

          // A task without any state log has never been assigned. Treat it like one
          // assigned to another slave instead of dereferencing null.
          var currentStateLog = curTask.StateLogs.LastOrDefault();
          if (currentStateLog == null || currentStateLog.SlaveId == Guid.Empty || currentStateLog.SlaveId != heartbeat.SlaveId) {
            // assigned slave does not match heartbeat
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId);
          } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
            // The task's assigned resource ids do not match the slaveId or its parent resource group ids.
            // This can happen when the slave is moved to a different group.
            actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
          } else {
            // save task execution time
            curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
            curTask.LastHeartbeat = DateTime.Now;

            switch (curTask.Command) {
              case DA.Command.Stop:
                actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
                break;
              case DA.Command.Pause:
                actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
                break;
              case DA.Command.Abort:
                actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
                break;
            }
            dao.UpdateTaskDA(curTask);
          }
        }
      });
      return actions;
    }

    /// <summary>
    /// Returns true if at least one of the task's assigned resources is the slave or one of its parent resources.
    /// </summary>
    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, DA.Task curTask) {
      // Materialize once. Otherwise the lookup is re-enumerated for every assigned resource
      // (NOTE(review): possibly re-querying the DB if GetParentResourcesDA is deferred).
      var slaveResourceIds = dao.GetParentResourcesDA(slaveId).Select(x => x.ResourceId).ToList();
      return curTask.AssignedResources.Any(ar => slaveResourceIds.Contains(ar.Resource.ResourceId));
    }

    /// <summary>
    /// Returns true if neither the slave nor any of its parent resources is in an Offline downtime right now.
    /// </summary>
    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
      var parentResources = dao.GetParentResourcesDA(slaveId);
      var now = DateTime.Now;
      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
      return parentResources.All(r => !r.Downtimes.Any(d => d.DowntimeType == DA.DowntimeType.Offline && now >= d.StartDate && now <= d.EndDate));
    }

    /// <summary>
    /// Returns true if the slave or any of its parent resources is in a Shutdown downtime right now.
    /// </summary>
    private bool ShutdownSlaveComputer(Guid slaveId) {
      var parentResources = dao.GetParentResourcesDA(slaveId);
      var now = DateTime.Now;
      return parentResources.Any(r => r.Downtimes.Any(d => d.DowntimeType == DA.DowntimeType.Shutdown && now >= d.StartDate && now <= d.EndDate));
    }
  }
}
Note: See TracBrowser for help on using the repository browser.