Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive.ExperimentManager/HeuristicLab.Hive.ExperimentManager/3.3/HiveExperiment.cs @ 4813

Last change on this file since 4813 was 4813, checked in by cneumuel, 13 years ago

#1260

  • minor bug fixes.
  • updated to latest trunk
File size: 22.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.Interfaces;
31using HeuristicLab.Hive.Contracts.ResponseObjects;
32using HeuristicLab.Hive.ExperimentManager.Jobs;
33using HeuristicLab.Hive.Tracing;
34
namespace HeuristicLab.Hive.ExperimentManager {
  /// <summary>
  /// An experiment which contains multiple batch runs of algorithms and is executed in the Hive.
  /// The experiment is wrapped in a tree of <see cref="HiveJob"/> objects which is uploaded to the
  /// server; afterwards a <see cref="JobResultPoller"/> is used to fetch the results.
  /// </summary>
  [Item(itemName, itemDescription)]
  public class HiveExperiment : NamedItem, IExecutable, IProgressReporter {
    // HeuristicLab.Log is not thread-safe; every access to the log is serialized with this lock.
    private readonly object locker = new object();
    private const string itemName = "Hive Experiment";
    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";

    // Periodically adds the elapsed wall-clock time to ExecutionTime while the experiment is started.
    private System.Timers.Timer timer;

    private JobResultPoller jobResultPoller;
    private Guid? rootJobId;
    private DateTime lastUpdateTime;

    #region Properties
    private Guid hiveExperimentId;
    /// <summary>
    /// Id of this experiment as transferred in the <see cref="HiveExperimentDto"/>.
    /// </summary>
    public Guid HiveExperimentId {
      get { return hiveExperimentId; }
      set { hiveExperimentId = value; }
    }

    private HiveJob hiveJob;
    /// <summary>
    /// The root job of this experiment. Assigning a different instance re-wires the job events
    /// and raises <see cref="HiveJobChanged"/>.
    /// </summary>
    public HiveJob HiveJob {
      get { return hiveJob; }
      set {
        if (hiveJob != value) {
          // only detach when the job really changes; detaching unconditionally would drop the
          // subscription for good when the same instance is assigned again
          DeregisterHiveJobEvents();
          hiveJob = value;
          RegisterHiveJobEvents();
          OnHiveJobChanged();
        }
      }
    }

    private ILog log;
    /// <summary>
    /// The log which receives all messages written by this experiment.
    /// </summary>
    public ILog Log {
      get { return log; }
    }

    private string resourceIds;
    /// <summary>
    /// Resource ids separated by ';'. Raises <see cref="ResourceIdsChanged"/> when the value changes.
    /// </summary>
    public string ResourceIds {
      get { return resourceIds; }
      set {
        if (resourceIds != value) {
          resourceIds = value;
          OnResourceIdsChanged();
        }
      }
    }

    private bool isPollingResults;
    /// <summary>
    /// Mirrors <c>IsPolling</c> of the current job result poller.
    /// </summary>
    public bool IsPollingResults {
      get { return isPollingResults; }
      private set {
        if (isPollingResults != value) {
          isPollingResults = value;
          OnIsPollingResultsChanged();
        }
      }
    }

    private bool isProgressing;
    /// <summary>
    /// True while an upload (<see cref="Start"/>) or download (<see cref="LoadHiveJob"/>) is running.
    /// </summary>
    public bool IsProgressing {
      get { return isProgressing; }
      set {
        if (isProgressing != value) {
          isProgressing = value;
          OnIsProgressingChanged();
        }
      }
    }

    private IProgress progress;
    /// <summary>
    /// Progress object of the most recently started upload or download; null before the first one.
    /// </summary>
    public IProgress Progress {
      get { return progress; }
    }
    #endregion

    /// <summary>
    /// Creates an empty experiment using the resource ids stored in the application settings.
    /// </summary>
    public HiveExperiment()
      : base(itemName, itemDescription) {
      this.ResourceIds = HeuristicLab.Hive.ExperimentManager.Properties.Settings.Default.ResourceIds;
      this.log = new Log();
      InitTimer();
    }

