Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 9598

Last change on this file since 9598 was 9434, checked in by pfleck, 12 years ago

#2030
Added SelfHost-Project
Renamed HiveDtoDao back to HiveDao and renamed the optimized HiveDao into OptimizedDao instead.
Optimized AddTask by using compiled queries.

File size: 8.1 KB
Line 
1#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataAccess;
27using Heartbeat = HeuristicLab.Services.Hive.DataTransfer.Heartbeat;
28
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Processes the heartbeats sent by slaves: stores the reported slave and task
  /// data, and answers with the list of messages the slave has to act upon.
  /// </summary>
  public class HeartbeatManager {
    // Name of the named mutex guarding the task scheduling section, so that only
    // one heartbeat at a time picks and assigns a waiting task.
    private const string MutexName = "HiveTaskSchedulingMutex";

    // The collaborators are resolved through the ServiceLocator on every access
    // instead of being cached in fields.
    private IOptimizedHiveDao dao {
      get { return ServiceLocator.Instance.OptimizedHiveDao; }
    }
    private ITaskScheduler taskScheduler {
      get { return ServiceLocator.Instance.TaskScheduler; }
    }
    private DataAccess.ITransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from the slave</param>
    /// <returns>a list of actions the slave should do</returns>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      Slave slave = null;
      trans.UseTransaction(() => {
        slave = dao.GetSlaveById(heartbeat.SlaveId);
      });

      if (slave == null) {
        // unknown slave: ask it to say hello; nothing else can be processed for it
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      if (heartbeat.HbInterval != slave.HbInterval) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
      }
      if (dao.SlaveHasToShutdownComputer(slave.ResourceId)) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      slave.IsAllowedToCalculate = dao.SlaveIsAllowedToCalculate(slave.ResourceId);
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;

      trans.UseTransaction(() => {
        dao.UpdateSlave(slave);
      });

      // update task data
      actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        ScheduleTaskForSlave(slave, actions);
      }
      return actions;
    }

    /// <summary>
    /// Picks the next waiting task for the slave (if there is one), assigns it and
    /// appends the corresponding CalculateTask message to <paramref name="actions"/>.
    /// The whole section is guarded by the named scheduling mutex. Failures are
    /// logged and not propagated; in that case no task is assigned.
    /// </summary>
    private void ScheduleTaskForSlave(Slave slave, List<MessageContainer> actions) {
      // The mutex wraps an OS handle; dispose it deterministically instead of
      // leaking one handle per heartbeat until finalization.
      using (var mutex = new Mutex(false, MutexName)) {
        bool mutexAquired = false;
        try {
          mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
          if (!mutexAquired) {
            Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
            return;
          }

          trans.UseTransaction(() => {
            IEnumerable<TaskInfoForScheduler> availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave).ToArray());
            if (availableTasks.Any()) {
              var task = availableTasks.First();
              AssignJob(slave, task.TaskId);
              actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
            }
          });
        }
        catch (AbandonedMutexException) {
          // WaitOne reports an abandoned mutex by throwing, but the calling thread
          // has nevertheless become its owner. Remember that, so the finally block
          // releases it; otherwise this thread would keep the mutex locked.
          // No task is scheduled for this heartbeat.
          mutexAquired = true;
          Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
        }
        catch (Exception ex) {
          // deliberately best-effort: a scheduling failure must not break the heartbeat
          Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
        }
        finally {
          if (mutexAquired) mutex.ReleaseMutex();
        }
      }
    }

    /// <summary>
    /// Sets the task to state Transferring for the given slave and refreshes the
    /// task's last heartbeat timestamp.
    /// </summary>
    private void AssignJob(Slave slave, Guid taskId) {
      var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.ResourceId, null, null);

      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
      task.LastHeartbeat = DateTime.Now;
      dao.UpdateTask(task);
    }

    /// <summary>
    /// Update the progress of each task
    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
    /// </summary>
    /// <param name="heartbeat">the heartbeat whose JobProgress entries are processed</param>
    /// <param name="IsAllowedToCalculate">whether the sending slave may calculate at the moment</param>
    /// <returns>the messages (pause/stop/abort) resulting from the reported tasks</returns>
    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool IsAllowedToCalculate) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
        // the slave must not calculate but still reports tasks: pause everything
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
        return actions;
      }

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null;
        trans.UseTransaction(() => {
          taskWithLastStateLogSlaveId = dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key);
        });

        var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null;
        if (curTask == null) {
          // task does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
          Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
          continue;
        }

        var slaveId = taskWithLastStateLogSlaveId.Item2;
        if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
          Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
        } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) {
          // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
        } else {
          // save task execution time
          curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
          curTask.LastHeartbeat = DateTime.Now;

          // forward a pending command of the task to the slave
          switch (curTask.Command) {
            case Command.Stop:
              actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
              break;
            case Command.Pause:
              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
              break;
            case Command.Abort:
              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
              break;
          }
          trans.UseTransaction(() => {
            dao.UpdateTask(curTask);
          });
        }
      }
      return actions;
    }

    /// <summary>
    /// Writes a message to the logger registered under this type's namespace.
    /// </summary>
    private void Log(string message) {
      LogFactory.GetLogger(this.GetType().Namespace).Log(message);
    }
  }
}
Note: See TracBrowser for help on using the repository browser.