Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HiveStatistics/sources/HeuristicLab.Services.Hive/3.3/Manager/HeartbeatManager.cs @ 12773

Last change on this file since 12773 was 12773, checked in by dglaser, 9 years ago

#2388:

HeuristicLab.Services.WebApp-3.3:

  • The font is now loaded properly even when accessing the page over https
  • Added no-cache to the index page
  • Added no-cache to the views in addition to the existing (datetime) cache buster
    • Caching can be enabled when required

HeuristicLab.Services.Hive-3.3:

  • Improved performance of NewHeartbeatManager by adding a check for whether the collection is empty

HeuristicLab.Services.WebApp.Statistics-3.3:

  • Removed invalid link from the exception page

Projects:

  • Added the '-3.3' suffix to the assembly names
File size: 8.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Services.Hive.DataAccess;
27using HeuristicLab.Services.Hive.DataAccess.Interfaces;
28using Heartbeat = HeuristicLab.Services.Hive.DataTransfer.Heartbeat;
29
30namespace HeuristicLab.Services.Hive {
31  public class HeartbeatManager {
32    private const string MutexName = "HiveTaskSchedulingMutex";
33
34    private IOptimizedHiveDao dao {
35      get { return ServiceLocator.Instance.OptimizedHiveDao; }
36    }
37    private ITaskScheduler taskScheduler {
38      get { return ServiceLocator.Instance.TaskScheduler; }
39    }
40    private ITransactionManager trans {
41      get { return ServiceLocator.Instance.TransactionManager; }
42    }
43
44    /// <summary>
45    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
46    /// </summary>
47    /// <returns>a list of actions the slave should do</returns>
48    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
49
50      List<MessageContainer> actions = new List<MessageContainer>();
51      Slave slave = null;
52      trans.UseTransaction(() => {
53        slave = dao.GetSlaveById(heartbeat.SlaveId);
54      });
55      if (slave == null) {
56        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
57      } else {
58        if (heartbeat.HbInterval != slave.HbInterval) {
59          actions.Add(new MessageContainer(MessageContainer.MessageType.NewHBInterval));
60        }
61        if (dao.SlaveHasToShutdownComputer(slave.ResourceId)) {
62          actions.Add(new MessageContainer(MessageContainer.MessageType.ShutdownComputer));
63        }
64        // update slave data
65        slave.FreeCores = heartbeat.FreeCores;
66        slave.FreeMemory = heartbeat.FreeMemory;
67        slave.CpuUtilization = heartbeat.CpuUtilization;
68        slave.IsAllowedToCalculate = dao.SlaveIsAllowedToCalculate(slave.ResourceId);
69        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
70        slave.LastHeartbeat = DateTime.Now;
71
72        trans.UseTransaction(() => {
73          dao.UpdateSlave(slave);
74        });
75
76        // update task data
77        actions.AddRange(UpdateTasks(heartbeat, slave.IsAllowedToCalculate));
78
79        // assign new task
80        if (heartbeat.AssignJob && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
81          bool mutexAquired = false;
82          var mutex = new Mutex(false, MutexName);
83          try {
84
85            mutexAquired = mutex.WaitOne(Properties.Settings.Default.SchedulingPatience);
86            if (!mutexAquired)
87              LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling could not be aquired.");
88            else {
89              trans.UseTransaction(() => {
90                IEnumerable<TaskInfoForScheduler> availableTasks = null;
91                availableTasks = taskScheduler.Schedule(dao.GetWaitingTasks(slave).ToArray());
92                if (availableTasks.Any()) {
93                  var task = availableTasks.First();
94                  AssignTask(slave, task.TaskId);
95                  actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateTask, task.TaskId));
96                }
97              });
98            }
99          }
100          catch (AbandonedMutexException) {
101            LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager: The mutex used for scheduling has been abandoned.");
102          }
103          catch (Exception ex) {
104            LogFactory.GetLogger(this.GetType().Namespace).Log("HeartbeatManager threw an exception in ProcessHeartbeat: " + ex.ToString());
105          }
106          finally {
107            if (mutexAquired) mutex.ReleaseMutex();
108          }
109        }
110      }
111      return actions;
112    }
113
114    private void AssignTask(Slave slave, Guid taskId) {
115      var task = dao.UpdateTaskState(taskId, TaskState.Transferring, slave.ResourceId, null, null);
116
117      // from now on the task has some time to send the next heartbeat (ApplicationConstants.TransferringJobHeartbeatTimeout)
118      task.LastHeartbeat = DateTime.Now;
119      dao.UpdateTask(task);
120    }
121
122    /// <summary>
123    /// Update the progress of each task
124    /// Checks if all the task sent by heartbeat are supposed to be calculated by this slave
125    /// </summary>
126    private IEnumerable<MessageContainer> UpdateTasks(Heartbeat heartbeat, bool IsAllowedToCalculate) {
127      using (new PerformanceLogger("Old_UpdateTasks")) {
128        List<MessageContainer> actions = new List<MessageContainer>();
129
130        if (heartbeat.JobProgress == null)
131          return actions;
132
133        if (!IsAllowedToCalculate && heartbeat.JobProgress.Count != 0) {
134          actions.Add(new MessageContainer(MessageContainer.MessageType.PauseAll));
135        } else {
136          // process the jobProgresses
137          foreach (var jobProgress in heartbeat.JobProgress) {
138            Tuple<Task, Guid?> taskWithLastStateLogSlaveId = null;
139            trans.UseTransaction(() => {
140              taskWithLastStateLogSlaveId =
141                dao.GetTaskByIdAndLastStateLogSlaveId(jobProgress.Key);
142            });
143            var curTask = taskWithLastStateLogSlaveId != null ? taskWithLastStateLogSlaveId.Item1 : null;
144            if (curTask == null) {
145              // task does not exist in db
146              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, jobProgress.Key));
147              LogFactory.GetLogger(this.GetType().Namespace)
148                .Log("Task on slave " + heartbeat.SlaveId + " does not exist in DB: " + jobProgress.Key);
149            } else {
150              var slaveId = taskWithLastStateLogSlaveId.Item2;
151              if (slaveId == Guid.Empty || slaveId != heartbeat.SlaveId) {
152                // assigned slave does not match heartbeat
153                actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
154                LogFactory.GetLogger(this.GetType().Namespace)
155                  .Log("The slave " + heartbeat.SlaveId + " is not supposed to calculate task: " + curTask);
156              } else if (!dao.TaskIsAllowedToBeCalculatedBySlave(curTask.TaskId, heartbeat.SlaveId)) {
157                // assigned resources ids of task do not match with slaveId (and parent resourceGroupIds); this might happen when slave is moved to different group
158                actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
159              } else {
160                // save task execution time
161                curTask.ExecutionTimeMs = jobProgress.Value.TotalMilliseconds;
162                curTask.LastHeartbeat = DateTime.Now;
163
164                switch (curTask.Command) {
165                  case Command.Stop:
166                    actions.Add(new MessageContainer(MessageContainer.MessageType.StopTask, curTask.TaskId));
167                    break;
168                  case Command.Pause:
169                    actions.Add(new MessageContainer(MessageContainer.MessageType.PauseTask, curTask.TaskId));
170                    break;
171                  case Command.Abort:
172                    actions.Add(new MessageContainer(MessageContainer.MessageType.AbortTask, curTask.TaskId));
173                    break;
174                }
175                trans.UseTransaction(() => {
176                  dao.UpdateTask(curTask);
177                });
178              }
179            }
180          }
181        }
182        return actions;
183      }
184    }
185  }
186}
Note: See TracBrowser for help on using the repository browser.