Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/05/11 22:35:40 (13 years ago)
Author:
cneumuel
Message:

#1233

  • refactoring of slave core
  • created JobManager, which is responsible for managing jobs without knowing anything about the service. This class is easier to test than the slave core
  • lots of cleanup
  • created console test project for slave
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs

    r6263 r6357  
    2121
    2222using System;
    23 using System.Collections.Generic;
    2423using System.Diagnostics;
    2524using System.ServiceModel;
     
    2928using HeuristicLab.Common;
    3029using HeuristicLab.Core;
     30using HeuristicLab.Hive;
    3131
    3232
     
    3737  /// </summary>
    3838  public class Core : MarshalByRefObject {
    39     public static ILog Log { get; set; }
    40     public static bool AbortRequested { get; set; }
    41 
    4239    private static HeartbeatManager heartbeatManager;
    4340    public static HeartbeatManager HeartbeatManager {
     
    4542    }
    4643
     44    private Semaphore waitShutdownSem = new Semaphore(0, 1);
     45    private bool abortRequested;
    4746    private ISlaveCommunication clientCom;
    4847    private ServiceHost slaveComm;
    49     private Semaphore waitShutdownSem = new Semaphore(0, 1);
    50     private Dictionary<Guid, SlaveJob> slaveJobs = new Dictionary<Guid, SlaveJob>();
    5148    private WcfService wcfService;
     49    private JobManager jobManager;
     50    private ConfigManager configManager;
     51    private PluginManager pluginManager;
    5252
    5353    public EventLog ServiceEventLog { get; set; }
    54     public Dictionary<Guid, SlaveJob> SlaveJobs { get { return slaveJobs; } }
    55 
    56     public Core() { }
     54
     55    public Core() {
     56      var log = new ThreadSafeLog(new Log());
     57      this.pluginManager = new PluginManager(WcfService.Instance, log);
     58      this.jobManager = new JobManager(pluginManager, log);
     59      log.MessageAdded += new EventHandler<EventArgs<string>>(log_MessageAdded);
     60
     61      RegisterJobManagerEvents();
     62
     63      this.configManager = new ConfigManager(jobManager);
     64      ConfigManager.Instance = this.configManager;
     65    }
    5766
    5867    /// <summary>
     
    6069    /// </summary>
    6170    public void Start() {
    62       AbortRequested = false;
    63 
    64       try {
    65         ConfigManager manager = ConfigManager.Instance;
    66         manager.Core = this;
     71      abortRequested = false;
     72
     73      try {
     74        //manager.Core = this;
    6775
    6876        //start the client communication service (pipe between slave and slave gui)
     
    7280
    7381        // delete all left over job directories
    74         PluginCache.Instance.CleanPluginTemp();
     82        pluginManager.CleanPluginTemp();
    7583        clientCom.LogMessage("Hive Slave started");
    7684
     
    110118    private void DispatchMessageQueue() {
    111119      MessageQueue queue = MessageQueue.GetInstance();
    112       while (!AbortRequested) {
     120      while (!abortRequested) {
    113121        MessageContainer container = queue.GetMessage();
    114122        DetermineAction(container);
    115         if (!AbortRequested) {
    116           clientCom.StatusChanged(ConfigManager.Instance.GetStatusForClientConsole());
     123        if (!abortRequested) {
     124          clientCom.StatusChanged(configManager.GetStatusForClientConsole());
    117125        }
    118126      }
     
    150158        switch (container.Message) {
    151159          case MessageContainer.MessageType.CalculateJob:
    152             Task.Factory.StartNew(HandleCalculateJob, container.JobId)
    153             .ContinueWith((t) => {
    154               // handle exception of task
    155               clientCom.LogMessage(t.Exception.ToString());
    156               if (t.Exception.GetType() != typeof(JobNotFoundException)) {
    157                 wcfService.UpdateJobState(container.JobId, JobState.Waiting, t.Exception.ToString());
    158                 SlaveStatusInfo.IncrementJobsFailed();
    159               }
    160             }, TaskContinuationOptions.OnlyOnFaulted);
     160            CalculateJobAsync(container.JobId);
     161            break;
     162          case MessageContainer.MessageType.AbortJob:
     163            AbortJobAsync(container.JobId);
     164            break;
     165          case MessageContainer.MessageType.StopJob:
     166            StopJobAsync(container.JobId);
     167            break;
     168          case MessageContainer.MessageType.PauseJob:
     169            PauseJobAsync(container.JobId);
     170            break;
     171          case MessageContainer.MessageType.StopAll:
     172            DoStopAll();
     173            break;
     174          case MessageContainer.MessageType.PauseAll:
     175            DoPauseAll();
     176            break;
     177          case MessageContainer.MessageType.AbortAll:
     178            DoAbortAll();
    161179            break;
    162180          case MessageContainer.MessageType.ShutdownSlave:
    163181            ShutdownCore();
    164182            break;
    165           case MessageContainer.MessageType.StopAll:
    166             DoStopAll();
    167             break;
    168           case MessageContainer.MessageType.PauseAll:
    169             DoPauseAll();
    170             break;
    171           case MessageContainer.MessageType.AbortAll:
    172             DoAbortAll();
    173             break;
    174           case MessageContainer.MessageType.AbortJob:
    175             SlaveStatusInfo.IncrementJobsAborted(); //TODO: move to a sane place
    176 
    177             Task.Factory.StartNew(HandleAbortJob, container.JobId)
    178              .ContinueWith((t) => {
    179                clientCom.LogMessage(t.Exception.ToString());
    180              }, TaskContinuationOptions.OnlyOnFaulted);
    181             break;
    182           case MessageContainer.MessageType.StopJob:
    183             Task.Factory.StartNew(HandleStopJob, container.JobId)
    184              .ContinueWith((t) => {
    185                clientCom.LogMessage(t.Exception.ToString());
    186              }, TaskContinuationOptions.OnlyOnFaulted);
    187             break;
    188           case MessageContainer.MessageType.PauseJob:
    189             Task.Factory.StartNew(HandlePauseJob, container.JobId)
    190              .ContinueWith((t) => {
    191                clientCom.LogMessage(t.Exception.ToString());
    192              }, TaskContinuationOptions.OnlyOnFaulted);
    193             break;
    194183          case MessageContainer.MessageType.Restart:
    195184            DoStartSlave();
     
    199188            break;
    200189          case MessageContainer.MessageType.SayHello:
    201             wcfService.Connect(ConfigManager.Instance.GetClientInfo());
     190            wcfService.Connect(configManager.GetClientInfo());
    202191            break;
    203192        }
     
    207196    }
    208197
     198    private void CalculateJobAsync(Guid jobId) {
     199      Task.Factory.StartNew(HandleCalculateJob, jobId)
     200      .ContinueWith((t) => {
     201        SlaveStatusInfo.IncrementExceptionOccured();
     202        clientCom.LogMessage(t.Exception.ToString());
     203      }, TaskContinuationOptions.OnlyOnFaulted);
     204    }
     205
     206    private void StopJobAsync(Guid jobId) {
     207      Task.Factory.StartNew(HandleStopJob, jobId)
     208       .ContinueWith((t) => {
     209         SlaveStatusInfo.IncrementExceptionOccured();
     210         clientCom.LogMessage(t.Exception.ToString());
     211       }, TaskContinuationOptions.OnlyOnFaulted);
     212    }
     213
     214    private void PauseJobAsync(Guid jobId) {
     215      Task.Factory.StartNew(HandlePauseJob, jobId)
     216       .ContinueWith((t) => {
     217         SlaveStatusInfo.IncrementExceptionOccured();
     218         clientCom.LogMessage(t.Exception.ToString());
     219       }, TaskContinuationOptions.OnlyOnFaulted);
     220    }
     221
     222    private void AbortJobAsync(Guid jobId) {
     223      Task.Factory.StartNew(HandleAbortJob, jobId)
     224       .ContinueWith((t) => {
     225         SlaveStatusInfo.IncrementExceptionOccured();
     226         clientCom.LogMessage(t.Exception.ToString());
     227       }, TaskContinuationOptions.OnlyOnFaulted);
     228    }
     229
    209230    private void HandleCalculateJob(object jobIdObj) {
    210231      Guid jobId = (Guid)jobIdObj;
    211       SlaveJob newJob = new SlaveJob(this);
    212       bool start = true;
    213 
    214       lock (slaveJobs) {
    215         if (ConfigManager.Instance.GetFreeCores() < 1) {
    216           wcfService.UpdateJobState(jobId, JobState.Waiting, "Slave set status waiting because no cores were available");
    217           clientCom.LogMessage(string.Format("Setting job with id {0} to waiting, all cores are used", jobId));
    218           start = false;
    219         } else {
    220           if (slaveJobs.ContainsKey(jobId)) {
    221             start = false;
    222             clientCom.LogMessage(string.Format("Job with id {0} already exists. Start aborted.", jobId));
    223           } else {
    224             slaveJobs.Add(jobId, newJob);
    225             newJob.PrepareJob(jobId);
    226           }
    227         }
    228       }
    229 
    230       if (start) {
    231         newJob.CalculateJob();
     232      Job job = null;
     233      bool usedCoresIncremented = false;
     234      try {
     235        job = wcfService.GetJob(jobId);
     236        if (job == null) throw new JobNotFoundException(jobId);
     237        if (ConfigManager.Instance.GetFreeCores() < job.CoresNeeded) throw new OutOfCoresException();
     238        if (ConfigManager.GetFreeMemory() < job.MemoryNeeded) throw new OutOfMemoryException();
     239        SlaveStatusInfo.IncrementUsedCores(job.CoresNeeded); usedCoresIncremented = true;
     240        JobData jobData = wcfService.GetJobData(jobId);
     241        if (jobData == null) throw new JobDataNotFoundException(jobId);
     242        job = wcfService.UpdateJobState(jobId, JobState.Calculating, null);
     243        if (job == null) throw new JobNotFoundException(jobId);
     244        jobManager.StartJobAsync(job, jobData);
     245      }
     246      catch (JobNotFoundException) {
     247        if (usedCoresIncremented) SlaveStatusInfo.DecrementUsedCores(job.CoresNeeded);
     248        throw;
     249      }
     250      catch (JobDataNotFoundException) {
     251        SlaveStatusInfo.DecrementUsedCores(job.CoresNeeded);
     252        throw;
     253      }
     254      catch (JobAlreadyRunningException) {
     255        SlaveStatusInfo.DecrementUsedCores(job.CoresNeeded);
     256        throw;
     257      }
     258      catch (OutOfCoresException) {
     259        wcfService.UpdateJobState(jobId, JobState.Waiting, "No more cores available");
     260        throw;
     261      }
     262      catch (OutOfMemoryException) {
     263        wcfService.UpdateJobState(jobId, JobState.Waiting, "No more memory available");
     264        throw;
     265      }
     266      catch (Exception) {
     267        if (usedCoresIncremented) SlaveStatusInfo.DecrementUsedCores(job.CoresNeeded);
     268        throw;
     269      }
     270    }
     271
     272    private void HandleStopJob(object jobIdObj) {
     273      Guid jobId = (Guid)jobIdObj;
     274      try {
     275        Job job = wcfService.GetJob(jobId);
     276        if (job == null) throw new JobNotFoundException(jobId);
     277        jobManager.StopJobAsync(jobId);
     278      }
     279      catch (JobNotFoundException) {
     280        throw;
     281      }
     282      catch (JobNotRunningException) {
     283        throw;
     284      }
     285      catch (AppDomainNotCreatedException) {
     286        throw;
     287      }
     288    }
     289
     290    private void HandlePauseJob(object jobIdObj) {
     291      Guid jobId = (Guid)jobIdObj;
     292      try {
     293        Job job = wcfService.GetJob(jobId);
     294        if (job == null) throw new JobNotFoundException(jobId);
     295        jobManager.PauseJobAsync(jobId);
     296      }
     297      catch (JobNotFoundException) {
     298        throw;
     299      }
     300      catch (JobNotRunningException) {
     301        throw;
    232302      }
    233303    }
     
    235305    private void HandleAbortJob(object jobIdObj) {
    236306      Guid jobId = (Guid)jobIdObj;
    237       bool abort = true;
    238       SlaveJob sj = null;
    239 
    240       lock (slaveJobs) {
    241         if (!slaveJobs.ContainsKey(jobId)) {
    242           clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Abort aborted.", jobId));
    243           abort = false;
     307      try {
     308        jobManager.AbortJob(jobId);
     309      }
     310      catch (JobNotFoundException) {
     311        throw;
     312      }
     313    }
     314
     315    #region JobManager Events
     316    private void RegisterJobManagerEvents() {
     317      this.jobManager.JobStarted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobStarted);
     318      this.jobManager.JobPaused += new EventHandler<EventArgs<SlaveJob, JobData>>(jobManager_JobPaused);
     319      this.jobManager.JobStopped += new EventHandler<EventArgs<SlaveJob, JobData>>(jobManager_JobStopped);
     320      this.jobManager.JobFailed += new EventHandler<EventArgs<Tuple<SlaveJob, JobData, Exception>>>(jobManager_JobFailed);
     321      this.jobManager.ExceptionOccured += new EventHandler<EventArgs<SlaveJob, Exception>>(jobManager_ExceptionOccured);
     322      this.jobManager.JobAborted += new EventHandler<EventArgs<SlaveJob>>(jobManager_JobAborted);
     323      this.jobManager.NewChildJob += new EventHandler<EventArgs<SlaveJob, IJob>>(jobManager_NewChildJob);
     324      this.jobManager.WaitForChildJobs += new EventHandler<EventArgs<SlaveJob, JobData>>(jobManager_WaitForChildJobs);
     325      this.jobManager.DeleteChildJobs += new EventHandler<EventArgs<SlaveJob>>(jobManager_DeleteChildJobs);
     326    }
     327
     328    private void jobManager_JobStarted(object sender, EventArgs<SlaveJob> e) {
     329      // successfully started, everything is good
     330    }
     331
     332    private void jobManager_JobPaused(object sender, EventArgs<SlaveJob, JobData> e) {
     333      try {
     334        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
     335        heartbeatManager.AwakeHeartBeatThread();
     336        Job job = wcfService.GetJob(e.Value.JobId);
     337        if (job == null) throw new JobNotFoundException(e.Value.JobId);
     338        job.ExecutionTime = e.Value.ExecutionTime;
     339        JobData jobData = e.Value.GetJobData();
     340        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Paused);
     341      }
     342      catch (JobNotFoundException ex) {
     343        clientCom.LogMessage(ex.ToString());
     344      }
     345      catch (Exception ex) {
     346        clientCom.LogMessage(ex.ToString());
     347      }
     348    }
     349
     350    private void jobManager_JobStopped(object sender, EventArgs<SlaveJob, JobData> e) {
     351      try {
     352        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
     353        heartbeatManager.AwakeHeartBeatThread();
     354        Job job = wcfService.GetJob(e.Value.JobId);
     355        if (job == null) throw new JobNotFoundException(e.Value.JobId);
     356        job.ExecutionTime = e.Value.ExecutionTime;
     357        JobData jobData = e.Value.GetJobData();
     358        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Finished);
     359      }
     360      catch (JobNotFoundException ex) {
     361        clientCom.LogMessage(ex.ToString());
     362      }
     363      catch (Exception ex) {
     364        clientCom.LogMessage(ex.ToString());
     365      }
     366    }
     367
     368    private void jobManager_JobFailed(object sender, EventArgs<Tuple<SlaveJob, JobData, Exception>> e) {
     369      try {
     370        SlaveStatusInfo.DecrementUsedCores(e.Value.Item1.CoresNeeded);
     371        heartbeatManager.AwakeHeartBeatThread();
     372        SlaveJob slaveJob = e.Value.Item1;
     373        JobData jobData = e.Value.Item2;
     374        Exception exception = e.Value.Item3;
     375
     376        Job job = wcfService.GetJob(slaveJob.JobId);
     377        if (job == null) throw new JobNotFoundException(slaveJob.JobId);
     378        job.ExecutionTime = slaveJob.ExecutionTime;
     379        if (jobData != null) {
     380          wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Failed);
    244381        } else {
    245           sj = slaveJobs[jobId];
     382          wcfService.UpdateJobState(job.Id, JobState.Failed, exception.ToString());
    246383        }
    247       }
    248 
    249       if (abort && !sj.Finished) {
    250         sj.KillAppDomain();
    251       }
    252     }
    253 
    254     private void HandleStopJob(object jobIdObj) {
    255       Guid jobId = (Guid)jobIdObj;
    256       bool stop = true;
    257       SlaveJob sj = null;
    258 
    259       lock (slaveJobs) {
    260         if (!slaveJobs.ContainsKey(jobId)) {
    261           clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Stop aborted.", jobId));
    262           stop = false;
    263         } else {
    264           sj = slaveJobs[jobId];
    265         }
    266       }
    267 
    268       if (stop && !sj.Finished) {
    269         sj.StopJob();
    270       }
    271     }
    272 
    273     private void HandlePauseJob(object jobIdObj) {
    274       Guid jobId = (Guid)jobIdObj;
    275       bool pause = true;
    276       SlaveJob sj = null;
    277 
    278       lock (slaveJobs) {
    279         if (!slaveJobs.ContainsKey(jobId)) {
    280           clientCom.LogMessage(string.Format("Job with id {0} doesn't exist. Pause aborted.", jobId));
    281           pause = false;
    282         } else {
    283           sj = slaveJobs[jobId];
    284         }
    285       }
    286 
    287       if (pause && !sj.Finished) {
    288         sj.PauseJob();
    289       }
    290     }
     384        clientCom.LogMessage(exception.Message);
     385      }
     386      catch (JobNotFoundException ex) {
     387        SlaveStatusInfo.IncrementExceptionOccured();
     388        clientCom.LogMessage(ex.ToString());
     389      }
     390      catch (Exception ex) {
     391        SlaveStatusInfo.IncrementExceptionOccured();
     392        clientCom.LogMessage(ex.ToString());
     393      }
     394    }
     395
     396    private void jobManager_ExceptionOccured(object sender, EventArgs<SlaveJob, Exception> e) {
     397      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
     398      SlaveStatusInfo.IncrementExceptionOccured();
     399      heartbeatManager.AwakeHeartBeatThread();
     400      clientCom.LogMessage(string.Format("Exception occured for job {0}: {1}", e.Value.JobId, e.Value2.ToString()));
     401    }
     402
     403    private void jobManager_JobAborted(object sender, EventArgs<SlaveJob> e) {
     404      SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
     405    }
     406
     407    private void jobManager_NewChildJob(object sender, EventArgs<SlaveJob, IJob> e) {
     408      var job = new Job() { CoresNeeded = 1, MemoryNeeded = 0 };
     409      WcfService.Instance.AddChildJob(e.Value.JobId, job, e.Value2);
     410    }
     411
     412    private void jobManager_WaitForChildJobs(object sender, EventArgs<SlaveJob, JobData> e) {
     413      try {
     414        SlaveStatusInfo.DecrementUsedCores(e.Value.CoresNeeded);
     415        heartbeatManager.AwakeHeartBeatThread();
     416        Job job = wcfService.GetJob(e.Value.JobId);
     417        if (job == null) throw new JobNotFoundException(e.Value.JobId);
     418        job.ExecutionTime = e.Value.ExecutionTime;
     419        job.IsParentJob = true;
     420        job.FinishWhenChildJobsFinished = false;
     421        JobData jobData = e.Value.GetJobData();
     422        wcfService.UpdateJobData(job, jobData, configManager.GetClientInfo().Id, JobState.Waiting);
     423      }
     424      catch (JobNotFoundException ex) {
     425        clientCom.LogMessage(ex.ToString());
     426      }
     427      catch (Exception ex) {
     428        clientCom.LogMessage(ex.ToString());
     429      }
     430    }
     431
     432    private void jobManager_DeleteChildJobs(object sender, EventArgs<SlaveJob> e) {
     433      try {
     434        WcfService.Instance.DeleteChildJobs(e.Value.JobId);
     435      }
     436      catch (Exception ex) {
     437        clientCom.LogMessage(ex.ToString());
     438      }
     439    }
     440    #endregion
     441
     442    #region Log Events
     443    private void log_MessageAdded(object sender, EventArgs<string> e) {
     444      clientCom.LogMessage(e.Value.Split('\t')[1]);
     445      ((ILog)sender).Clear(); // don't let the log take up memory
     446    }
     447    #endregion
    291448
    292449    /// <summary>
     
    294451    /// </summary>
    295452    private void DoAbortAll() {
    296       lock (slaveJobs) {
    297         foreach (SlaveJob sj in new List<SlaveJob>(slaveJobs.Values)) {
    298           if (!sj.Finished) {
    299             sj.KillAppDomain();
    300           }
    301         }
    302       }
    303       clientCom.LogMessage("Aborted all jobs!");
     453      clientCom.LogMessage("Aborting all jobs.");
     454      foreach (Guid jobId in jobManager.JobIds) {
     455        AbortJobAsync(jobId);
     456      }
    304457    }
    305458
     
    308461    /// </summary>
    309462    private void DoPauseAll() {
    310       clientCom.LogMessage("Pause all received");
    311 
    312       lock (slaveJobs) {
    313         foreach (SlaveJob sj in new List<SlaveJob>(slaveJobs.Values)) {
    314           if (!sj.Finished) {
    315             sj.PauseJob();
    316           }
    317         }
     463      clientCom.LogMessage("Pausing all jobs.");
     464      foreach (Guid jobId in jobManager.JobIds) {
     465        PauseJobAsync(jobId);
    318466      }
    319467    }
     
    323471    /// </summary>
    324472    private void DoStopAll() {
    325       clientCom.LogMessage("Stop all received");
    326 
    327       lock (slaveJobs) {
    328         foreach (SlaveJob sj in new List<SlaveJob>(slaveJobs.Values)) {
    329           if (!sj.Finished) {
    330             sj.StopJob();
    331           }
    332         }
    333       }
    334     }
    335 
     473      clientCom.LogMessage("Stopping all jobs.");
     474      foreach (Guid jobId in jobManager.JobIds) {
     475        StopJobAsync(jobId);
     476      }
     477    }
     478
     479    #region Slave Lifecycle Methods
    336480    /// <summary>
    337481    /// completly shudown slave
     
    350494      clientCom.LogMessage("Stopping heartbeat");
    351495      heartbeatManager.StopHeartBeat();
    352       AbortRequested = true;
     496      abortRequested = true;
    353497      clientCom.LogMessage("Logging out");
    354498
     
    369513    private void DoStartSlave() {
    370514      clientCom.LogMessage("Restart received");
    371       ConfigManager.Instance.Asleep = false;
     515      configManager.Asleep = false;
    372516      clientCom.LogMessage("Restart done");
    373517    }
     
    379523    private void Sleep() {
    380524      clientCom.LogMessage("Sleep received - not accepting any new jobs");
    381       ConfigManager.Instance.Asleep = true;
    382       DoPauseAll(); //TODO: or stop? can't decide...     
    383     }
    384 
    385     public void RemoveSlaveJobFromList(Guid jobId) {
    386       lock (slaveJobs) {
    387         if (slaveJobs.ContainsKey(jobId)) {
    388           slaveJobs.Remove(jobId);
    389         }
    390       }
    391     }
     525      configManager.Asleep = true;
     526      DoPauseAll();   
     527    }
     528    #endregion
    392529  }
    393530}
Note: See TracChangeset for help on using the changeset viewer.