Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HeartbeatManager.cs @ 5597

Last change on this file since 5597 was 5511, checked in by cneumuel, 14 years ago

#1233

  • added StateLog to log state transitions of hive jobs
  • added permissions to hive experiments (in data access layer, no UI for that yet)
  • extended unit tests
File size: 4.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using HeuristicLab.Services.Hive.Common;
5using HeuristicLab.Services.Hive.Common.DataTransfer;
6using HeuristicLab.Tracing;
7
namespace HeuristicLab.Services.Hive {
  /// <summary>
  /// Processes heartbeats sent by slaves: stores the reported slave and job data
  /// and determines the actions (say hello, calculate job, abort job) the slave should perform next.
  /// </summary>
  public class HeartbeatManager {
    private DataAccess.IHiveDao dao {
      get { return ServiceLocator.Instance.HiveDao; }
    }
    // NOTE(review): trans and auth are not used by any method in this class yet.
    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
      get { return ServiceLocator.Instance.TransactionManager; }
    }
    private IAuthorizationManager auth {
      get { return ServiceLocator.Instance.AuthorizationManager; }
    }

    /// <summary>
    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    /// </summary>
    /// <param name="heartbeat">the heartbeat received from a slave</param>
    /// <returns>a list of actions the slave should do</returns>
    /// <exception cref="ArgumentNullException">if <paramref name="heartbeat"/> is null</exception>
    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
      if (heartbeat == null) throw new ArgumentNullException("heartbeat");

      List<MessageContainer> actions = new List<MessageContainer>();
      Slave slave = dao.GetSlave(heartbeat.SlaveId);
      if (slave == null) {
        // the slave is not known in the db -> ask it to say hello first
        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
        return actions;
      }

      // update slave data
      slave.FreeCores = heartbeat.FreeCores;
      slave.FreeMemory = heartbeat.FreeMemory;
      slave.IsAllowedToCalculate = true; // Todo: look into calendar
      slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
      slave.LastHeartbeat = DateTime.Now;
      dao.UpdateSlave(slave);

      // update job data
      actions.AddRange(UpdateJobs(heartbeat));

      // assign new job
      if (heartbeat.AssignJob && IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
        // FirstOrDefault enumerates the DAO result only once (instead of Count() followed by First()).
        // NOTE(review): selecting and assigning the job is not wrapped in a transaction here, so two
        // concurrent heartbeats could presumably receive the same waiting job - TODO confirm.
        Job job = dao.GetWaitingJobs(slave, 1).FirstOrDefault();
        if (job != null) {
          AssignJob(slave, job);
          actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
        }
      }
      return actions;
    }

    /// <summary>
    /// Marks <paramref name="job"/> as being transferred to <paramref name="slave"/> and persists the job.
    /// </summary>
    private void AssignJob(Slave slave, Job job) {
      job.SetState(JobState.Transferring, slave.Id, "");
      dao.UpdateJob(job);
      // The slave is not modified here; ProcessHeartbeat has already persisted it,
      // so a second UpdateSlave round trip is unnecessary.
    }

    /// <summary>
    /// Update the progress of each job.
    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave.
    /// </summary>
    /// <param name="heartbeat">the heartbeat whose JobProgress entries (job id -> execution time) are processed</param>
    /// <returns>abort messages for jobs the slave should not (or no longer) calculate</returns>
    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
      List<MessageContainer> actions = new List<MessageContainer>();

      if (heartbeat.JobProgress == null)
        return actions;

      // process the jobProgresses
      foreach (var jobProgress in heartbeat.JobProgress) {
        Job curJob = dao.GetJob(jobProgress.Key);
        if (curJob == null) {
          // job does not exist in db
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
          continue;
        }

        // a job without a state log cannot be assigned to this slave either
        var stateLog = curJob.CurrentStateLog;
        if (stateLog == null || stateLog.SlaveId == Guid.Empty || stateLog.SlaveId != heartbeat.SlaveId) {
          // assigned slave does not match heartbeat
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
          continue;
        }

        // save job execution time
        curJob.ExecutionTime = jobProgress.Value;
        curJob.LastHeartbeat = DateTime.Now;

        if (curJob.State == JobState.Aborted) {
          // a request to abort the job has been set
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        } else if (curJob.State == JobState.Transferring) {
          // the job has arrived at the slave and is now being calculated.
          // Other states (e.g. a job finished in the meantime) must not be reverted to Calculating.
          curJob.SetState(JobState.Calculating, heartbeat.SlaveId, "");
        }
        dao.UpdateJob(curJob);
      }
      return actions;
    }

    /// <summary>
    /// Returns true if there are enough resources to send a job.
    /// There should not be too many jobs sent simultaneously.
    /// </summary>
    private bool IsAllowedToSendJobs() {
      // Todo: the number of jobs in transfer is currently unlimited. If that turns out not to work,
      // count the jobs in state Transferring in the db and compare against ApplicationConstants.MaxJobTransferCount.
      return true;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.