    /// <summary>
    /// Creates an experiment and initializes it from the given data transfer object.
    /// </summary>
    public HiveExperiment(HiveExperimentDto hiveExperimentDto)
      : this() {
      UpdateFromDto(hiveExperimentDto);
    }

    /// <summary>
    /// Cloning constructor. Copies resource ids, execution state/time, the log, the last update
    /// time and the root job id. The HiveJob itself is not copied.
    /// </summary>
    protected HiveExperiment(HiveExperiment original, Cloner cloner)
      : base(original, cloner) {
      // The log has to exist before ExecutionState is assigned, because the state setter writes
      // a log message. It must be cloned from the original, not from the (still null) own field.
      this.log = cloner.Clone(original.log);
      // without a timer OnStarted()/OnStopped() would fail on the clone
      InitTimer();
      this.ResourceIds = original.resourceIds;
      this.ExecutionState = original.executionState;
      this.ExecutionTime = original.executionTime;
      this.lastUpdateTime = original.lastUpdateTime;
      this.rootJobId = original.rootJobId;
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveExperiment(this, cloner);
    }

    /// <summary>
    /// Takes over id, name, description, resource ids and root job id from the given dto.
    /// </summary>
    public void UpdateFromDto(HiveExperimentDto hiveExperimentDto) {
      this.HiveExperimentId = hiveExperimentDto.Id;
      this.Name = hiveExperimentDto.Name;
      this.Description = hiveExperimentDto.Description;
      this.ResourceIds = hiveExperimentDto.ResourceIds;
      this.rootJobId = hiveExperimentDto.RootJobId;
    }

    /// <summary>
    /// Creates a dto containing id, name, description, resource ids and root job id of this experiment.
    /// </summary>
    public HiveExperimentDto ToHiveExperimentDto() {
      return new HiveExperimentDto() {
        Id = this.HiveExperimentId,
        Name = this.Name,
        Description = this.Description,
        ResourceIds = this.ResourceIds,
        RootJobId = this.rootJobId
      };
    }

    /// <summary>
    /// Wraps the given experiment in a new root HiveJob and prepares this experiment.
    /// </summary>
    public void SetExperiment(Optimization.Experiment experiment) {
      this.HiveJob = new HiveJob(experiment);
      Prepare();
    }

