Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/NewHeartbeatManager.cs @ 12691

Last change on this file since 12691 was 12691, checked in by dglaser, 9 years ago

#2388:

HeuristicLab.Services.Access:
HeuristicLab.Services.Access.DataAccess:

  • Changed connection strings and certificates for local usage

HeuristicLab.Services.Hive.DataAccess:

  • Added compiled queries for frequently used queries
  • Integrated string queries from OptimizedHiveDao

HeuristicLab.Services.Hive:

  • Added NewHeartbeatManager.cs
  • Added NewRoundRobinTaskScheduler.cs
  • Added PerformanceLogger
  • Updated AuthorizationManager
  • Updated NewHiveService
    • Added Regions
    • Implemented missing methods
    • Improved performance of several queries

HeuristicLab.Services.WebApp.Status:

  • Fixed a bug which caused an error when calculating the average waiting time.
File size: 9.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataAccess;
27using HeuristicLab.Services.Hive.DataAccess.Interfaces;
28using HeuristicLab.Services.Hive.DataTransfer;
29using DA = HeuristicLab.Services.Hive.DataAccess;
30
31namespace HeuristicLab.Services.Hive.Manager {
public class NewHeartbeatManager {
  private const string MutexName = "HiveTaskSchedulingMutex";

  /// <summary>
  /// Persistence manager obtained from the service locator on every access.
  /// NOTE(review): ProcessHeartbeat disposes the returned instance, so this presumably
  /// yields a fresh instance per access — confirm against ServiceLocator.
  /// </summary>
  private IPersistenceManager PersistenceManager {
    get { return ServiceLocator.Instance.PersistenceManager; }
  }

  /// <summary>
  /// Scheduler used to pick the next task for a slave that asks for work.
  /// </summary>
  private ITaskScheduler TaskScheduler {
    get { return ServiceLocator.Instance.NewTaskScheduler; }
  }

  /// <summary>
  /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
  /// It returns SayHello for unknown slaves. For known slaves it:
  /// - refreshes the slave record (free cores/memory, CPU utilization, state, last heartbeat, calculation permission),
  /// - reconciles the tasks reported by the slave,
  /// - assigns one waiting task if the slave requested work, is allowed to calculate and has free cores.
  /// </summary>
  /// <param name="heartbeat">the heartbeat reported by the slave</param>
  /// <returns>a list of actions the slave should do</returns>
  public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    List<MessageContainer> actions = new List<MessageContainer>();
    using (var pm = PersistenceManager) {
      var slaveDao = pm.SlaveDao;

      var slave = pm.UseTransaction(() => slaveDao.GetById(heartbeat.SlaveId));
      if (slave == null) {
        // the slave is unknown to the server: ask it to introduce itself first
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      if (heartbeat.HbInterval != slave.HbInterval) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
      }
      if (slaveDao.SlaveHasToShutdownComputer(slave.ResourceId)) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.CpuUtilization = heartbeat.CpuUtilization;
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0)
        ? DA.SlaveState.Calculating
        : DA.SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      pm.UseTransaction(() => {
        slave.IsAllowedToCalculate = slaveDao.SlaveIsAllowedToCalculate(slave.ResourceId);
        pm.SubmitChanges();
      });

      // update task data
      actions.AddRange(UpdateTasks(pm, heartbeat, slave.IsAllowedToCalculate));

      // assign new task
      if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        ScheduleTask(pm, slave, actions);
      }
    }
    return actions;
  }

  /// <summary>
  /// Picks one waiting task for <paramref name="slave"/> and assigns it to that slave.
  /// On success it appends a CalculateTask message to <paramref name="actions"/>.
  /// Scheduling is serialized through the named mutex <see cref="MutexName"/>.
  /// If the mutex cannot be acquired within SchedulingPatience, nothing is assigned.
  /// Exceptions are logged and swallowed so that a scheduling failure does not fail the heartbeat.
  /// </summary>
  private void ScheduleTask(IPersistenceManager pm, DA.Slave slave, List<MessageContainer> actions) {
    var taskDao = pm.TaskDao;
    // A mutex handle is opened for every scheduling attempt, so it must be disposed afterwards.
    using (var mutex = new Mutex(false, MutexName)) {
      bool mutexAcquired = false;
      try {
        mutexAcquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
        if (mutexAcquired) {
          var waitingTasks = pm.UseTransaction(() => taskDao.GetWaitingTasks(slave)
            .Select(x => new TaskInfoForScheduler {
              TaskId = x.TaskId,
              JobId = x.JobId,
              Priority = x.Priority
            })
            .ToList()
          );
          var availableTasks = TaskScheduler.Schedule(waitingTasks);
          if (availableTasks.Any()) {
            var task = availableTasks.First();
            if (AssignTask(pm, slave, task.TaskId)) {
              actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
            }
          }
        } else {
          LogMessage("HeartbeatManager: The mutex used for scheduling could not be aquired.");
        }
      }
      catch (AbandonedMutexException) {
        // WaitOne throws this when the previous owner exited without releasing the mutex.
        // Ownership has nevertheless been granted to this thread, so it must be released below.
        mutexAcquired = true;
        LogMessage("HeartbeatManager: The mutex used for scheduling has been abandoned.");
      }
      catch (Exception ex) {
        LogMessage(string.Format("HeartbeatManager threw an exception in ProcessHeartbeat: {0}", ex));
      }
      finally {
        if (mutexAcquired) mutex.ReleaseMutex();
      }
    }
  }

  /// <summary>
  /// Marks the task <paramref name="taskId"/> as Transferring to <paramref name="slave"/>.
  /// In one transaction it writes a state log entry and updates the task's state and last heartbeat.
  /// </summary>
  /// <returns>true if the task was assigned; false if it no longer exists in the DB.</returns>
  private bool AssignTask(IPersistenceManager pm, DA.Slave slave, Guid taskId) {
    const DA.TaskState transferring = DA.TaskState.Transferring;
    DateTime now = DateTime.Now;
    var taskDao = pm.TaskDao;
    var stateLogDao = pm.StateLogDao;
    bool assigned = pm.UseTransaction(() => {
      var task = taskDao.GetById(taskId);
      if (task == null) {
        // the task vanished between scheduling and assignment; do not write a dangling state log
        return false;
      }
      stateLogDao.Save(new DA.StateLog {
        State = transferring,
        DateTime = now,
        TaskId = taskId,
        SlaveId = slave.ResourceId,
        UserId = null,
        Exception = null
      });
      task.State = transferring;
      task.LastHeartbeat = now;
      pm.SubmitChanges();
      return true;
    });
    if (!assigned) {
      LogMessage("HeartbeatManager: Task " + taskId + " could not be assigned to slave " + slave.ResourceId + " because it does not exist in DB.");
    }
    return assigned;
  }

  /// <summary>
  /// Update the progress of each task
  /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
  /// </summary>
  /// <returns>
  /// The resulting actions:
  /// - PauseAll if the slave is not allowed to calculate but reports tasks;
  /// - otherwise Abort/Pause/Stop messages for individual tasks as required.
  /// </returns>
  private IEnumerable<MessageContainer> UpdateTasks(IPersistenceManager pm, Heartbeat heartbeat, bool isAllowedToCalculate) {
    var taskDao = pm.TaskDao;
    var assignedResourceDao = pm.AssignedResourceDao;
    var actions = new List<MessageContainer>();
    if (heartbeat.JobProgress == null)
      return actions;

    if (!isAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
      actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
      return actions;
    }

    // Select all tasks and their latest state log with one query.
    // Index the results by task id so each reported task is found without a linear scan.
    var taskIds = heartbeat.JobProgress.Select(x => x.Key).ToList();
    var taskInfos =
      (from task in taskDao.GetAll()
       where taskIds.Contains(task.TaskId)
       let lastStateLog = task.StateLogs.OrderByDescending(x => x.DateTime).FirstOrDefault()
       select new {
         TaskId = task.TaskId,
         Command = task.Command,
         SlaveId = lastStateLog != null ? lastStateLog.SlaveId : default(Guid)
       })
      .ToLookup(x => x.TaskId);

    // process the jobProgresses
    foreach (var progress in heartbeat.JobProgress) {
      var curTask = taskInfos[progress.Key].SingleOrDefault();
      if (curTask == null) {
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, progress.Key));
        LogMessage("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + progress.Key);
        continue;
      }

      var slaveId = curTask.SlaveId;
      if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) {
        // assigned slave does not match heartbeat
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
        LogMessage("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask.TaskId);
      } else if (!assignedResourceDao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) {
        // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
        actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
      } else {
        // update task execution time
        var executionTime = progress.Value.TotalMilliseconds;
        pm.UseTransaction(() => {
          taskDao.UpdateExecutionTime(curTask.TaskId, executionTime);
        });
        switch (curTask.Command) {
          case DA.Command.Stop:
            actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
            break;
          case DA.Command.Pause:
            actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
            break;
          case DA.Command.Abort:
            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
            break;
        }
      }
    }
    return actions;
  }

  /// <summary>
  /// Writes <paramref name="message"/> to the logger named after this type's namespace.
  /// </summary>
  private void LogMessage(string message) {
    LogFactory.GetLogger(this.GetType().Namespace).Log(message);
  }
}
206}
Note: See TracBrowser for help on using the repository browser.