Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
12/13/10 14:13:15 (13 years ago)
Author:
cneumuel
Message:

#1260

  • moved all state-information into lifecycleManager
  • changed isolation level for transactions to ReadCommited
  • made currentlyFetching-status on slave more rubust
  • made LogServiceReader more rubust
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs

    r5000 r5093  
    3636using HeuristicLab.Hive.Tracing;
    3737using HeuristicLab.PluginInfrastructure.Manager;
     38using System.Web;
     39using System.Web.SessionState;
    3840
    3941namespace HeuristicLab.Hive.Server.Core {
     
    4244  /// </summary>
    4345  public class SlaveCommunicator : ISlaveCommunicator, IInternalSlaveCommunicator {
    44     private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
    45 
    46     /// <summary>
    47     /// Contains a list job JobIds which have been sent to a slave, but the slave has not yet sent
    48     /// a jobProgress of the job with a heartbeat, because he has not finished downloading/deserializing it.
    49     /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
    50     /// When the number reaches zero, the jobs is assumed to be lost and is set Offline again.
    51     /// </summary>
    52     private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
    53 
    54     /// <summary>
    55     /// When a slave reconnects and he has finished results waiting it calls IsJobStillNeeded. If the finished
    56     /// result has not yet been collected from anywhere else, the job will be sent by the slave and the job state is set to Pending.
    57     /// Now the job be in pending state until it is received from the reconnected slave or the TimeToLive value of this dictionary has reached zero.
    58     /// </summary>
    59     private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();
    6046    private static int PENDING_TIMEOUT = 100;
    6147
     
    7460    /// </summary>
    7561    public SlaveCommunicator() {
     62      Logger.Debug("ServiceCommunicator instantiated");
    7663      lifecycleManager = ServiceLocator.GetLifecycleManager();
    7764      jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
     
    9784            SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
    9885
    99             if (!lastHeartbeats.ContainsKey(slave.Id)) {
    100               Logger.Info("No previous hearbeats are available for " + slave.Id + " although it is in state " + slave.State);
     86            if (!lifecycleManager.LastHeartbeats.ContainsKey(slave.Id)) {
     87              Logger.Info("No previous hearbeats are available for " + slave.Name + "(" + slave.Id + "), although it is in state " + slave.State);
    10188
    10289              // add a heartbeat NOW and give the slave time to say something for HEARTBEAT_MAX_DIF
    10390              // otherwise alls the slaves jobs would be aborted, which is not desirable if the server has just been restarted
    10491              heartbeatLock.EnterWriteLock();
    105               lastHeartbeats.Add(slave.Id, DateTime.Now);
     92              lifecycleManager.LastHeartbeats.Add(slave.Id, DateTime.Now);
    10693              heartbeatLock.ExitWriteLock();
    10794            } else {
    108               DateTime lastHeartbeatOfSlave = lastHeartbeats[slave.Id];
     95              DateTime lastHeartbeatOfSlave = lifecycleManager.LastHeartbeats[slave.Id];
    10996
    11097              TimeSpan diff = DateTime.Now.Subtract(lastHeartbeatOfSlave);
     
    11299              if (diff.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
    113100                // if slave calculated jobs, the job must be reset
    114                 Logger.Info("Slave timed out and is on RESET");
     101                Logger.Info("Slave timed out and is on RESET (no message for " + diff.TotalSeconds + " seconds.)");
    115102                foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
    116103                  DaoLocator.JobDao.SetJobOffline(job);
    117                   lock (newAssignedJobs) {
    118                     if (newAssignedJobs.ContainsKey(job.Id))
    119                       newAssignedJobs.Remove(job.Id);
     104                  lock (lifecycleManager.NewAssignedJobs) {
     105                    if (lifecycleManager.NewAssignedJobs.ContainsKey(job.Id))
     106                      lifecycleManager.NewAssignedJobs.Remove(job.Id);
    120107                  }
    121108                }
     
    127114                Logger.Debug("removing it from the heartbeats list");
    128115                heartbeatLock.EnterWriteLock();
    129                 lastHeartbeats.Remove(slave.Id);
     116                lifecycleManager.LastHeartbeats.Remove(slave.Id);
    130117                heartbeatLock.ExitWriteLock();
    131118              }
     
    148135
    149136      foreach (JobDto currJob in pendingJobsInDb) {
    150         lock (pendingJobs) {
    151           if (pendingJobs.ContainsKey(currJob.Id)) {
    152             if (pendingJobs[currJob.Id] <= 0) {
     137        lock (lifecycleManager.PendingJobs) {
     138          if (lifecycleManager.PendingJobs.ContainsKey(currJob.Id)) {
     139            if (lifecycleManager.PendingJobs[currJob.Id] <= 0) {
    153140              currJob.State = JobState.Offline;
    154141              DaoLocator.JobDao.Update(currJob);
    155142            } else {
    156               pendingJobs[currJob.Id]--;
     143              lifecycleManager.PendingJobs[currJob.Id]--;
    157144            }
    158145          }
     
    173160
    174161      heartbeatLock.EnterWriteLock();
    175       if (lastHeartbeats.ContainsKey(slave.Id)) {
    176         lastHeartbeats[slave.Id] = DateTime.Now;
     162      if (lifecycleManager.LastHeartbeats.ContainsKey(slave.Id)) {
     163        lifecycleManager.LastHeartbeats[slave.Id] = DateTime.Now;
    177164      } else {
    178         lastHeartbeats.Add(slave.Id, DateTime.Now);
     165        lifecycleManager.LastHeartbeats.Add(slave.Id, DateTime.Now);
    179166      }
    180167      heartbeatLock.ExitWriteLock();
     
    247234        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
    248235          SlaveDto slave = UpdateSlaveData(heartbeatData);
     236          DaoLocator.SlaveDao.Update(slave);
     237
    249238          SaveTimestamp(heartbeatData);
    250239
    251240          //ProcessJobProgress(heartbeatData, response);
    252241          response.ActionRequest = ProcessJobProgress(heartbeatData);
    253 
     242         
    254243          //check if new Cal must be loaded
    255244          if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) {
     
    270259          }
    271260
    272           DaoLocator.SlaveDao.Update(slave);
    273261          scope.Complete();
    274262        }
     
    286274    }
    287275
    288     private static void SaveTimestamp(HeartBeatData heartbeatData) {
     276    private void SaveTimestamp(HeartBeatData heartbeatData) {
    289277      heartbeatLock.EnterWriteLock();
    290       if (lastHeartbeats.ContainsKey(heartbeatData.SlaveId)) {
    291         lastHeartbeats[heartbeatData.SlaveId] = DateTime.Now;
     278      if (lifecycleManager.LastHeartbeats.ContainsKey(heartbeatData.SlaveId)) {
     279        lifecycleManager.LastHeartbeats[heartbeatData.SlaveId] = DateTime.Now;
    292280      } else {
    293         lastHeartbeats.Add(heartbeatData.SlaveId, DateTime.Now);
     281        lifecycleManager.LastHeartbeats.Add(heartbeatData.SlaveId, DateTime.Now);
    294282      }
    295283      heartbeatLock.ExitWriteLock();
     
    316304
    317305      // find all the jobs in jobProgress which are not in the database -> they are not supposed to be calculated by this slave
    318       IEnumerable<Guid> jobsToAbort = GetJobsNotInDatabase(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys);
    319       foreach (Guid jobId in jobsToAbort) {
    320         actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
    321         heartbeatData.JobProgress.Remove(jobId);
    322       }
     306      //IEnumerable<Guid> jobsToAbort = GetJobsNotCalculatedByThisSlave(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys);
     307      //foreach (Guid jobId in jobsToAbort) {
     308      //  Logger.Error("Job shall not be caculated by this slave or does not exist in DB: " + jobId);
     309      //  actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
     310      //  heartbeatData.JobProgress.Remove(jobId);
     311      //}
    323312
    324313      // process all the remaining jobProgresses
     
    327316        if (curJob == null) {
    328317          // job does not exist in db
     318          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
    329319          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
    330           Logger.Error("Job does not exist in DB: " + jobProgress.Key);
    331320        } else {
    332321          curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
     322          Guid id = curJob.Slave != null ? curJob.Slave.Id : Guid.Empty;
    333323          if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) {
    334324            // assigned slave does not match heartbeat
     325            Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob.ToString());
    335326            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
    336             Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob);
    337327          } else {
    338328            // save job execution time
     
    341331            if (curJob.State == JobState.Aborted) {
    342332              // a request to abort the job has been set
     333              Logger.Error("Job is in state aborted, send AbortJob: " + curJob.Id);
    343334              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
    344335            } else if (curJob.State == JobState.SnapshotRequested) {
     
    355346      foreach (JobDto currJob in jobsOfSlave) {
    356347        if (heartbeatData.JobProgress.ContainsKey(currJob.Id)) {
    357           lock (newAssignedJobs) {
    358             if (newAssignedJobs.ContainsKey(currJob.Id)) {
     348          lock (lifecycleManager.NewAssignedJobs) {
     349            if (lifecycleManager.NewAssignedJobs.ContainsKey(currJob.Id)) {
    359350              Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
    360               newAssignedJobs.Remove(currJob.Id);
     351              lifecycleManager.NewAssignedJobs.Remove(currJob.Id);
    361352            }
    362353          }
    363354        } else {
    364           lock (newAssignedJobs) {
    365             if (newAssignedJobs.ContainsKey(currJob.Id)) {
    366               newAssignedJobs[currJob.Id]--;
    367               Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
    368               if (newAssignedJobs[currJob.Id] <= 0) {
     355          lock (lifecycleManager.NewAssignedJobs) {
     356            if (lifecycleManager.NewAssignedJobs.ContainsKey(currJob.Id)) {
     357              lifecycleManager.NewAssignedJobs[currJob.Id]--;
     358              Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + lifecycleManager.NewAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
     359              if (lifecycleManager.NewAssignedJobs[currJob.Id] <= 0) {
    369360                Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);
    370361
     
    374365                actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
    375366
    376                 newAssignedJobs.Remove(currJob.Id);
     367                lifecycleManager.NewAssignedJobs.Remove(currJob.Id);
    377368              }
    378369            } else {
     
    388379
    389380    /// <summary>
    390     /// Returns the jobIds of the jobs which are not assigned to this slave in the database
    391     /// </summary>
    392     private IEnumerable<Guid> GetJobsNotInDatabase(Guid slaveId, IEnumerable<Guid> jobIds) {
     381    /// Returns the jobIds of the jobs which are not assigned to this slave in the database (they either are not stored in DB or they are assigned
     382    /// </summary>
     383    private IEnumerable<Guid> GetJobsNotCalculatedByThisSlave(Guid slaveId, IEnumerable<Guid> jobIds) {
    393384      IEnumerable<Guid> activeJobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(slaveId)).Select(j => j.Id);
    394385      return jobIds.Except(activeJobsOfSlave).ToList();
     
    520511
    521512        Logger.Info("Job pulled: " + job2Calculate + " for user " + slaveId);
    522         lock (newAssignedJobs) {
    523           if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
    524             newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
     513        lock (lifecycleManager.NewAssignedJobs) {
     514          if (!lifecycleManager.NewAssignedJobs.ContainsKey(job2Calculate.Id))
     515            lifecycleManager.NewAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
    525516        }
    526517      } else {
     
    701692
    702693      heartbeatLock.EnterWriteLock();
    703       if (lastHeartbeats.ContainsKey(slaveId))
    704         lastHeartbeats.Remove(slaveId);
     694      if (lifecycleManager.LastHeartbeats.ContainsKey(slaveId))
     695        lifecycleManager.LastHeartbeats.Remove(slaveId);
    705696      heartbeatLock.ExitWriteLock();
    706697
     
    745736      }
    746737      job.State = JobState.Pending;
    747       lock (pendingJobs) {
    748         pendingJobs.Add(job.Id, PENDING_TIMEOUT);
     738      lock (lifecycleManager.PendingJobs) {
     739        lifecycleManager.PendingJobs.Add(job.Id, PENDING_TIMEOUT);
    749740      }
    750741
     
    765756        if (ipd != null) {
    766757          response.List.Add(ConvertPluginDescriptorToDto(ipd));
    767         } else {
    768           response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
    769           return response;
    770758        }
    771759      }
Note: See TracChangeset for help on using the changeset viewer.