Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4750

Last change on this file since 4750 was 4750, checked in by cneumuel, 13 years ago

#1159 some minor cleanups

File size: 22.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.Interfaces;
31using HeuristicLab.Hive.Contracts.ResponseObjects;
32using HeuristicLab.Hive.Experiment.Jobs;
33using HeuristicLab.Hive.Tracing;
34
35namespace HeuristicLab.Hive.Experiment {
36  /// <summary>
37  /// An experiment which contains multiple batch runs of algorithms.
38  /// </summary>
39  [Item(itemName, itemDescription)]
40  public class HiveExperiment : NamedItem, IExecutable, IProgressReporter {
41    private object locker = new object();
42    private const string itemName = "Hive Experiment";
43    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
44    private System.Timers.Timer timer;
45
46    private Guid hiveExperimentId;
47    public Guid HiveExperimentId {
48      get { return hiveExperimentId; }
49      set { hiveExperimentId = value; }
50    }
51
52    private HiveJob hiveJob;
53    public HiveJob HiveJob {
54      get { return hiveJob; }
55      set {
56        DeregisterHiveJobEvents();
57        if (hiveJob != value) {
58          hiveJob = value;
59          RegisterHiveJobEvents();
60          OnHiveJobChanged();
61        }
62      }
63    }
64
65    private ILog log;
66    public ILog Log {
67      get { return log; }
68    }
69
70    private DateTime lastUpdateTime;
71
72    private string resourceIds;
73    public string ResourceIds {
74      get { return resourceIds; }
75      set {
76        if (resourceIds != value) {
77          resourceIds = value;
78          OnResourceIdsChanged();
79        }
80      }
81    }
82
83    private Guid? rootJobId;
84
85    private bool isPollingResults;
86    public bool IsPollingResults {
87      get { return isPollingResults; }
88      private set {
89        if (isPollingResults != value) {
90          isPollingResults = value;
91          OnIsPollingResultsChanged();
92        }
93      }
94    }
95
96    private bool isProgressing;
97    public bool IsProgressing {
98      get { return isProgressing; }
99      set {
100        if (isProgressing != value) {
101          isProgressing = value;
102          OnIsProgressingChanged();
103        }
104      }
105    }
106
107    private IProgress progress;
108    public IProgress Progress {
109      get { return progress; }
110    }
111   
112    private JobResultPoller jobResultPoller;
113
114    public HiveExperiment()
115      : base(itemName, itemDescription) {
116      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
117      this.log = new Log();
118      InitTimer();
119    }
120
121    public HiveExperiment(HiveExperimentDto hiveExperimentDto)
122      : this() {
123      UpdateFromDto(hiveExperimentDto);
124    }
125
126    public void UpdateFromDto(HiveExperimentDto hiveExperimentDto) {
127      this.HiveExperimentId = hiveExperimentDto.Id;
128      this.Name = hiveExperimentDto.Name;
129      this.Description = hiveExperimentDto.Description;
130      this.ResourceIds = hiveExperimentDto.ResourceIds;
131      this.rootJobId = hiveExperimentDto.RootJobId;
132    }
133
134    public HiveExperimentDto ToHiveExperimentDto() {
135      return new HiveExperimentDto() {
136        Id = this.HiveExperimentId,
137        Name = this.Name,
138        Description = this.Description,
139        ResourceIds = this.ResourceIds,
140        RootJobId = this.rootJobId
141      };
142    }
143
144    public override IDeepCloneable Clone(Cloner cloner) {
145      LogMessage("I am beeing cloned");
146      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
147      clone.resourceIds = this.resourceIds;
148      clone.executionState = this.executionState;
149      clone.executionTime = this.executionTime;
150      clone.log = (ILog)cloner.Clone(log);
151      clone.lastUpdateTime = this.lastUpdateTime;
152      clone.rootJobId = this.rootJobId;
153      return clone;
154    }
155
156    public void SetExperiment(Optimization.Experiment experiment) {
157      this.HiveJob = new HiveJob(experiment);
158      Prepare();
159    }
160
161    private void RegisterHiveJobEvents() {
162      if (HiveJob != null) {
163        HiveJob.JobStateChanged += new EventHandler(HiveJob_JobStateChanged);
164      }
165    }
166
167    private void DeregisterHiveJobEvents() {
168      if (HiveJob != null) {
169        HiveJob.JobStateChanged -= new EventHandler(HiveJob_JobStateChanged);
170      }
171    }
172
173    /// <summary>
174    /// Returns the experiment from the root HiveJob
175    /// </summary>
176    public Optimization.Experiment GetExperiment() {
177      if (this.HiveJob != null) {
178        return HiveJob.Job.OptimizerAsExperiment;
179      }
180      return null;
181    }
182
183    #region IExecutable Members
184    private Core.ExecutionState executionState;
185    public ExecutionState ExecutionState {
186      get { return executionState; }
187      private set {
188        if (executionState != value) {
189          executionState = value;
190          OnExecutionStateChanged();
191        }
192      }
193    }
194
195    private TimeSpan executionTime;
196    public TimeSpan ExecutionTime {
197      get { return executionTime; }
198      private set {
199        if (executionTime != value) {
200          executionTime = value;
201          OnExecutionTimeChanged();
202        }
203      }
204    }
205
206    public void Pause() {
207      throw new NotSupportedException();
208    }
209
210    public void Prepare() {
211      // do nothing
212    }
213
214    public void Start() {
215      OnStarted();
216      ExecutionTime = new TimeSpan();
217      lastUpdateTime = DateTime.Now;
218      this.ExecutionState = Core.ExecutionState.Started;
219
220      Thread t = new Thread(RunUploadExperiment);
221      t.Name = "RunUploadExperimentThread";
222      t.Start();
223    }
224
225    private void RunUploadExperiment() {
226      try {
227        this.progress = new Progress("Connecting to server...");
228        IsProgressing = true;
229        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
230          IEnumerable<string> groups = ToResourceIdList(this.ResourceIds);
231          this.HiveJob.SetIndexInParentOptimizerList(null);
232
233          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
234          int jobCount = 0;
235
236          this.progress.Status = "Uploading jobs...";
237          UploadJobWithChildren(service.Obj, this.HiveJob, null, groups, ref jobCount, totalJobCount);
238          this.rootJobId = this.HiveJob.JobDto.Id;
239          LogMessage("Finished sending jobs to hive");
240
241          // insert or update HiveExperiment
242          this.progress.Status = "Uploading HiveExperiment...";
243          ResponseObject<HiveExperimentDto> resp = service.Obj.UpdateHiveExperiment(this.ToHiveExperimentDto());
244          this.UpdateFromDto(resp.Obj);
245
246          StartResultPolling();
247        }
248      }
249      catch (Exception e) {
250        OnExceptionOccured(e);
251      }
252      finally {
253        IsProgressing = false;
254      }
255    }
256
257    /// <summary>
258    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
259    ///
260    /// </summary>
261    /// <param name="service"></param>
262    /// <param name="hiveJob"></param>
263    /// <param name="parentHiveJob">shall be null if its the root job</param>
264    /// <param name="groups"></param>
265    private void UploadJobWithChildren(IClientFacade service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<string> groups, ref int jobCount, int totalJobCount) {
266      jobCount++;
267      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
268      SerializedJob serializedJob;
269      if (hiveJob.Job.ComputeInParallel &&
270        (hiveJob.Job.Optimizer is Optimization.Experiment || hiveJob.Job.Optimizer is Optimization.BatchRun)) {
271        hiveJob.JobDto.State = JobState.WaitForChildJobs;
272        hiveJob.Job.CollectChildJobs = false; // don't collect child-jobs on slaves
273        serializedJob = hiveJob.GetAsSerializedJob(true);
274      } else {
275        serializedJob = hiveJob.GetAsSerializedJob(false);
276      }
277
278      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, serializedJob.SerializedJobData.Count() / 1024);
279      this.progress.ProgressValue = (double)jobCount / totalJobCount;
280      ResponseObject<JobDto> response;
281      if (parentHiveJob != null) {
282        response = service.AddChildJob(parentHiveJob.JobDto.Id, serializedJob);
283      } else {
284        response = service.AddJobWithGroupStrings(serializedJob, groups);
285      }
286
287      if (response.StatusMessage == ResponseStatus.Ok) {
288        LogMessage(response.Obj.Id, "Job sent to Hive");
289        hiveJob.JobDto = response.Obj;
290
291        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
292          UploadJobWithChildren(service, child, hiveJob, groups, ref jobCount, totalJobCount);
293        }
294      } else {
295        throw new AddJobToHiveException(response.StatusMessage.ToString());
296      }
297    }
298
299    /// <summary>
300    /// Converts a string which can contain Ids separated by ';' to a enumerable
301    /// </summary>
302    private IEnumerable<string> ToResourceIdList(string resourceGroups) {
303      if (!string.IsNullOrEmpty(resourceGroups)) {
304        return resourceIds.Split(';');
305      } else {
306        return new List<string>();
307      }
308    }
309
310    public void Stop() {
311      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
312        foreach(HiveJob hj in HiveJob.GetAllHiveJobs()) {
313          service.Obj.AbortJob(hj.JobDto.Id);
314        }
315      }
316    }
317
318    #endregion
319
320    public void StartResultPolling() {
321      if (!jobResultPoller.IsPolling) {
322        jobResultPoller.Start();
323      } else {
324        throw new JobResultPollingException("Result polling already running");
325      }
326    }
327
328    public void StopResultPolling() {
329      if (jobResultPoller.IsPolling) {
330        jobResultPoller.Stop();
331      } else {
332        throw new JobResultPollingException("Result polling not running");
333      }
334    }
335
336    #region HiveJob Events
337    void HiveJob_JobStateChanged(object sender, EventArgs e) {
338      if (HiveJob != null) {
339        rootJobId = HiveJob.JobDto.Id;
340      }
341    }
342    #endregion
343
344    #region Eventhandler
345
346    public event EventHandler ExecutionTimeChanged;
347    private void OnExecutionTimeChanged() {
348      EventHandler handler = ExecutionTimeChanged;
349      if (handler != null) handler(this, EventArgs.Empty);
350    }
351
352    public event EventHandler ExecutionStateChanged;
353    private void OnExecutionStateChanged() {
354      LogMessage("ExecutionState changed to " + executionState.ToString());
355      EventHandler handler = ExecutionStateChanged;
356      if (handler != null) handler(this, EventArgs.Empty);
357    }
358
359    public event EventHandler Started;
360    private void OnStarted() {
361      LogMessage("Started");
362      timer.Start();
363      EventHandler handler = Started;
364      if (handler != null) handler(this, EventArgs.Empty);
365    }
366
367    public event EventHandler Stopped;
368    private void OnStopped() {
369      LogMessage("Stopped");
370      timer.Stop();
371      EventHandler handler = Stopped;
372      if (handler != null) handler(this, EventArgs.Empty);
373    }
374
375    public event EventHandler Paused;
376    private void OnPaused() {
377      LogMessage("Paused");
378      EventHandler handler = Paused;
379      if (handler != null) handler(this, EventArgs.Empty);
380    }
381
382    public event EventHandler Prepared;
383    protected virtual void OnPrepared() {
384      LogMessage("Prepared");
385      EventHandler handler = Prepared;
386      if (handler != null) handler(this, EventArgs.Empty);
387    }
388
389    public event EventHandler ResourceIdsChanged;
390    protected virtual void OnResourceIdsChanged() {
391      EventHandler handler = ResourceIdsChanged;
392      if (handler != null) handler(this, EventArgs.Empty);
393    }
394
395    public event EventHandler IsResultsPollingChanged;
396    private void OnIsPollingResultsChanged() {
397      if (this.IsPollingResults) {
398        LogMessage("Results Polling Started");
399      } else {
400        LogMessage("Results Polling Stopped");
401      }
402      EventHandler handler = IsResultsPollingChanged;
403      if (handler != null) handler(this, EventArgs.Empty);
404    }
405
406    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
407    private void OnExceptionOccured(Exception e) {
408      var handler = ExceptionOccurred;
409      if (handler != null) handler(this, new EventArgs<Exception>(e));
410    }
411
412    public event EventHandler HiveJobChanged;
413    private void OnHiveJobChanged() {
414      if (jobResultPoller != null && jobResultPoller.IsPolling) {
415        jobResultPoller.Stop();
416        DeregisterResultPollingEvents();
417      }
418      if (HiveJob != null) {
419        jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.RESULT_POLLING_INTERVAL);
420        RegisterResultPollingEvents();
421      }
422      EventHandler handler = HiveJobChanged;
423      if (handler != null) handler(this, EventArgs.Empty);
424    }
425
426    public event EventHandler IsProgressingChanged;
427    private void OnIsProgressingChanged() {
428      var handler = IsProgressingChanged;
429      if (handler != null) handler(this, EventArgs.Empty);
430    }
431    #endregion
432
433    #region JobResultPoller Events
434    private void RegisterResultPollingEvents() {
435      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
436      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
437      jobResultPoller.PollingStarted += new EventHandler(jobResultPoller_PollingStarted);
438      jobResultPoller.PollingFinished += new EventHandler(jobResultPoller_PollingFinished);
439      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
440    }
441    private void DeregisterResultPollingEvents() {
442      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
443      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
444      jobResultPoller.PollingStarted -= new EventHandler(jobResultPoller_PollingStarted);
445      jobResultPoller.PollingFinished -= new EventHandler(jobResultPoller_PollingFinished);
446      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
447    }
448    void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
449      this.IsPollingResults = jobResultPoller.IsPolling;
450    }
451    void jobResultPoller_PollingFinished(object sender, EventArgs e) {
452      LogMessage("Polling results finished");
453    }
454    void jobResultPoller_PollingStarted(object sender, EventArgs e) {
455      LogMessage("Polling results started");
456    }
457    void jobResultPoller_JobResultReceived(object sender, EventArgs<JobResultList> e) {
458      foreach (JobResult jobResult in e.Value) {
459        HiveJob hj = hiveJob.GetHiveJobByJobId(jobResult.Id);
460        if (hj != null) {
461          hj.UpdateFromJobResult(jobResult);
462          if ((hj.JobDto.State == JobState.Aborted ||
463               hj.JobDto.State == JobState.Failed ||
464               hj.JobDto.State == JobState.Finished) &&
465              !hj.IsFinishedOptimizerDownloaded) {
466            LogMessage(hj.JobDto.Id, "Downloading optimizer for job");
467            OptimizerJob optimizerJob = LoadOptimizerJob(hj.JobDto.Id);
468            if (jobResult.ParentJobId.HasValue) {
469              HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(jobResult.ParentJobId.Value);
470              parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.JobDto.Id);
471            } else {
472              this.HiveJob.IsFinishedOptimizerDownloaded = true;
473            }
474          }
475        }
476      }
477      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
478      if (AllJobsFinished()) {
479        this.ExecutionState = Core.ExecutionState.Stopped;
480        StopResultPolling();
481        OnStopped();
482      }
483    }
484
485    private bool AllJobsFinished() {
486      return HiveJob.GetAllHiveJobs().All(hj => hj.IsFinishedOptimizerDownloaded);
487    }
488
489    void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
490      OnExceptionOccured(e.Value);
491    }
492    #endregion
493
494    #region Execution Time Timer
495    private void InitTimer() {
496      timer = new System.Timers.Timer(100);
497      timer.AutoReset = true;
498      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
499    }
500
501    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
502      DateTime now = DateTime.Now;
503      ExecutionTime += now - lastUpdateTime;
504      lastUpdateTime = now;
505    }
506    #endregion
507
508    #region Logging
509    private void LogMessage(string message) {
510      // HeuristicLab.Log is not Thread-Safe, so lock on every call
511      lock (locker) {
512        log.LogMessage(message);
513        Logger.Debug(message);
514      }
515    }
516
517    private void LogMessage(Guid jobId, string message) {
518      LogMessage(message + " (jobId: " + jobId + ")");
519    }
520
521    #endregion
522
523    /// <summary>
524    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem
525    /// </summary>
526    public void LoadHiveJob() {
527      progress = new Progress();
528      try {
529        IsProgressing = true;
530        int totalJobCount = 0;
531        int jobCount = 0;
532        progress.Status = "Connecting to Server...";
533        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
534          // fetch all JobDto objects to create the full tree of tree of HiveJob objects
535          progress.Status = "Downloading list of jobs...";
536          JobResultList allResults = service.Obj.GetChildJobResults(rootJobId.Value, true, true).Obj;
537          totalJobCount = allResults.Count;
538
539          // download them first
540          IDictionary<Guid, SerializedJob> allSerializedJobs = new Dictionary<Guid, SerializedJob>();
541          foreach (JobResult jobResult in allResults) {
542            jobCount++;
543            progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
544            allSerializedJobs.Add(jobResult.Id, service.Obj.GetLastSerializedResult(jobResult.Id).Obj);
545            progress.ProgressValue = (double)jobCount / totalJobCount;
546          }
547
548          jobCount = 1;
549          progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, allSerializedJobs[this.rootJobId.Value].SerializedJobData.Count() / 1024);
550          this.HiveJob = new HiveJob(allSerializedJobs[this.rootJobId.Value], false);
551          allSerializedJobs.Remove(this.rootJobId.Value); // reduce memory footprint
552          progress.ProgressValue = (double)jobCount / totalJobCount;
553
554          if (this.HiveJob.JobDto.DateFinished.HasValue) {
555            this.ExecutionTime = this.HiveJob.JobDto.DateFinished.Value - this.HiveJob.JobDto.DateCreated.Value;
556            this.lastUpdateTime = this.HiveJob.JobDto.DateFinished.Value;
557            this.ExecutionState = Core.ExecutionState.Stopped;
558            OnStopped();
559          } else {
560            this.ExecutionTime = DateTime.Now - this.HiveJob.JobDto.DateCreated.Value;
561            this.lastUpdateTime = DateTime.Now;
562            this.ExecutionState = Core.ExecutionState.Started;
563            OnStarted();
564          }
565
566          // build child-job tree
567          LoadChildResults(service.Obj, this.HiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
568          StartResultPolling();
569        }
570      }
571      catch (Exception e) {
572        OnExceptionOccured(e);
573      }
574      finally {
575        IsProgressing = false;
576      }
577    }
578
579    private void LoadChildResults(IClientFacade service, HiveJob parentHiveJob, JobResultList allResults, IDictionary<Guid, SerializedJob> allSerializedJobs, IProgress progress, int totalJobCount, ref int jobCount) {
580      IEnumerable<JobResult> childResults = from result in allResults
581                                            where result.ParentJobId.HasValue && result.ParentJobId.Value == parentHiveJob.JobDto.Id
582                                            orderby result.DateCreated ascending
583                                            select result;
584      foreach (JobResult jobResult in childResults) {
585        jobCount++;
586        progress.Status = string.Format("Deserializing {0} of {1} jobs ({2} kb)...", jobCount, totalJobCount, allSerializedJobs[jobResult.Id].SerializedJobData.Count() / 1024);
587        OptimizerJob optimizerJob = SerializedJob.Deserialize<OptimizerJob>(allSerializedJobs[jobResult.Id].SerializedJobData);
588        progress.ProgressValue = (double)jobCount / totalJobCount;
589        HiveJob childHiveJob = new HiveJob(optimizerJob, false);
590        parentHiveJob.AddChildHiveJob(childHiveJob);
591        childHiveJob.JobDto = allSerializedJobs[jobResult.Id].JobInfo;
592        allSerializedJobs.Remove(jobResult.Id); // reduce memory footprint
593        if(jobCount % 10 == 0) GC.Collect(); // this is needed or otherwise HL takes over the system when the number of jobs is high
594        LoadChildResults(service, childHiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
595      }
596    }
597
598    private OptimizerJob LoadOptimizerJob(Guid jobId) {
599      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
600        ResponseObject<SerializedJob> serializedJob = service.Obj.GetLastSerializedResult(jobId);
601        return SerializedJob.Deserialize<OptimizerJob>(serializedJob.Obj.SerializedJobData);
602      }
603    }
604  }
605}
Note: See TracBrowser for help on using the repository browser.