    private void RegisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged += new EventHandler(HiveJob_JobStateChanged);
      }
    }

    private void DeregisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged -= new EventHandler(HiveJob_JobStateChanged);
      }
    }

    /// <summary>
    /// Returns the experiment from the root HiveJob, or null if no HiveJob is set.
    /// </summary>
    public Optimization.Experiment GetExperiment() {
      if (this.HiveJob != null) {
        return HiveJob.Job.OptimizerAsExperiment;
      }
      return null;
    }

    #region IExecutable Members
    private Core.ExecutionState executionState;
    /// <summary>
    /// Current execution state. A change is logged and raises <see cref="ExecutionStateChanged"/>.
    /// </summary>
    public ExecutionState ExecutionState {
      get { return executionState; }
      private set {
        if (executionState != value) {
          executionState = value;
          OnExecutionStateChanged();
        }
      }
    }

    private TimeSpan executionTime;
    /// <summary>
    /// Execution time of the experiment. A change raises <see cref="ExecutionTimeChanged"/>.
    /// </summary>
    public TimeSpan ExecutionTime {
      get { return executionTime; }
      private set {
        if (executionTime != value) {
          executionTime = value;
          OnExecutionTimeChanged();
        }
      }
    }

    /// <summary>
    /// Pausing is not supported.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Pause() {
      throw new NotSupportedException();
    }

    public void Prepare() {
      // do nothing
    }

    /// <summary>
    /// Marks the experiment as started and uploads it on a separate thread.
    /// </summary>
    public void Start() {
      // reset the time bookkeeping before OnStarted() starts the timer, otherwise an early tick
      // could add the span since a stale lastUpdateTime
      ExecutionTime = new TimeSpan();
      lastUpdateTime = DateTime.Now;
      OnStarted();
      this.ExecutionState = Core.ExecutionState.Started;

      Thread t = new Thread(RunUploadExperiment);
      t.Name = "RunUploadExperimentThread";
      t.Start();
    }

    /// <summary>
    /// Thread body of <see cref="Start"/>: uploads the job tree, stores the experiment on the
    /// server and starts result polling. Exceptions are reported via <see cref="ExceptionOccurred"/>.
    /// </summary>
    private void RunUploadExperiment() {
      try {
        this.progress = new Progress("Connecting to server...");
        IsProgressing = true;
        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
          IEnumerable<string> groups = ToResourceIdList(this.ResourceIds);
          this.HiveJob.SetIndexInParentOptimizerList(null);

          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
          int jobCount = 0;

          this.progress.Status = "Uploading jobs...";
          UploadJobWithChildren(service.Obj, this.HiveJob, null, groups, ref jobCount, totalJobCount);
          this.rootJobId = this.HiveJob.JobDto.Id;
          LogMessage("Finished sending jobs to hive");

          // insert or update HiveExperiment
          this.progress.Status = "Uploading HiveExperiment...";
          ResponseObject<HiveExperimentDto> resp = service.Obj.UpdateHiveExperiment(this.ToHiveExperimentDto());
          this.UpdateFromDto(resp.Obj);

          StartResultPolling();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        IsProgressing = false;
      }
    }

    /// <summary>
    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the children.
    /// </summary>
    /// <param name="service">facade used to talk to the server</param>
    /// <param name="hiveJob">job to upload</param>
    /// <param name="parentHiveJob">shall be null if it is the root job</param>
    /// <param name="groups">resource ids the root job is added with</param>
    /// <param name="jobCount">running number of uploaded jobs, incremented for every job</param>
    /// <param name="totalJobCount">total number of jobs, used for progress reporting</param>
    /// <exception cref="AddJobToHiveException">Thrown if the server does not answer with ResponseStatus.Ok.</exception>
    private void UploadJobWithChildren(IClientFacade service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<string> groups, ref int jobCount, int totalJobCount) {
      jobCount++;
      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
      SerializedJob serializedJob;
      if (hiveJob.Job.ComputeInParallel &&
        (hiveJob.Job.Optimizer is Optimization.Experiment || hiveJob.Job.Optimizer is Optimization.BatchRun)) {
        hiveJob.JobDto.State = JobState.WaitForChildJobs;
        hiveJob.Job.CollectChildJobs = false; // don't collect child-jobs on slaves
        serializedJob = hiveJob.GetAsSerializedJob(true);
      } else {
        serializedJob = hiveJob.GetAsSerializedJob(false);
      }

      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, serializedJob.SerializedJobData.Count() / 1024);
      this.progress.ProgressValue = (double)jobCount / totalJobCount;
      ResponseObject<JobDto> response;
      if (parentHiveJob != null) {
        response = service.AddChildJob(parentHiveJob.JobDto.Id, serializedJob);
      } else {
        response = service.AddJobWithGroupStrings(serializedJob, groups);
      }

      if (response.StatusMessage == ResponseStatus.Ok) {
        LogMessage(response.Obj.Id, "Job sent to Hive");
        hiveJob.JobDto = response.Obj;

        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
          UploadJobWithChildren(service, child, hiveJob, groups, ref jobCount, totalJobCount);
        }
      } else {
        throw new AddJobToHiveException(response.StatusMessage.ToString());
      }
    }

    /// <summary>
    /// Converts a string which can contain ids separated by ';' to an enumerable.
    /// Returns an empty list for null or an empty string.
    /// </summary>
    private IEnumerable<string> ToResourceIdList(string resourceGroups) {
      if (!string.IsNullOrEmpty(resourceGroups)) {
        // split the argument, not the resourceIds field
        return resourceGroups.Split(';');
      } else {
        return new List<string>();
      }
    }

    /// <summary>
    /// Requests the server to abort all jobs of this experiment. Does nothing if no HiveJob is set.
    /// </summary>
    public void Stop() {
      if (HiveJob == null) return; // nothing to abort
      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        foreach (HiveJob hj in HiveJob.GetAllHiveJobs()) {
          service.Obj.AbortJob(hj.JobDto.Id);
        }
      }
    }

    #endregion

    /// <summary>
    /// Starts polling for job results.
    /// </summary>
    /// <exception cref="JobResultPollingException">Thrown if no poller exists or polling is already running.</exception>
    public void StartResultPolling() {
      if (jobResultPoller == null) {
        // the poller is only created once a HiveJob has been set
        throw new JobResultPollingException("Result polling not possible, no HiveJob set");
      }
      if (!jobResultPoller.IsPolling) {
        jobResultPoller.Start();
      } else {
        throw new JobResultPollingException("Result polling already running");
      }
    }

    /// <summary>
    /// Stops polling for job results.
    /// </summary>
    /// <exception cref="JobResultPollingException">Thrown if polling is not running.</exception>
    public void StopResultPolling() {
      if (jobResultPoller != null && jobResultPoller.IsPolling) {
        jobResultPoller.Stop();
      } else {
        throw new JobResultPollingException("Result polling not running");
      }
    }

    #region HiveJob Events
    void HiveJob_JobStateChanged(object sender, EventArgs e) {
      if (HiveJob != null) {
        rootJobId = HiveJob.JobDto.Id;
      }
    }
    #endregion

    #region Eventhandler

    public event EventHandler ExecutionTimeChanged;
    private void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ExecutionStateChanged;
    private void OnExecutionStateChanged() {
      LogMessage("ExecutionState changed to " + executionState.ToString());
      EventHandler handler = ExecutionStateChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Started;
    /// <summary>
    /// Logs the start, starts the execution time timer and raises <see cref="Started"/>.
    /// </summary>
    private void OnStarted() {
      LogMessage("Started");
      timer.Start();
      EventHandler handler = Started;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Stopped;
    /// <summary>
    /// Logs the stop, stops the execution time timer and raises <see cref="Stopped"/>.
    /// </summary>
    private void OnStopped() {
      LogMessage("Stopped");
      timer.Stop();
      EventHandler handler = Stopped;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Paused;
    private void OnPaused() {
      LogMessage("Paused");
      EventHandler handler = Paused;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Prepared;
    protected virtual void OnPrepared() {
      LogMessage("Prepared");
      EventHandler handler = Prepared;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ResourceIdsChanged;
    protected virtual void OnResourceIdsChanged() {
      EventHandler handler = ResourceIdsChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsResultsPollingChanged;
    private void OnIsPollingResultsChanged() {
      if (this.IsPollingResults) {
        LogMessage("Results Polling Started");
      } else {
        LogMessage("Results Polling Stopped");
      }
      EventHandler handler = IsResultsPollingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    private void OnExceptionOccured(Exception e) {
      var handler = ExceptionOccurred;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    public event EventHandler HiveJobChanged;
    /// <summary>
    /// Replaces the result poller with one for the new HiveJob and raises <see cref="HiveJobChanged"/>.
    /// </summary>
    private void OnHiveJobChanged() {
      if (jobResultPoller != null) {
        if (jobResultPoller.IsPolling) {
          jobResultPoller.Stop();
        }
        // always detach from the old poller (not only while it is polling), otherwise it keeps
        // referencing this experiment after it has been replaced
        DeregisterResultPollingEvents();
        jobResultPoller = null;
      }
      if (HiveJob != null) {
        jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.RESULT_POLLING_INTERVAL);
        RegisterResultPollingEvents();
      }
      EventHandler handler = HiveJobChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsProgressingChanged;
    private void OnIsProgressingChanged() {
      var handler = IsProgressingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    #region JobResultPoller Events
    private void RegisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted += new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished += new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
    }
    private void DeregisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted -= new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished -= new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
    }
    void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
      this.IsPollingResults = jobResultPoller.IsPolling;
    }
    void jobResultPoller_PollingFinished(object sender, EventArgs e) {
      LogMessage("Polling results finished");
    }
    void jobResultPoller_PollingStarted(object sender, EventArgs e) {
      LogMessage("Polling results started");
    }

    /// <summary>
    /// Updates the HiveJobs with the received results, downloads the optimizer of every job that
    /// reached a final state and stops the experiment as soon as all jobs are downloaded.
    /// </summary>
    void jobResultPoller_JobResultReceived(object sender, EventArgs<JobResultList> e) {
      foreach (JobResult jobResult in e.Value) {
        HiveJob hj = HiveJob.GetHiveJobByJobId(jobResult.Id);
        if (hj != null) {
          hj.UpdateFromJobResult(jobResult);
          if ((hj.JobDto.State == JobState.Aborted ||
               hj.JobDto.State == JobState.Failed ||
               hj.JobDto.State == JobState.Finished) &&
              !hj.IsFinishedOptimizerDownloaded) {
            LogMessage(hj.JobDto.Id, "Downloading optimizer for job");
            OptimizerJob optimizerJob = LoadOptimizerJob(hj.JobDto.Id);
            if (optimizerJob == null) {
              // something bad happened to this job. set to finished to allow the rest being downloaded
              hj.IsFinishedOptimizerDownloaded = true;
            } else {
              if (jobResult.ParentJobId.HasValue) {
                HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(jobResult.ParentJobId.Value);
                if (parentHiveJob != null) {
                  parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.JobDto.Id);
                } else {
                  // the lookup can return null (see the check on hj above); treat the job like
                  // a failed download so that the remaining jobs can still be processed
                  LogMessage(hj.JobDto.Id, "Parent job not found, optimizer is skipped");
                  hj.IsFinishedOptimizerDownloaded = true;
                }
              } else {
                this.HiveJob.IsFinishedOptimizerDownloaded = true;
              }
            }
          }
        }
      }
      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
      if (AllJobsFinished()) {
        this.ExecutionState = Core.ExecutionState.Stopped;
        StopResultPolling();
        OnStopped();
      }
    }

    private bool AllJobsFinished() {
      return HiveJob.GetAllHiveJobs().All(hj => hj.IsFinishedOptimizerDownloaded);
    }

    void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
      OnExceptionOccured(e.Value);
    }
    #endregion

    #region Execution Time Timer
    private void InitTimer() {
      timer = new System.Timers.Timer(100);
      timer.AutoReset = true;
      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
    }

    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      DateTime now = DateTime.Now;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
    }
    #endregion

    #region Logging
    private void LogMessage(string message) {
      // HeuristicLab.Log is not Thread-Safe, so lock on every call
      lock (locker) {
        log.LogMessage(message);
        Logger.Debug(message);
      }
    }

    private void LogMessage(Guid jobId, string message) {
      LogMessage(message + " (jobId: " + jobId + ")");
    }

    #endregion

    /// <summary>
    /// Downloads all jobs below the root job from the Hive, rebuilds the tree of HiveJobs, sets
    /// execution state and time according to the root job and starts result polling.
    /// Exceptions are reported via <see cref="ExceptionOccurred"/>.
    /// </summary>
    public void LoadHiveJob() {
      progress = new Progress();
      try {
        IsProgressing = true;
        int totalJobCount = 0;
        int jobCount = 0;
        progress.Status = "Connecting to Server...";
        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
          // fetch all JobDto objects to create the full tree of HiveJob objects
          progress.Status = "Downloading list of jobs...";
          JobResultList allResults = service.Obj.GetChildJobResults(rootJobId.Value, true, true).Obj;
          totalJobCount = allResults.Count;

          // download them first
          IDictionary<Guid, SerializedJob> allSerializedJobs = new Dictionary<Guid, SerializedJob>();
          foreach (JobResult jobResult in allResults) {
            jobCount++;
            progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
            allSerializedJobs.Add(jobResult.Id, service.Obj.GetLastSerializedResult(jobResult.Id).Obj);
            progress.ProgressValue = (double)jobCount / totalJobCount;
          }

          jobCount = 1;
          progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, allSerializedJobs[this.rootJobId.Value].SerializedJobData.Count() / 1024);
          this.HiveJob = new HiveJob(allSerializedJobs[this.rootJobId.Value], false);
          allSerializedJobs.Remove(this.rootJobId.Value); // reduce memory footprint
          progress.ProgressValue = (double)jobCount / totalJobCount;

          if (this.HiveJob.JobDto.DateFinished.HasValue) {
            this.ExecutionTime = this.HiveJob.JobDto.DateFinished.Value - this.HiveJob.JobDto.DateCreated.Value;
            this.lastUpdateTime = this.HiveJob.JobDto.DateFinished.Value;
            this.ExecutionState = Core.ExecutionState.Stopped;
            OnStopped();
          } else {
            this.ExecutionTime = DateTime.Now - this.HiveJob.JobDto.DateCreated.Value;
            this.lastUpdateTime = DateTime.Now;
            this.ExecutionState = Core.ExecutionState.Started;
            OnStarted();
          }

          // build child-job tree
          LoadChildResults(service.Obj, this.HiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
          StartResultPolling();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        IsProgressing = false;
      }
    }

    /// <summary>
    /// Recursively creates the child HiveJobs of the given parent from the downloaded data,
    /// ordered by creation date. Jobs which cannot be deserialized are added with a null optimizer job.
    /// </summary>
    private void LoadChildResults(IClientFacade service, HiveJob parentHiveJob, JobResultList allResults, IDictionary<Guid, SerializedJob> allSerializedJobs, IProgress progress, int totalJobCount, ref int jobCount) {
      IEnumerable<JobResult> childResults = from result in allResults
                                            where result.ParentJobId.HasValue && result.ParentJobId.Value == parentHiveJob.JobDto.Id
                                            orderby result.DateCreated ascending
                                            select result;
      foreach (JobResult jobResult in childResults) {
        jobCount++;
        progress.Status = string.Format("Deserializing {0} of {1} jobs ({2} kb)...", jobCount, totalJobCount, allSerializedJobs[jobResult.Id].SerializedJobData.Count() / 1024);
        OptimizerJob optimizerJob = null;
        try {
          optimizerJob = SerializedJob.Deserialize<OptimizerJob>(allSerializedJobs[jobResult.Id].SerializedJobData);
        }
        catch (Exception e) {
          // best effort: keep loading the remaining jobs, but leave a trace of the failure
          LogMessage(jobResult.Id, "Deserializing job failed: " + e.Message);
          optimizerJob = null;
        }
        progress.ProgressValue = (double)jobCount / totalJobCount;
        HiveJob childHiveJob = new HiveJob(optimizerJob, false);
        parentHiveJob.AddChildHiveJob(childHiveJob);
        childHiveJob.JobDto = allSerializedJobs[jobResult.Id].JobInfo;
        allSerializedJobs.Remove(jobResult.Id); // reduce memory footprint
        if (jobCount % 10 == 0) GC.Collect(); // this is needed or otherwise HL takes over the system when the number of jobs is high
        LoadChildResults(service, childHiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
      }
    }

    /// <summary>
    /// Downloads and deserializes the last result of the given job.
    /// Returns null if the result cannot be deserialized.
    /// </summary>
    private OptimizerJob LoadOptimizerJob(Guid jobId) {
      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        ResponseObject<SerializedJob> serializedJob = service.Obj.GetLastSerializedResult(jobId);
        try {
          return SerializedJob.Deserialize<OptimizerJob>(serializedJob.Obj.SerializedJobData);
        }
        catch (Exception e) {
          // best effort: the caller treats null as "job could not be downloaded"
          LogMessage(jobId, "Deserializing job failed: " + e.Message);
          return null;
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.