Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HeartbeatManager.cs @ 5458

Last change on this file since 5458 was 5405, checked in by cneumuel, 14 years ago

#1233

  • moved heartbeat timestamps of slaves and jobs into database to make server stateless
  • made slave use the right authentication ("hiveslave" instead of HL username/password)
  • moved heartbeat related methods into HeartbeatManager
  • changed signature of Service.Hello method, so all hardware related information is transferred in that method withing the Slave-object
File size: 4.5 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Services.Hive.Common;
6using HeuristicLab.Services.Hive.Common.DataTransfer;
7using HeuristicLab.Tracing;
8
9namespace HeuristicLab.Services.Hive {
10  public class HeartbeatManager {
11    private DataAccess.IHiveDao dao {
12      get { return ServiceLocator.Instance.HiveDao; }
13    }
14    private HeuristicLab.Services.Hive.DataAccess.TransactionManager trans {
15      get { return ServiceLocator.Instance.TransactionManager; }
16    }
17    private IAuthorizationManager auth {
18      get { return ServiceLocator.Instance.AuthorizationManager; }
19    }
20
21    /// <summary>
22    /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
23    /// </summary>
24    /// <returns>a list of actions the slave should do</returns>
25    public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
26      List<MessageContainer> actions = new List<MessageContainer>();
27      Slave slave = dao.GetSlave(heartbeat.SlaveId);
28      if (slave == null) {
29        actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
30      } else {
31        // update slave data
32        slave.FreeCores = heartbeat.FreeCores;
33        slave.FreeMemory = heartbeat.FreeMemory;
34        slave.IsAllowedToCalculate = true; // Todo: look into calendar
35        slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
36        slave.LastHeartbeat = DateTime.Now;
37        dao.UpdateSlave(slave);
38
39        // update job data
40        actions.AddRange(UpdateJobs(heartbeat));
41
42        // assign new job
43        if (heartbeat.AssignJob && this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
44          var availableJobs = dao.GetWaitingJobs(slave, 1);
45          if (availableJobs.Count() > 0) {
46            var job = availableJobs.First();
47            actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
48            AssignJob(slave, job);
49          }
50        }
51      }
52      return actions;
53    }
54
55    private void AssignJob(Slave slave, Job job) {
56      job.SlaveId = slave.Id;
57      job.JobState = JobState.Calculating; // Todo: Maybe use State = Transferring (?)
58      job.DateCalculated = DateTime.Now; // Todo: use statelog instead
59      dao.UpdateJob(job);
60      dao.UpdateSlave(slave);
61    }
62
63    /// <summary>
64    /// Update the progress of each job
65    /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
66    /// </summary>
67    private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
68      List<MessageContainer> actions = new List<MessageContainer>();
69
70      if (heartbeat.JobProgress == null)
71        return actions;
72
73      // process the jobProgresses
74      foreach (var jobProgress in heartbeat.JobProgress) {
75        Job curJob = dao.GetJob(jobProgress.Key);
76        if (curJob == null) {
77          // job does not exist in db
78          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
79          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
80        } else {
81          if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
82            // assigned slave does not match heartbeat
83            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
84            Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
85          } else {
86            // save job execution time
87            curJob.ExecutionTime = jobProgress.Value;
88            curJob.LastHeartbeat = DateTime.Now;
89
90            if (curJob.JobState == JobState.Aborted) {
91              // a request to abort the job has been set
92              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
93            }
94            dao.UpdateJob(curJob);
95          }
96        }
97      }
98      return actions;
99    }
100
101    /// <summary>
102    /// Returns true if there are enough resources to send a job
103    /// There should not be too many jobs sent simultaniously
104    /// </summary>
105    private bool IsAllowedToSendJobs() {
106      return true; // JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
107      // Todo: see if unlimited job transfer count works. if not, look into db and count jobs in state Transferring
108    }
109  }
110}
Note: See TracBrowser for help on using the repository browser.