Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
02/01/11 18:12:46 (13 years ago)
Author:
cneumuel
Message:

#1233

  • moved heartbeat timestamps of slaves and jobs into database to make server stateless
  • made slave use the right authentication ("hiveslave" instead of HL username/password)
  • moved heartbeat related methods into HeartbeatManager
  • changed signature of Service.Hello method, so all hardware related information is transferred in that method withing the Slave-object
Location:
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HeuristicLab.Services.Hive-3.4.csproj

    r5402 r5405  
    112112    <None Include="HeuristicLabServicesHivePlugin.cs.frame" />
    113113    <None Include="Properties\AssemblyInfo.cs.frame" />
     114    <Compile Include="HeartbeatManager.cs" />
    114115    <Compile Include="Interfaces\ILifecycleManager.cs" />
    115116    <Compile Include="Interfaces\IServiceLocator.cs" />
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/HiveService.cs

    r5404 r5405  
    2525    private ILifecycleManager lifecycleManager {
    2626      get { return ServiceLocator.Instance.LifecycleManager; }
     27    }
     28    private HeartbeatManager heartbeatManager {
     29      get { return ServiceLocator.Instance.HeartbeatManager; }
    2730    }
    2831
     
    162165
    163166    #region Login Methods
    164     public void Hello(Guid slaveId, string name, int cores, int memory) {
    165       using (trans.OpenTransaction()) {
    166         var slave = dao.GetSlave(slaveId);
     167    public void Hello(Slave slaveInfo) {
     168      using (trans.OpenTransaction()) {
     169        var slave = dao.GetSlave(slaveInfo.Id);
    167170
    168171        if (slave == null) {
    169           slave = new Slave { Id = slaveId, Name = name, Cores = cores, Memory = memory };
    170           slave.IsAllowedToCalculate = true; //a little bit to optimistic?
    171           slave.SlaveState = SlaveState.Idle;
    172           dao.AddSlave(slave);
     172          dao.AddSlave(slaveInfo);
    173173        } else {
    174           //TODO: error handling?
     174          dao.UpdateSlave(slaveInfo);
    175175        }
    176176      }
     
    191191    public List<MessageContainer> Heartbeat(Heartbeat heartbeat) {
    192192      using (trans.OpenTransaction()) {
    193         return lifecycleManager.ProcessHeartbeat(heartbeat);
     193        return heartbeatManager.ProcessHeartbeat(heartbeat);
    194194      }
    195195    }
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/Interfaces/ILifecycleManager.cs

    r5095 r5405  
    1414
    1515    void Stop();
    16 
    17     List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat);
    1816  }
    1917}
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/Interfaces/IServiceLocator.cs

    r5095 r5405  
    1212    ILifecycleManager LifecycleManager { get; }
    1313    TransactionManager TransactionManager { get; }
     14    HeartbeatManager HeartbeatManager { get; }
    1415  }
    1516}
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/LifecycleManager.cs

    r5404 r5405  
    2525
    2626    private static object locker = new object();
    27     private Dictionary<Guid, DateTime> heartbeats = new Dictionary<Guid, DateTime>();
    2827
    2928    // Windows-Forms timer is single threaded, so callbacks will be synchron
    30     System.Windows.Forms.Timer timer = new System.Windows.Forms.Timer();
     29    System.Windows.Forms.Timer timer;
    3130
    32     /// <summary>
    33     /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent
    34     /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it.
    35     /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
    36     /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again.
    37     /// </summary>
    38     private Dictionary<Guid, int> newlyAssignedJobs = new Dictionary<Guid, int>();
    39 
    40     /// <summary>
    41     /// Counts how many jobs are currently beeing transferred.
    42     /// </summary>
    43     private int jobsCurrentlyTransfering = 0;
    44     public int JobsCurrentlyTransferring {
    45       get { return jobsCurrentlyTransfering; }
    46       set {
    47         if (jobsCurrentlyTransfering != value) {
    48           jobsCurrentlyTransfering = value;
    49           Logger.Info("JobsCurrentlyTransfering: " + jobsCurrentlyTransfering);
    50         }
    51       }
     31    public ExecutionState ExecutionState {
     32      get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped; }
    5233    }
    5334
    54     public ExecutionState ExecutionState {
    55       get { return timer.Enabled ? ExecutionState.Started : Core.ExecutionState.Stopped;  }
     35    public LifecycleManager() {
     36      this.timer = new System.Windows.Forms.Timer();
     37      this.timer.Tick += new EventHandler(timer_Tick);
    5638    }
    57 
    58     public LifecycleManager() { }
    5939
    6040    public void Start() {
    6141      if (ExecutionState == Core.ExecutionState.Stopped) {
    6242        this.timer.Interval = (int)new TimeSpan(0, 0, 10).TotalMilliseconds;
    63         this.timer.Tick += new EventHandler(timer_Tick);
    6443        this.timer.Start();
    65         AddAllSlavesToHeartbeats();
    6644      }
    6745    }
     
    7452
    7553    /// <summary>
    76     // add all slaves to hearbeats-collection and give them some time to say hello (HEARTBEAT_TIMEOUT)
    77     // otherwise alls the slaves jobs would be aborted immediately, which is not desirable if the server has just been restarted
    78     /// </summary>
    79     private void AddAllSlavesToHeartbeats() {
    80       lock (locker) {
    81         using (trans.OpenTransaction()) {
    82           Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
    83           foreach (Guid slaveId in slaveIds) {
    84             if (!heartbeats.ContainsKey(slaveId)) {
    85               heartbeats.Add(slaveId, DateTime.Now);
    86             }
    87           }
    88         }
    89       }
    90     }
    91 
    92     /// <summary>
    9354    /// This method is supposed to check if slaves are online
    9455    /// if not -> set them offline and check if they where calculating a job
     
    9758      lock (locker) {
    9859        using (trans.OpenTransaction()) {
    99           Guid[] slaveIds = dao.GetSlaves(x => true).Select(s => s.Id).ToArray();
    100           foreach (Guid slaveId in slaveIds) {
    101             if (SlaveTimedOut(slaveId)) {
    102               var slave = dao.GetSlave(slaveId);
    103               if (slave.SlaveState != SlaveState.Offline) {
    104                 AbortJobs(slaveId);
    105                 slave.SlaveState = SlaveState.Offline;
    106                 dao.UpdateSlave(slave);
    107               }
    108               heartbeats.Remove(slaveId);
     60          var slaves = dao.GetSlaves(x => x.SlaveState != SlaveState.Offline);
     61          foreach (Slave slave in slaves) {
     62            if (!slave.LastHeartbeat.HasValue || (DateTime.Now - slave.LastHeartbeat.Value).TotalSeconds > ApplicationConstants.HeartbeatTimeout) {
     63              slave.SlaveState = SlaveState.Offline;
     64              AbortJobs(slave.Id);
     65              dao.UpdateSlave(slave);
    10966            }
    11067          }
    11168        }
    11269      }
    113     }
    114 
    115     private bool SlaveTimedOut(Guid slaveId) {
    116       if (!heartbeats.ContainsKey(slaveId))
    117         return true;
    118 
    119       if ((DateTime.Now - heartbeats[slaveId]).TotalSeconds > ApplicationConstants.HeartbeatTimeout) {
    120         return true;
    121       }
    122 
    123       return false;
    12470    }
    12571
     
    13177      }
    13278    }
    133 
    134     /// <summary>
    135     /// This method will be called every time a slave sends a heartbeat (-> very often; concurrency is important!)
    136     /// </summary>
    137     /// <returns>a list of actions the slave should do</returns>
    138     public List<MessageContainer> ProcessHeartbeat(Heartbeat heartbeat) {
    139       List<MessageContainer> actions = new List<MessageContainer>();
    140       Slave slave = dao.GetSlave(heartbeat.SlaveId);
    141       if (slave == null) {
    142         actions.Add(new MessageContainer(MessageContainer.MessageType.SayHello));
    143       } else {
    144         heartbeats[heartbeat.SlaveId] = DateTime.Now;
    145         actions.AddRange(UpdateJobs(heartbeat));
    146 
    147         if (this.IsAllowedToSendJobs() && slave.IsAllowedToCalculate && heartbeat.FreeCores > 0) {
    148           var availableJobs = dao.GetWaitingJobs(slave, 1);
    149           if (availableJobs.Count() > 0) {
    150             var job = availableJobs.First();
    151             actions.Add(new MessageContainer(MessageContainer.MessageType.CalculateJob, job.Id));
    152             AssignJob(slave, job);
    153           }
    154         }
    155 
    156         if (slave.FreeCores != heartbeat.FreeCores ||
    157             slave.FreeMemory != heartbeat.FreeMemory ||
    158             slave.IsAllowedToCalculate != heartbeat.IsAllowedToCalculate ||
    159             slave.SlaveState != (heartbeat.JobProgress.Count > 0 ? SlaveState.Calculating : SlaveState.Idle)) { // only update slave when something changed, to avoid unnecessary updates
    160           slave.FreeCores = heartbeat.FreeCores;
    161           slave.FreeMemory = heartbeat.FreeMemory;
    162           slave.IsAllowedToCalculate = heartbeat.IsAllowedToCalculate;
    163           slave.SlaveState = (heartbeat.JobProgress != null && heartbeat.JobProgress.Count > 0) ? SlaveState.Calculating : SlaveState.Idle;
    164           dao.UpdateSlave(slave);
    165         }
    166       }
    167       return actions;
    168     }
    169 
    170     private void AssignJob(Slave slave, Job job) {
    171       job.SlaveId = slave.Id;
    172       job.JobState = JobState.Calculating; // Todo: Maybe use State = Transferring (?)
    173       job.DateCalculated = DateTime.Now; // Todo: use statelog instead
    174       dao.UpdateJob(job);
    175       dao.UpdateSlave(slave);
    176     }
    177 
    178     /// <summary>
    179     /// Update the progress of each job
    180     /// Checks if all the jobs sent by heartbeat are supposed to be calculated by this slave
    181     /// </summary>
    182     private IEnumerable<MessageContainer> UpdateJobs(Heartbeat heartbeat) {
    183       List<MessageContainer> actions = new List<MessageContainer>();
    184 
    185       if (heartbeat.JobProgress == null)
    186         return actions;
    187 
    188       // process the jobProgresses
    189       foreach (var jobProgress in heartbeat.JobProgress) {
    190         Job curJob = dao.GetJob(jobProgress.Key);
    191         if (curJob == null) {
    192           // job does not exist in db
    193           actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
    194           Logger.Error("Job does not exist in DB: " + jobProgress.Key);
    195         } else {
    196           if (curJob.SlaveId == Guid.Empty || curJob.SlaveId != heartbeat.SlaveId) {
    197             // assigned slave does not match heartbeat
    198             actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
    199             Logger.Error("The slave " + heartbeat.SlaveId + " is not supposed to calculate Job: " + curJob);
    200           } else {
    201             // save job execution time
    202             curJob.ExecutionTime = jobProgress.Value;
    203 
    204             if (curJob.JobState == JobState.Aborted) {
    205               // a request to abort the job has been set
    206               actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
    207             }
    208             dao.UpdateJob(curJob);
    209           }
    210         }
    211       }
    212       return actions;
    213     }
    214 
    215     /// <summary>
    216     /// Returns true if there are enough resources to send a job
    217     /// There should not be too many jobs sent simultaniously
    218     /// </summary>
    219     private bool IsAllowedToSendJobs() {
    220       return JobsCurrentlyTransferring < ApplicationConstants.MaxJobTransferCount;
    221     }
    22279  }
    22380}
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive/3.4/ServiceLocator.cs

    r5095 r5405  
    4848      }
    4949    }
     50
     51    private HeartbeatManager heartbeatManager;
     52    public HeartbeatManager HeartbeatManager {
     53      get {
     54        if(heartbeatManager == null) heartbeatManager = new HeartbeatManager();
     55        return heartbeatManager;
     56      }
     57    }
    5058  }
    5159}
Note: See TracChangeset for help on using the changeset viewer.