Free cookie consent management tool by TermsFeed Policy Generator

source: branches/OaaS/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 15982

Last change on this file since 15982 was 9363, checked in by spimming, 12 years ago

#1888:

  • Merged revisions from trunk
File size: 8.9 KB
RevLine 
[6983]1#region License Information
2/* HeuristicLab
[7259]3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
[6983]4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
[9363]25using System.Threading;
[6983]26using HeuristicLab.Services.Hive.DataTransfer;
27using DA = HeuristicLab.Services.Hive.DataAccess;
28
29namespace HeuristicLab.Services.Hive {
30  public class HeartbeatManager {
[9363]31    private const string MutexName = "HiveTaskSchedulingMutex";
32
[6983]33    private IHiveDao dao {
34      get { return ServiceLocator.Instance.HiveDao; }
35    }
[9363]36    private ITaskScheduler taskScheduler {
37      get { return ServiceLocator.Instance.TaskScheduler; }
[6983]38    }
[9363]39    private DataAccess.ITransactionManager trans {
40      get { return ServiceLocator.Instance.TransactionManager; }
41    }
[6983]42
43    /// <summary>
44    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
45    /// </summary>
46    /// <returns>a list of actions the slave should do</returns>
47    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
48      List<MessageContainer> actions = new List<MessageContainer>();
[9363]49      Slave slave = null;
50      slave = trans.UseTransaction(() => { return dao.GetSlave(heartbeat.SlaveId); });
51
[6983]52      if (slave == null) {
53        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
[7723]54      } else {
[7187]55        if (heartbeat.HbInterval != slave.HbInterval) {
56          actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
[6983]57        }
[9363]58        if (ShutdownSlaveComputer(slave.Id)) {
59          actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
60        }
[7723]61
[6983]62        // update slave data
63        slave.FreeCores = heartbeat.FreeCores;
64        slave.FreeMemory = heartbeat.FreeMemory;
65        slave.CpuUtilization = heartbeat.CpuUtilization;
66        slave.IsAllowedToCalculate = SlaveIsAllowedToCalculate(slave.Id);
67        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
68        slave.LastHeartbeat = DateTime.Now;
69
[9363]70        trans.UseTransaction(() => { dao.UpdateSlave(slave); });
71
[6983]72        // update task data
73        actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));
74
75        // assign new task
76        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
[9363]77          bool mutexAquired = false;
78          var mutex = new Mutex(false, MutexName);
79          try {
80            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
81            if (!mutexAquired)
82              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
83            else {
84              IEnumerable<TaskInfoForScheduler> availableTasks = null;
85              availableTasks = trans.UseTransaction(() => { return taskScheduler.Schedule(dao.GetWaitingTasks(slave)); });
86              if (availableTasks.Any()) {
87                var task = availableTasks.First();
88                AssignJob(slave, task.TaskId);
89                actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
90              }
91            }
[6983]92          }
[9363]93          catch (AbandonedMutexException) {
94            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
95          }
96          catch (Exception ex) {
97            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
98          }
99          finally {
100            if (mutexAquired) mutex.ReleaseMutex();
101          }
[6983]102        }
103      }
104      return actions;
105    }
106
[9363]107    private void AssignJob(Slave slave, Guid taskId) {
108      trans.UseTransaction(() => {
109        var task = dao.UpdateTaskState(taskId, DataAccess.TaskState.Transferring, slave.Id, null, null);
[6983]110
[9363]111        // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
112        task.LastHeartbeat = DateTime.Now;
113        dao.UpdateTask(task);
114      });
[6983]115    }
116
117    /// <summary>
118    /// Update the progress of each task
119    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
120    /// </summary>
121    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool IsAllowedToCalculate) {
122      List<MessageContainer> actions = new List<MessageContainer>();
123
124      if (heartbeat.JobProgress == null)
125        return actions;
126
127      if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
128        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
129      } else {
130        // process the jobProgresses
131        foreach (var jobProgress in heartbeat.JobProgress) {
[9363]132          Task curTask = null;
133          curTask = trans.UseTransaction(() => { return dao.GetTask(jobProgress.Key); });
[7723]134          if (curTask == null) {
[6983]135            // task does not exist in db
136            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
[7723]137            DA.LogFactory.GetLogger(this.GetType().Namespace).Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
[6983]138          } else {
[7723]139            if (curTask.CurrentStateLog.SlaveId == Guid.Empty || curTask.CurrentStateLog.SlaveId != heartbeat.SlaveId) {
[6983]140              // assigned slave does not match heartbeat
[7723]141              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
142              DA.LogFactory.GetLogger(this.GetType().Namespace).Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
143            } else if (!TaskIsAllowedToBeCalculatedBySlave(heartbeat.SlaveId, curTask)) {
[6983]144              // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
[7723]145              actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
[6983]146            } else {
147              // save task execution time
[7723]148              curTask.ExecutionTime = jobProgress.Value;
149              curTask.LastHeartbeat = DateTime.Now;
[6983]150
[7723]151              switch (curTask.Command) {
[6983]152                case Command.Stop:
[7723]153                  actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.Id));
[6983]154                  break;
155                case Command.Pause:
[7723]156                  actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.Id));
[6983]157                  break;
158                case Command.Abort:
[7723]159                  actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.Id));
[6983]160                  break;
161              }
[9363]162              trans.UseTransaction(() => { dao.UpdateTask(curTask); });
[6983]163            }
164          }
165        }
166      }
167      return actions;
168    }
169
170    private bool TaskIsAllowedToBeCalculatedBySlave(Guid slaveId, Task curTask) {
[9363]171      return trans.UseTransaction(() => {
172        var assignedResourceIds = dao.GetAssignedResources(curTask.Id).Select(x => x.Id);
173        var slaveResourceIds = dao.GetParentResources(slaveId).Select(x => x.Id);
174        return assignedResourceIds.Any(x => slaveResourceIds.Contains(x));
175      });
[6983]176    }
177
178    private bool SlaveIsAllowedToCalculate(Guid slaveId) {
179      // the slave may only calculate if there is no downtime right now. this needs to be checked for every parent resource also
[9363]180      return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).All(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Offline && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() == 0); });
[6983]181    }
[9363]182
183    private bool ShutdownSlaveComputer(Guid slaveId) {
184      return trans.UseTransaction(() => { return dao.GetParentResources(slaveId).Any(r => dao.GetDowntimes(x => x.ResourceId == r.Id && x.DowntimeType == DA.DowntimeType.Shutdown && (DateTime.Now >= x.StartDate) && (DateTime.Now <= x.EndDate)).Count() != 0); });
185    }
[6983]186  }
187}
Note: See TracBrowser for help on using the repository browser.