Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4423

Last change on this file since 4423 was 4423, checked in by cneumuel, 14 years ago
  • Refactored HL.Hive.Experiment. JobItems are now called HiveJobs, and OptimizerJobs no longer contain a hierarchy.
  • Dynamic generation of jobs on a slave are not reflected on the client user interface.
  • Optimizer-Trees are now strictly synchronized with the HiveJob-Trees (also the ComputeInParallel property is taken into account when the Child HiveJobs are created)
  • Improved the way a class can report progress and lock the UI (IProgressReporter, IProgress, Progress, ProgressView)
  • Changes were made to the config-files, so that server and clients work with blade12.hpc.fh-hagenberg.at
  • Lots of small changes and bugfixes
File size: 21.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Core;
6using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
7using HeuristicLab.Common;
8using HeuristicLab.Hive.Contracts.BusinessObjects;
9using HeuristicLab.Hive.Contracts;
10using HeuristicLab.Hive.Contracts.Interfaces;
11using HeuristicLab.Hive.Contracts.ResponseObjects;
12using HeuristicLab.Hive.JobBase;
13using HeuristicLab.Persistence.Default.Xml;
14using System.IO;
15using HeuristicLab.Hive.Experiment.Jobs;
16using HeuristicLab.Tracing;
17using System.Threading;
18using HeuristicLab.Optimization;
19
20namespace HeuristicLab.Hive.Experiment {
21  /// <summary>
22  /// An experiment which contains multiple batch runs of algorithms.
23  /// </summary>
24  [Item(itemName, itemDescription)]
25  public class HiveExperiment : NamedItem, IExecutable, IProgressReporter {
26    private object locker = new object();
27    private const string itemName = "Hive Experiment";
28    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
29    private System.Timers.Timer timer;
30
31    private Guid hiveExperimentId;
32    public Guid HiveExperimentId {
33      get { return hiveExperimentId; }
34      set { hiveExperimentId = value; }
35    }
36
37    private HiveJob hiveJob;
38    public HiveJob HiveJob {
39      get { return hiveJob; }
40      set {
41        DeregisterHiveJobEvents();
42        if (hiveJob != value) {
43          hiveJob = value;
44          RegisterHiveJobEvents();
45          OnHiveJobChanged();
46        }
47      }
48    }
49
50    private ILog log;
51    public ILog Log {
52      get { return log; }
53    }
54
55    private DateTime lastUpdateTime;
56
57    private string resourceIds;
58    public string ResourceIds {
59      get { return resourceIds; }
60      set {
61        if (resourceIds != value) {
62          resourceIds = value;
63          OnResourceIdsChanged();
64        }
65      }
66    }
67
68    private Guid? rootJobId;
69
70    private bool isPollingResults;
71    public bool IsPollingResults {
72      get { return isPollingResults; }
73      private set {
74        if (isPollingResults != value) {
75          isPollingResults = value;
76          OnIsPollingResultsChanged();
77        }
78      }
79    }
80
81    private bool isProgressing;
82    public bool IsProgressing {
83      get { return isProgressing; }
84      set {
85        if (isProgressing != value) {
86          isProgressing = value;
87          OnIsProgressingChanged();
88        }
89      }
90    }
91
92    private IProgress progress;
93    public IProgress Progress {
94      get { return progress; }
95    }
96   
97    private JobResultPoller jobResultPoller;
98
99    public HiveExperiment()
100      : base(itemName, itemDescription) {
101      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
102      this.log = new Log();
103      InitTimer();
104    }
105
106    public HiveExperiment(HiveExperimentDto hiveExperimentDto)
107      : this() {
108      UpdateFromDto(hiveExperimentDto);
109    }
110
111    public void UpdateFromDto(HiveExperimentDto hiveExperimentDto) {
112      this.HiveExperimentId = hiveExperimentDto.Id;
113      this.Name = hiveExperimentDto.Name;
114      this.Description = hiveExperimentDto.Description;
115      this.ResourceIds = hiveExperimentDto.ResourceIds;
116      this.rootJobId = hiveExperimentDto.RootJobId;
117    }
118
119    public HiveExperimentDto ToHiveExperimentDto() {
120      return new HiveExperimentDto() {
121        Id = this.HiveExperimentId,
122        Name = this.Name,
123        Description = this.Description,
124        ResourceIds = this.ResourceIds,
125        RootJobId = this.rootJobId
126      };
127    }
128
129    public override IDeepCloneable Clone(Cloner cloner) {
130      LogMessage("I am beeing cloned");
131      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
132      clone.resourceIds = this.resourceIds;
133      clone.executionState = this.executionState;
134      clone.executionTime = this.executionTime;
135      clone.log = (ILog)cloner.Clone(log);
136      clone.lastUpdateTime = this.lastUpdateTime;
137      clone.rootJobId = this.rootJobId;
138      return clone;
139    }
140
141    public void SetExperiment(Optimization.Experiment experiment) {
142      this.HiveJob = new HiveJob(experiment);
143      Prepare();
144    }
145
146    private void RegisterHiveJobEvents() {
147      if (HiveJob != null) {
148        HiveJob.JobStateChanged += new EventHandler(HiveJob_JobStateChanged);
149      }
150    }
151
152    private void DeregisterHiveJobEvents() {
153      if (HiveJob != null) {
154        HiveJob.JobStateChanged -= new EventHandler(HiveJob_JobStateChanged);
155      }
156    }
157
158    /// <summary>
159    /// Returns the experiment from the root HiveJob
160    /// </summary>
161    public Optimization.Experiment GetExperiment() {
162      if (this.HiveJob != null) {
163        return HiveJob.Job.OptimizerAsExperiment;
164      }
165      return null;
166    }
167
168    #region IExecutable Members
169    private Core.ExecutionState executionState;
170    public ExecutionState ExecutionState {
171      get { return executionState; }
172      private set {
173        if (executionState != value) {
174          executionState = value;
175          OnExecutionStateChanged();
176        }
177      }
178    }
179
180    private TimeSpan executionTime;
181    public TimeSpan ExecutionTime {
182      get { return executionTime; }
183      private set {
184        if (executionTime != value) {
185          executionTime = value;
186          OnExecutionTimeChanged();
187        }
188      }
189    }
190
191    public void Pause() {
192      throw new NotSupportedException();
193    }
194
195    public void Prepare() {
196      // do nothing
197    }
198
199    public void Start() {
200      OnStarted();
201      ExecutionTime = new TimeSpan();
202      lastUpdateTime = DateTime.Now;
203      this.ExecutionState = Core.ExecutionState.Started;
204
205      Thread t = new Thread(RunUploadExperiment);
206      t.Name = "RunUploadExperimentThread";
207      t.Start();
208    }
209
210    private void RunUploadExperiment() {
211      try {
212        this.progress = new Progress("Connecting to server...");
213        IsProgressing = true;
214        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
215          IEnumerable<string> groups = ToResourceIdList(this.ResourceIds);
216          this.HiveJob.SetIndexInParentOptimizerList(null);
217
218          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
219          int jobCount = 0;
220
221          this.progress.Status = "Uploading jobs...";
222          UploadJobWithChildren(service.Obj, this.HiveJob, null, groups, ref jobCount, totalJobCount);
223          this.rootJobId = this.HiveJob.JobDto.Id;
224          LogMessage("Finished sending jobs to hive");
225
226          // insert or update HiveExperiment
227          this.progress.Status = "Uploading HiveExperiment...";
228          ResponseObject<HiveExperimentDto> resp = service.Obj.UpdateHiveExperiment(this.ToHiveExperimentDto());
229          this.UpdateFromDto(resp.Obj);
230
231          StartResultPolling();
232        }
233      }
234      catch (Exception e) {
235        OnExceptionOccured(e);
236      }
237      finally {
238        IsProgressing = false;
239      }
240    }
241
242    /// <summary>
243    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
244    ///
245    /// </summary>
246    /// <param name="service"></param>
247    /// <param name="hiveJob"></param>
248    /// <param name="parentHiveJob">shall be null if its the root job</param>
249    /// <param name="groups"></param>
250    private void UploadJobWithChildren(IClientFacade service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<string> groups, ref int jobCount, int totalJobCount) {
251      jobCount++;
252      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
253      SerializedJob serializedJob;
254      if (hiveJob.Job.ComputeInParallel &&
255        (hiveJob.Job.Optimizer is Optimization.Experiment || hiveJob.Job.Optimizer is Optimization.BatchRun)) {
256        hiveJob.JobDto.State = JobState.WaitForChildJobs;
257        hiveJob.Job.CollectChildJobs = false; // don't collect child-jobs on slaves
258        serializedJob = hiveJob.GetAsSerializedJob(true);
259      } else {
260        serializedJob = hiveJob.GetAsSerializedJob(false);
261      }
262
263      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, serializedJob.SerializedJobData.Count() / 1024);
264      this.progress.ProgressValue = (double)jobCount / totalJobCount;
265      ResponseObject<JobDto> response;
266      if (parentHiveJob != null) {
267        response = service.AddChildJob(parentHiveJob.JobDto.Id, serializedJob);
268      } else {
269        response = service.AddJobWithGroupStrings(serializedJob, groups);
270      }
271
272      if (response.StatusMessage == ResponseStatus.Ok) {
273        LogMessage(response.Obj.Id, "Job sent to Hive");
274        hiveJob.JobDto = response.Obj;
275
276        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
277          UploadJobWithChildren(service, child, hiveJob, groups, ref jobCount, totalJobCount);
278        }
279      } else {
280        throw new AddJobToHiveException(response.StatusMessage.ToString());
281      }
282    }
283
284    /// <summary>
285    /// Converts a string which can contain Ids separated by ';' to a enumerable
286    /// </summary>
287    private IEnumerable<string> ToResourceIdList(string resourceGroups) {
288      if (!string.IsNullOrEmpty(resourceGroups)) {
289        return resourceIds.Split(';');
290      } else {
291        return new List<string>();
292      }
293    }
294
295    public void Stop() {
296      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
297        foreach(HiveJob hj in HiveJob.GetAllHiveJobs()) {
298          service.Obj.AbortJob(hj.JobDto.Id);
299        }
300      }
301    }
302
303    #endregion
304
305    public void StartResultPolling() {
306      if (!jobResultPoller.IsPolling) {
307        jobResultPoller.Start();
308      } else {
309        throw new JobResultPollingException("Result polling already running");
310      }
311    }
312
313    public void StopResultPolling() {
314      if (jobResultPoller.IsPolling) {
315        jobResultPoller.Stop();
316      } else {
317        throw new JobResultPollingException("Result polling not running");
318      }
319    }
320
321    #region HiveJob Events
322    void HiveJob_JobStateChanged(object sender, EventArgs e) {
323      if (HiveJob != null) {
324        rootJobId = HiveJob.JobDto.Id;
325      }
326    }
327    #endregion
328
329    #region Eventhandler
330
331    public event EventHandler ExecutionTimeChanged;
332    private void OnExecutionTimeChanged() {
333      EventHandler handler = ExecutionTimeChanged;
334      if (handler != null) handler(this, EventArgs.Empty);
335    }
336
337    public event EventHandler ExecutionStateChanged;
338    private void OnExecutionStateChanged() {
339      LogMessage("ExecutionState changed to " + executionState.ToString());
340      EventHandler handler = ExecutionStateChanged;
341      if (handler != null) handler(this, EventArgs.Empty);
342    }
343
344    public event EventHandler Started;
345    private void OnStarted() {
346      LogMessage("Started");
347      timer.Start();
348      EventHandler handler = Started;
349      if (handler != null) handler(this, EventArgs.Empty);
350    }
351
352    public event EventHandler Stopped;
353    private void OnStopped() {
354      LogMessage("Stopped");
355      timer.Stop();
356      EventHandler handler = Stopped;
357      if (handler != null) handler(this, EventArgs.Empty);
358    }
359
360    public event EventHandler Paused;
361    private void OnPaused() {
362      LogMessage("Paused");
363      EventHandler handler = Paused;
364      if (handler != null) handler(this, EventArgs.Empty);
365    }
366
367    public event EventHandler Prepared;
368    protected virtual void OnPrepared() {
369      LogMessage("Prepared");
370      EventHandler handler = Prepared;
371      if (handler != null) handler(this, EventArgs.Empty);
372    }
373
374    public event EventHandler ResourceIdsChanged;
375    protected virtual void OnResourceIdsChanged() {
376      EventHandler handler = ResourceIdsChanged;
377      if (handler != null) handler(this, EventArgs.Empty);
378    }
379
380    public event EventHandler IsResultsPollingChanged;
381    private void OnIsPollingResultsChanged() {
382      if (this.IsPollingResults) {
383        LogMessage("Results Polling Started");
384      } else {
385        LogMessage("Results Polling Stopped");
386      }
387      EventHandler handler = IsResultsPollingChanged;
388      if (handler != null) handler(this, EventArgs.Empty);
389    }
390
391    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
392    private void OnExceptionOccured(Exception e) {
393      var handler = ExceptionOccurred;
394      if (handler != null) handler(this, new EventArgs<Exception>(e));
395    }
396
397    public event EventHandler HiveJobChanged;
398    private void OnHiveJobChanged() {
399      if (jobResultPoller != null && jobResultPoller.IsPolling) {
400        jobResultPoller.Stop();
401        DeregisterResultPollingEvents();
402      }
403      if (HiveJob != null) {
404        jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.RESULT_POLLING_INTERVAL);
405        RegisterResultPollingEvents();
406      }
407      EventHandler handler = HiveJobChanged;
408      if (handler != null) handler(this, EventArgs.Empty);
409    }
410
411    public event EventHandler IsProgressingChanged;
412    private void OnIsProgressingChanged() {
413      var handler = IsProgressingChanged;
414      if (handler != null) handler(this, EventArgs.Empty);
415    }
416    #endregion
417
418    #region JobResultPoller Events
419    private void RegisterResultPollingEvents() {
420      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
421      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
422      jobResultPoller.PollingStarted += new EventHandler(jobResultPoller_PollingStarted);
423      jobResultPoller.PollingFinished += new EventHandler(jobResultPoller_PollingFinished);
424      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
425    }
426    private void DeregisterResultPollingEvents() {
427      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
428      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
429      jobResultPoller.PollingStarted -= new EventHandler(jobResultPoller_PollingStarted);
430      jobResultPoller.PollingFinished -= new EventHandler(jobResultPoller_PollingFinished);
431      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
432    }
433    void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
434      this.IsPollingResults = jobResultPoller.IsPolling;
435    }
436    void jobResultPoller_PollingFinished(object sender, EventArgs e) {
437      LogMessage("Polling results finished");
438    }
439    void jobResultPoller_PollingStarted(object sender, EventArgs e) {
440      LogMessage("Polling results started");
441    }
442    void jobResultPoller_JobResultReceived(object sender, EventArgs<JobResultList> e) {
443      foreach (JobResult jobResult in e.Value) {
444        HiveJob hj = hiveJob.GetHiveJobByJobId(jobResult.Id);
445        if (hj != null) {
446          hj.UpdateFromJobResult(jobResult);
447          if ((hj.JobDto.State == JobState.Aborted ||
448               hj.JobDto.State == JobState.Failed ||
449               hj.JobDto.State == JobState.Finished) &&
450              !hj.IsFinishedOptimizerDownloaded) {
451            LogMessage(hj.JobDto.Id, "Downloading optimizer for job");
452            OptimizerJob optimizerJob = LoadOptimizerJob(hj.JobDto.Id);
453            if (jobResult.ParentJobId.HasValue) {
454              HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(jobResult.ParentJobId.Value);
455              parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.JobDto.Id);
456            } else {
457              this.HiveJob.IsFinishedOptimizerDownloaded = true;
458            }
459          }
460        }
461      }
462      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
463      if (AllJobsFinished()) {
464        this.ExecutionState = Core.ExecutionState.Stopped;
465        StopResultPolling();
466        OnStopped();
467      }
468    }
469
470    private bool AllJobsFinished() {
471      return HiveJob.GetAllHiveJobs().All(hj => hj.IsFinishedOptimizerDownloaded);
472    }
473
474    void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
475      OnExceptionOccured(e.Value);
476    }
477    #endregion
478
479    #region Execution Time Timer
480    private void InitTimer() {
481      timer = new System.Timers.Timer(100);
482      timer.AutoReset = true;
483      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
484    }
485
486    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
487      DateTime now = DateTime.Now;
488      ExecutionTime += now - lastUpdateTime;
489      lastUpdateTime = now;
490    }
491    #endregion
492
493    #region Logging
494    private void LogMessage(string message) {
495      // HeuristicLab.Log is not Thread-Safe, so lock on every call
496      lock (locker) {
497        log.LogMessage(message);
498        Logger.Debug(message);
499      }
500    }
501
502    private void LogMessage(Guid jobId, string message) {
503      //GetJobItemById(jobId).LogMessage(message);
504      LogMessage(message + " (jobId: " + jobId + ")");
505    }
506
507    #endregion
508
509    /// <summary>
510    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem
511    /// </summary>
512    public void LoadHiveJob() {
513      progress = new Progress();
514      try {
515        IsProgressing = true;
516        int totalJobCount = 0;
517        int jobCount = 0;
518        progress.Status = "Connecting to Server...";
519        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
520          // fetch all JobDto objects to create the full tree of tree of HiveJob objects
521          progress.Status = "Downloading list of jobs...";
522          JobResultList allResults = service.Obj.GetChildJobResults(rootJobId.Value, true, true).Obj;
523          totalJobCount = allResults.Count;
524
525          // download them first
526          IDictionary<Guid, SerializedJob> allSerializedJobs = new Dictionary<Guid, SerializedJob>();
527          foreach (JobResult jobResult in allResults) {
528            jobCount++;
529            progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
530            allSerializedJobs.Add(jobResult.Id, service.Obj.GetLastSerializedResult(jobResult.Id).Obj);
531            progress.ProgressValue = (double)jobCount / totalJobCount;
532          }
533
534          jobCount = 1;
535          progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, allSerializedJobs[this.rootJobId.Value].SerializedJobData.Count() / 1024);
536          this.HiveJob = new HiveJob(allSerializedJobs[this.rootJobId.Value], false);
537          allSerializedJobs.Remove(this.rootJobId.Value); // reduce memory footprint
538          progress.ProgressValue = (double)jobCount / totalJobCount;
539
540          if (this.HiveJob.JobDto.DateFinished.HasValue) {
541            this.ExecutionTime = this.HiveJob.JobDto.DateFinished.Value - this.HiveJob.JobDto.DateCreated.Value;
542            this.lastUpdateTime = this.HiveJob.JobDto.DateFinished.Value;
543          } else {
544            this.ExecutionTime = DateTime.Now - this.HiveJob.JobDto.DateCreated.Value;
545            this.lastUpdateTime = DateTime.Now;
546          }
547
548          // build child-job tree
549          LoadChildResults(service.Obj, this.HiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
550          StartResultPolling();
551        }
552      }
553      catch (Exception e) {
554        OnExceptionOccured(e);
555      }
556      finally {
557        IsProgressing = false;
558      }
559    }
560
561    private void LoadChildResults(IClientFacade service, HiveJob parentHiveJob, JobResultList allResults, IDictionary<Guid, SerializedJob> allSerializedJobs, IProgress progress, int totalJobCount, ref int jobCount) {
562      IEnumerable<JobResult> childResults = from result in allResults
563                                            where result.ParentJobId.HasValue && result.ParentJobId.Value == parentHiveJob.JobDto.Id
564                                            orderby result.DateCreated ascending
565                                            select result;
566      foreach (JobResult jobResult in childResults) {
567        jobCount++;
568        progress.Status = string.Format("Deserializing {0} of {1} jobs ({2} kb)...", jobCount, totalJobCount, allSerializedJobs[jobResult.Id].SerializedJobData.Count() / 1024);
569        OptimizerJob optimizerJob = SerializedJob.Deserialize<OptimizerJob>(allSerializedJobs[jobResult.Id].SerializedJobData);
570        progress.ProgressValue = (double)jobCount / totalJobCount;
571        HiveJob childHiveJob = new HiveJob(optimizerJob, false);
572        parentHiveJob.AddChildHiveJob(childHiveJob);
573        childHiveJob.JobDto = allSerializedJobs[jobResult.Id].JobInfo;
574        allSerializedJobs.Remove(jobResult.Id); // reduce memory footprint
575        if(jobCount % 10 == 0) GC.Collect(); // this is needed or otherwise HL takes over the system when the number of jobs is high
576        LoadChildResults(service, childHiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
577      }
578    }
579
580    private OptimizerJob LoadOptimizerJob(Guid jobId) {
581      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
582        ResponseObject<SerializedJob> serializedJob = service.Obj.GetLastSerializedResult(jobId);
583        return SerializedJob.Deserialize<OptimizerJob>(serializedJob.Obj.SerializedJobData);
584      }
585    }
586  }
587}
Note: See TracBrowser for help on using the repository browser.