Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
09/17/10 10:26:55 (14 years ago)
Author:
cneumuel
Message:
  • Refactored HL.Hive.Experiment. JobItems are now called HiveJobs and OptimizerJobs do not contain a hierarchy anymore.
  • Dynamic generation of jobs on a slave are not reflected on the client user interface.
  • Optimizer-Trees are now strictly synchronized with the HiveJob-Trees (also the ComputeInParallel property is taken into account when the Child HiveJobs are created)
  • Improved the way a class can report progress and lock the UI (IProgressReporter, IProgress, Progress, ProgressView)
  • Changes were made to the config-files, so that server and clients work with blade12.hpc.fh-hagenberg.at
  • Lots of small changes and bugfixes
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs

    r4368 r4423  
    4545  /// The SlaveCommunicator manages the whole communication with the slave
    4646  /// </summary>
    47   public class SlaveCommunicator : ISlaveCommunicator,
    48     IInternalSlaveCommunicator {
     47  public class SlaveCommunicator : ISlaveCommunicator, IInternalSlaveCommunicator {
    4948    private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
     49
     50    /// <summary>
     51    /// Contains a list of JobIds which have been sent to a slave, but for which the slave has not yet sent
     52    /// a jobProgress with a heartbeat, because it has not finished downloading/deserializing the job.
     53    /// The number value indicates a TimeToLive count that is decremented with each server-heartbeat.
     54    /// When the number reaches zero, the job is assumed to be lost and is set Offline again.
     55    /// </summary>
    5056    private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
     57
     58    /// <summary>
     59    /// When a slave reconnects and has finished results waiting, it calls IsJobStillNeeded. If the finished
     60    /// result has not yet been collected from anywhere else, the job will be sent by the slave and the job state is set to Pending.
     61    /// The job then stays in the Pending state until it is received from the reconnected slave or the TimeToLive value in this dictionary has reached zero.
     62    /// </summary>
    5163    private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();
     64    private static int PENDING_TIMEOUT = 100;
    5265
    5366    private static ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    5467
    55     //private ISessionFactory factory;
    5668    private ILifecycleManager lifecycleManager;
    5769    private IInternalJobManager jobManager;
    5870    private IScheduler scheduler;
    5971
    60     private static int PENDING_TIMEOUT = 100;
     72    private static object slaveLocker = new object();
    6173
    6274    /// <summary>
     
    6678    /// </summary>
    6779    public SlaveCommunicator() {
    68       //factory = ServiceLocator.GetSessionFactory();
    69 
    7080      lifecycleManager = ServiceLocator.GetLifecycleManager();
    7181      jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
    7282      scheduler = ServiceLocator.GetScheduler();
    7383
    74       lifecycleManager.RegisterHeartbeat(new EventHandler(lifecycleManager_OnServerHeartbeat));
     84      lifecycleManager.ServerHeartbeat += new EventHandler(lifecycleManager_OnServerHeartbeat);
    7585    }
    7686
     
    8292    /// <param name="e"></param>
    8393    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
    84       Logger.Debug("Server Heartbeat ticked");
    85 
    86       // [chn] why is transaction management done here
    87       using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
    88         List<SlaveDto> allSlaves = new List<SlaveDto>(DaoLocator.SlaveDao.FindAll());
    89 
    90         foreach (SlaveDto slave in allSlaves) {
    91           if (slave.State != SlaveState.Offline && slave.State != SlaveState.NullState) {
     94      // this block can conflict with the heartbeats from a slave (which also updates the slave-records)
     95      lock (slaveLocker) {
     96        Logger.Debug("Server Heartbeat ticked");
     97        List<Guid> slaveIds = DaoLocator.SlaveDao.FindAll().Select(s => s.Id).ToList();
     98
     99        foreach (Guid slaveId in slaveIds) {
     100          using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
    92101            heartbeatLock.EnterUpgradeableReadLock();
     102            SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
    93103
    94104            if (!lastHeartbeats.ContainsKey(slave.Id)) {
    95               Logger.Info("Slave " + slave.Id +
    96                               " wasn't offline but hasn't sent heartbeats - setting offline");
    97               slave.State = SlaveState.Offline;
    98               DaoLocator.SlaveDao.Update(slave);
    99               Logger.Info("Slave " + slave.Id +
    100                               " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
    101               foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
    102                 //maybe implementa n additional Watchdog? Till then, just set them offline..
    103                 DaoLocator.JobDao.SetJobOffline(job);
    104               }
     105              Logger.Info("No previous hearbeats are available for " + slave.Id + " although it is in state " + slave.State);
     106
     107              // add a heartbeat NOW and give the slave time to say something for HEARTBEAT_MAX_DIF
     108              // otherwise all of the slave's jobs would be aborted, which is not desirable if the server has just been restarted
     109              heartbeatLock.EnterWriteLock();
     110              lastHeartbeats.Add(slave.Id, DateTime.Now);
     111              heartbeatLock.ExitWriteLock();
    105112            } else {
    106               DateTime lastHbOfSlave = lastHeartbeats[slave.Id];
    107 
    108               TimeSpan dif = DateTime.Now.Subtract(lastHbOfSlave);
     113              DateTime lastHeartbeatOfSlave = lastHeartbeats[slave.Id];
     114
     115              TimeSpan diff = DateTime.Now.Subtract(lastHeartbeatOfSlave);
    109116              // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
    110               if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
     117              if (diff.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
    111118                // if slave calculated jobs, the job must be reset
    112119                Logger.Info("Slave timed out and is on RESET");
     
    119126                }
    120127                Logger.Debug("setting slave offline");
    121                 // slave must be set offline
    122128                slave.State = SlaveState.Offline;
    123129
    124                 //slaveAdapter.Update(slave);
    125130                DaoLocator.SlaveDao.Update(slave);
    126131
     
    131136              }
    132137            }
    133 
    134138            heartbeatLock.ExitUpgradeableReadLock();
    135           } else {
    136             //TODO: RLY neccesary?
    137             //HiveLogger.Info(this.ToString() + ": Slave " + slave.Id + " has wrong state: Shouldn't have offline or nullstate, has " + slave.State);
    138             heartbeatLock.EnterWriteLock();
    139             //HiveLogger.Info(this.ToString() + ": Slave " + slave.Id + " has wrong state: Resetting all his jobs");
    140             if (lastHeartbeats.ContainsKey(slave.Id))
    141               lastHeartbeats.Remove(slave.Id);
    142             foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
    143               DaoLocator.JobDao.SetJobOffline(job);
    144             }
    145             heartbeatLock.ExitWriteLock();
    146           }
    147         }
    148         CheckForPendingJobs();
    149         //        DaoLocator.DestroyContext();
    150         scope.Complete();
    151       }
     139            scope.Complete();
     140          } // using
     141        } // foreach
     142
     143        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
     144          CheckForPendingJobs();
     145          scope.Complete();
     146        }
     147        Logger.Debug("Server Heartbeat ended");
     148      } // lock
    152149    }
    153150
    154151    private void CheckForPendingJobs() {
    155       IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));
    156 
    157       foreach (JobDto currJob in pendingJobsInDB) {
     152      IList<JobDto> pendingJobsInDb = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));
     153
     154      foreach (JobDto currJob in pendingJobsInDb) {
    158155        lock (pendingJobs) {
    159156          if (pendingJobs.ContainsKey(currJob.Id)) {
     
    192189      slave.CalendarSyncStatus = dbSlave != null ? dbSlave.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
    193190      slave.State = SlaveState.Idle;
    194 
    195       if (dbSlave == null)
     191      slave.Login = DateTime.Now;
     192
     193      if (dbSlave == null) {
    196194        DaoLocator.SlaveDao.Insert(slave);
    197       else {
     195      } else {
    198196        DaoLocator.SlaveDao.Update(slave);
    199197      }
     
    207205      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
    208206      if (slave == null) {
    209         //response.Success = false;
    210         response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
     207        response.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist;
    211208        return response;
    212209      }
     
    217214      if (appointments.Count() == 0) {
    218215        response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
    219         //response.Success = false;
    220216      } else {
    221         //response.Success = true;
    222217        response.Appointments = appointments;
    223218      }
     
    232227      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
    233228      if (slave == null) {
    234         //response.Success = false;
    235         response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
     229        response.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist;
    236230        return response;
    237231      }
     
    251245    /// <returns></returns>
    252246    public ResponseHeartBeat ProcessHeartBeat(HeartBeatData heartbeatData) {
    253       Logger.Debug("BEGIN Processing Heartbeat for Slave " + heartbeatData.SlaveId);
    254 
    255       ResponseHeartBeat response = new ResponseHeartBeat();
    256       response.ActionRequest = new List<MessageContainer>();
    257 
    258       Logger.Debug("BEGIN Started Slave Fetching");
    259       SlaveDto slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
    260       Logger.Debug("END Finished Slave Fetching");
    261 
    262       slave.NrOfFreeCores = heartbeatData.FreeCores;
    263       slave.FreeMemory = heartbeatData.FreeMemory;
    264       slave.IsAllowedToCalculate = heartbeatData.IsAllowedToCalculate;
    265 
    266       // check if the slave is logged in
    267       if (slave.State == SlaveState.Offline || slave.State == SlaveState.NullState) {
    268         response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
    269         response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
    270         Logger.Error("ProcessHeartBeat: Slave state null or offline: " + slave);
    271         return response;
    272       }
    273 
    274       // save timestamp of this heartbeat
    275       Logger.Debug("BEGIN Locking for Heartbeats");
     247      lock (slaveLocker) {
     248        Logger.Debug("BEGIN Processing Heartbeat for Slave " + heartbeatData.SlaveId);
     249        ResponseHeartBeat response = new ResponseHeartBeat();
     250        response.ActionRequest = new List<MessageContainer>();
     251
     252        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
     253          SlaveDto slave = UpdateSlaveData(heartbeatData);
     254          SaveTimestamp(heartbeatData);
     255
     256          //ProcessJobProgress(heartbeatData, response);
     257          response.ActionRequest = ProcessJobProgress(heartbeatData);
     258
     259          //check if new Cal must be loaded
     260          if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) {
     261            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
     262            Logger.Info("fetch or forcefetch sent");
     263          }
     264
     265          // check if slave has a free core for a new job
     266          // if true, ask scheduler for a new job for this slave
     267          Logger.Debug(" BEGIN Looking for Slave Jobs");
     268          if (this.IsAllowedToSendJobs() &&
     269              slave.IsAllowedToCalculate &&
     270              heartbeatData.FreeCores > 0 &&
     271              scheduler.ExistsJobForSlave(heartbeatData)) {
     272            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
     273          } else {
     274            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
     275          }
     276
     277          DaoLocator.SlaveDao.Update(slave);
     278          scope.Complete();
     279        }
     280        Logger.Debug(" END Processed Heartbeat for Slave " + heartbeatData.SlaveId);
     281        return response;
     282      }
     283    }
     284
     285    /// <summary>
     286    /// Returns true if there are enough resources to send a job
     287    /// There should not be too many jobs sent simultaneously
     288    /// </summary>
     289    private bool IsAllowedToSendJobs() {
     290      return lifecycleManager.JobsCurrentlyTransferring < ApplicationConstants.MAX_JOB_TRANSFER_COUNT;
     291    }
     292
     293    private static void SaveTimestamp(HeartBeatData heartbeatData) {
    276294      heartbeatLock.EnterWriteLock();
    277       Logger.Debug("END Locked for Heartbeats");
    278295      if (lastHeartbeats.ContainsKey(heartbeatData.SlaveId)) {
    279296        lastHeartbeats[heartbeatData.SlaveId] = DateTime.Now;
     
    282299      }
    283300      heartbeatLock.ExitWriteLock();
    284 
    285       Logger.Debug("BEGIN Processing Heartbeat Jobs");
    286       ProcessJobProcess(heartbeatData, response);
    287       Logger.Debug("END Processed Heartbeat Jobs");
    288 
    289       //check if new Cal must be loaded
    290       if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) {
    291         response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
    292         //slave.CalendarSyncStatus = CalendarState.Fetching;
    293         Logger.Info("fetch or forcefetch sent");
    294       }
    295 
    296       // check if slave has a free core for a new job
    297       // if true, ask scheduler for a new job for this slave
    298       Logger.Debug(" BEGIN Looking for Slave Jobs");
    299       if (slave.IsAllowedToCalculate && heartbeatData.FreeCores > 0 && scheduler.ExistsJobForSlave(heartbeatData)) {
    300         response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
    301       } else {
    302         response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
    303       }
    304       Logger.Debug(" END Looked for Slave Jobs");
    305 
    306       DaoLocator.SlaveDao.Update(slave);
    307 
    308       //tx.Commit();
    309       Logger.Debug(" END Processed Heartbeat for Slave " + heartbeatData.SlaveId);
    310       return response;
     301    }
     302
     303    private SlaveDto UpdateSlaveData(HeartBeatData heartbeatData) {
     304      SlaveDto slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
     305      if (slave == null) {
     306        slave = new SlaveDto() { Id = heartbeatData.SlaveId };
     307        Login(slave);
     308        slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
     309      }
     310
     311      slave.NrOfFreeCores = heartbeatData.FreeCores;
     312      slave.FreeMemory = heartbeatData.FreeMemory;
     313      slave.IsAllowedToCalculate = heartbeatData.IsAllowedToCalculate;
     314
     315      slave.State = heartbeatData.JobProgress.Count > 0 ? SlaveState.Calculating : SlaveState.Idle;
     316      return slave;
     317    }
     318
     319    private List<MessageContainer> ProcessJobProgress(HeartBeatData heartbeatData) {
     320      List<MessageContainer> actions = new List<MessageContainer>();
     321
     322      // find all the jobs in jobProgress which are not in the database -> they are not supposed to be calculated by this slave
     323      IEnumerable<Guid> jobsToAbort = GetJobsNotInDatabase(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys);
     324      foreach (Guid jobId in jobsToAbort) {
     325        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
     326        heartbeatData.JobProgress.Remove(jobId);
     327      }
     328
     329      // process all the remaining jobProgresses
     330      foreach (var jobProgress in heartbeatData.JobProgress) {
     331        JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
     332        if (curJob == null) {
     333          // job does not exist in db
     334          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
     335          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
     336        } else {
     337          curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
     338          if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) {
     339            // assigned slave does not match heartbeat
     340            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
     341            Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob);
     342          } else {
     343            // save job execution time
     344            curJob.ExecutionTime = jobProgress.Value;
     345
     346            if (curJob.State == JobState.Aborted) {
     347              // a request to abort the job has been set
     348              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
     349            } else if (curJob.State == JobState.SnapshotRequested) {
     350              // a request for a snapshot has been set
     351              actions.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
     352              curJob.State = JobState.SnapshotSent;
     353            }
     354          }
     355        }
     356        DaoLocator.JobDao.Update(curJob);
     357      }
     358
     359      var jobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId));
     360      foreach (JobDto currJob in jobsOfSlave) {
     361        if (heartbeatData.JobProgress.ContainsKey(currJob.Id)) {
     362          lock (newAssignedJobs) {
     363            if (newAssignedJobs.ContainsKey(currJob.Id)) {
     364              Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
     365              newAssignedJobs.Remove(currJob.Id);
     366            }
     367          }
     368        } else {
     369          lock (newAssignedJobs) {
     370            if (newAssignedJobs.ContainsKey(currJob.Id)) {
     371              newAssignedJobs[currJob.Id]--;
     372              Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
     373              if (newAssignedJobs[currJob.Id] <= 0) {
     374                Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);
     375
     376                currJob.State = JobState.Offline;
     377                DaoLocator.JobDao.Update(currJob);
     378
     379                actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
     380
     381                newAssignedJobs.Remove(currJob.Id);
     382              }
     383            } else {
     384              Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
     385              currJob.State = JobState.Offline;
     386              DaoLocator.JobDao.Update(currJob);
     387            }
     388          } // lock
     389        }
     390      }
     391      return actions;
     392    }
     393
     394    /// <summary>
     395    /// Returns the jobIds of the jobs which are not assigned to this slave in the database
     396    /// </summary>
     397    private IEnumerable<Guid> GetJobsNotInDatabase(Guid slaveId, IEnumerable<Guid> jobIds) {
     398      IEnumerable<Guid> activeJobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(slaveId)).Select(j => j.Id);
     399      return jobIds.Except(activeJobsOfSlave).ToList();
    311400    }
    312401
     
    333422    /// (2) state Abort results in Finished. This should be: AbortRequested, Aborted.
    334423    /// </summary>
    335     /// <param name="hbData"></param>
    336     /// <param name="jobAdapter"></param>
    337     /// <param name="slaveAdapter"></param>
     424    /// <param name="heartbeatData"></param>
    338425    /// <param name="response"></param>
    339     private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) {
    340       Logger.Debug("Started for Slave " + hbData.SlaveId);
    341       List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(hbData.SlaveId)));
    342       if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
    343         if (jobsOfSlave == null || jobsOfSlave.Count == 0) {
    344           foreach (Guid jobId in hbData.JobProgress.Keys) {
    345             response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
    346           }
    347 
    348           Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
    349           return;
    350         }
    351 
    352         foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
    353           JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
    354           if (curJob == null) {
    355             response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
    356             response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
    357             Logger.Error("Job does not exist in DB: " + jobProgress.Key);
    358             return;
    359           }
    360           curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
    361           if (curJob.Slave == null || curJob.Slave.Id != hbData.SlaveId) {
    362             response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
    363             Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob);
    364           } else if (curJob.State == JobState.Aborted) {
    365             // a request to abort the job has been set
    366             response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
    367             curJob.State = JobState.Finished;
    368           } else {
    369             // save job progress
    370             curJob.Percentage = jobProgress.Value;
    371 
    372             if (curJob.State == JobState.SnapshotRequested) {
    373               // a request for a snapshot has been set
    374               response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
    375               curJob.State = JobState.SnapshotSent;
    376             }
    377           }
    378           DaoLocator.JobDao.Update(curJob);
    379         }
    380       }
    381       foreach (JobDto currJob in jobsOfSlave) {
    382         bool found = false;
    383         if (hbData.JobProgress != null) {
    384           foreach (Guid jobId in hbData.JobProgress.Keys) {
    385             if (jobId == currJob.Id) {
    386               found = true;
    387               break;
    388             }
    389           }
    390         }
    391         if (!found) {
    392           lock (newAssignedJobs) {
    393             if (newAssignedJobs.ContainsKey(currJob.Id)) {
    394               newAssignedJobs[currJob.Id]--;
    395               Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
    396               if (newAssignedJobs[currJob.Id] <= 0) {
    397                 Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);
    398 
    399                 currJob.State = JobState.Offline;
    400                 DaoLocator.JobDao.Update(currJob);
    401 
    402                 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
    403 
    404                 newAssignedJobs.Remove(currJob.Id);
    405               }
    406             } else {
    407               Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
    408               currJob.State = JobState.Offline;
    409               DaoLocator.JobDao.Update(currJob);
    410             }
    411           } // lock
    412         } else {
    413           lock (newAssignedJobs) {
    414 
    415             if (newAssignedJobs.ContainsKey(currJob.Id)) {
    416               Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
    417               newAssignedJobs.Remove(currJob.Id);
    418             }
    419           }
    420         }
    421       }
    422     }
     426    //private void ProcessJobProgress(HeartBeatData heartbeatData, ResponseHeartBeat response) {
     427    //  Logger.Debug("Started for Slave " + heartbeatData.SlaveId);
     428    //  List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId)));
     429    //  if (heartbeatData.JobProgress != null && heartbeatData.JobProgress.Count > 0) {
     430    //    if (jobsOfSlave == null || jobsOfSlave.Count == 0) {
     431    //      foreach (Guid jobId in heartbeatData.JobProgress.Keys) {
     432    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
     433    //      }
     434
     435    //      Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + ", advise him to abort all");
     436    //      return;
     437    //    }
     438
     439    //    foreach (KeyValuePair<Guid, TimeSpan> jobProgress in heartbeatData.JobProgress) {
     440    //      JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
     441    //      if (curJob == null) {
     442    //        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
     443    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
     444    //        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
     445    //        return;
     446    //      }
     447    //      curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
     448    //      if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) {
     449    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
     450    //        Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + " Job: " + curJob);
     451    //      } else if (curJob.State == JobState.Aborted) {
     452    //        // a request to abort the job has been set
     453    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
     454    //        curJob.State = JobState.Finished;
     455    //      } else {
     456    //        // save job progress
     457    //        curJob.ExecutionTime = jobProgress.Value;
     458
     459    //        if (curJob.State == JobState.SnapshotRequested) {
     460    //          // a request for a snapshot has been set
     461    //          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
     462    //          curJob.State = JobState.SnapshotSent;
     463    //        }
     464    //      }
     465    //      DaoLocator.JobDao.Update(curJob);
     466    //    }
     467    //  }
     468
     469    //  foreach (JobDto currJob in jobsOfSlave) {
     470    //    bool found = false;
     471    //    if (heartbeatData.JobProgress != null) {
     472    //      foreach (Guid jobId in heartbeatData.JobProgress.Keys) {
     473    //        if (jobId == currJob.Id) {
     474    //          found = true;
     475    //          break;
     476    //        }
     477    //      }
     478    //    }
     479
     480    //    if (!found) {
     481    //      lock (newAssignedJobs) {
     482    //        if (newAssignedJobs.ContainsKey(currJob.Id)) {
     483    //          newAssignedJobs[currJob.Id]--;
     484    //          Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
     485    //          if (newAssignedJobs[currJob.Id] <= 0) {
     486    //            Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);
     487
     488    //            currJob.State = JobState.Offline;
     489    //            DaoLocator.JobDao.Update(currJob);
     490
     491    //            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
     492
     493    //            newAssignedJobs.Remove(currJob.Id);
     494    //          }
     495    //        } else {
     496    //          Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
     497    //          currJob.State = JobState.Offline;
     498    //          DaoLocator.JobDao.Update(currJob);
     499    //        }
     500    //      } // lock
     501    //    } else {
     502    //      lock (newAssignedJobs) {
     503    //        if (newAssignedJobs.ContainsKey(currJob.Id)) {
     504    //          Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
     505    //          newAssignedJobs.Remove(currJob.Id);
     506    //        }
     507    //      }
     508    //    }
     509    //  }
     510    //}
    423511
    424512    /// <summary>
     
    466554      //tx = session.BeginTransaction();
    467555
    468       ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.Id, new byte[] { }, result.Percentage, result.Exception, finished);
     556      ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.Id, new byte[] { }, result.ExecutionTime, result.Exception, finished);
    469557
    470558      if (response.StatusMessage == ResponseStatus.Ok) {
     
    495583    }
    496584
    497     private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {
     585    private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
    498586      Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);
    499587
     
    511599
    512600      if (job != null && job.JobInfo == null) {
    513         //response.Success = false;
    514601        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
    515602        response.JobId = jobId;
     
    520607      }
    521608      if (job.JobInfo.State == JobState.Aborted) {
    522         //response.Success = false;
    523609        response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
    524610
     
    529615      }
    530616      if (job.JobInfo.Slave == null) {
    531         //response.Success = false;
    532617        response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
    533618        response.JobId = jobId;
     
    562647      }
    563648      if (job.JobInfo.State != JobState.Calculating && job.JobInfo.State != JobState.Pending) {
    564         //response.Success = false;
    565649        response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
    566650        response.JobId = jobId;
     
    571655        return response;
    572656      }
    573       job.JobInfo.Percentage = percentage;
     657      job.JobInfo.ExecutionTime = executionTime;
    574658
    575659      if (!string.IsNullOrEmpty(exception)) {
     
    600684    /// these job results will be stored in the database
    601685    /// </summary>
    602     /// <param name="slaveId"></param>
    603     /// <param name="jobId"></param>
    604     /// <param name="result"></param>
    605     /// <param name="exception"></param>
    606     /// <param name="finished"></param>
    607     /// <returns></returns>
    608     public ResponseResultReceived StoreFinishedJobResult(Guid slaveId,
    609       Guid jobId,
    610       byte[] result,
    611       double percentage,
    612       string exception) {
    613 
    614       return ProcessJobResult(slaveId, jobId, result, percentage, exception, true);
    615     }
    616 
    617     public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, double percentage, string exception) {
    618       return ProcessJobResult(slaveId, jobId, result, percentage, exception, false);
     686    public ResponseResultReceived StoreFinishedJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
     687      return ProcessJobResult(slaveId, jobId, result, executionTime, exception, true);
     688    }
     689
     690    public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
     691      return ProcessJobResult(slaveId, jobId, result, executionTime, exception, false);
    619692    }
    620693
     
    637710      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
    638711      if (slave == null) {
    639         //response.Success = false;
    640712        response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
    641713        return response;
     
    666738      JobDto job = DaoLocator.JobDao.FindById(jobId);
    667739      if (job == null) {
    668         //response.Success = false;
    669740        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
    670741        Logger.Error("Job doesn't exist (anymore)! " + jobId);
     
    672743      }
    673744      if (job.State == JobState.Finished) {
    674         //response.Success = true;
    675745        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
    676746        Logger.Error("already finished! " + job);
     
    702772            response.List.Add(ConvertPluginDescriptorToDto(ipd));
    703773          } else {
    704             //response.Success = false;
    705774            response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
    706775            return response;
     
    722791      return currCachedPlugin;
    723792    }
    724    
     793
    725794    #endregion
    726795  }
Note: See TracChangeset for help on using the changeset viewer.