Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.ExperimentManager/3.3/HiveExperiment.cs @ 5179

Last change on this file since 5179 was 5179, checked in by cneumuel, 14 years ago

#1260

  • migrated to .NET 4.0
  • moved state-information about heartbeat timestamps into DB to reduce IIS-recycling issues
  • optimized memory usage of ExperimentManager when lots of large jobs are downloaded
File size: 21.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.Interfaces;
31using HeuristicLab.Hive.Contracts.ResponseObjects;
32using HeuristicLab.Hive.ExperimentManager.Jobs;
33using HeuristicLab.Hive.Tracing;
34using HeuristicLab.Clients.Common;
35
36namespace HeuristicLab.Hive.ExperimentManager {
37  /// <summary>
38  /// An experiment which contains multiple batch runs of algorithms.
39  /// </summary>
40  [Item(itemName, itemDescription)]
41  public class HiveExperiment : NamedItem, IExecutable, IProgressReporter {
42    private object locker = new object();
43    private const string itemName = "Hive Experiment";
44    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
45    private System.Timers.Timer timer;
46
47    private JobResultPoller jobResultPoller;
48    private Guid? rootJobId;
49    private DateTime lastUpdateTime;
50
51    #region Properties
52    private Guid hiveExperimentId;
53    public Guid HiveExperimentId {
54      get { return hiveExperimentId; }
55      set { hiveExperimentId = value; }
56    }
57
58    private HiveJob hiveJob;
59    public HiveJob HiveJob {
60      get { return hiveJob; }
61      set {
62        DeregisterHiveJobEvents();
63        if (hiveJob != value) {
64          hiveJob = value;
65          RegisterHiveJobEvents();
66          OnHiveJobChanged();
67        }
68      }
69    }
70
71    private ILog log;
72    public ILog Log {
73      get { return log; }
74    }
75
76    private string resourceIds;
77    public string ResourceIds {
78      get { return resourceIds; }
79      set {
80        if (resourceIds != value) {
81          resourceIds = value;
82          OnResourceIdsChanged();
83        }
84      }
85    }
86
87    private bool isPollingResults;
88    public bool IsPollingResults {
89      get { return isPollingResults; }
90      private set {
91        if (isPollingResults != value) {
92          isPollingResults = value;
93          OnIsPollingResultsChanged();
94        }
95      }
96    }
97
98    private bool isProgressing;
99    public bool IsProgressing {
100      get { return isProgressing; }
101      set {
102        if (isProgressing != value) {
103          isProgressing = value;
104          OnIsProgressingChanged();
105        }
106      }
107    }
108
109    private IProgress progress;
110    public IProgress Progress {
111      get { return progress; }
112    }
113    #endregion
114
115    public HiveExperiment()
116      : base(itemName, itemDescription) {
117      this.ResourceIds = "HEAL";
118      this.log = new Log();
119      InitTimer();
120    }
121    public HiveExperiment(HiveExperimentDto hiveExperimentDto)
122      : this() {
123      UpdateFromDto(hiveExperimentDto);
124    }
125    protected HiveExperiment(HiveExperiment original, Cloner cloner)
126      : base(original, cloner) {
127      this.ResourceIds = original.resourceIds;
128      this.ExecutionState = original.executionState;
129      this.ExecutionTime = original.executionTime;
130      this.log = cloner.Clone(log);
131      this.lastUpdateTime = original.lastUpdateTime;
132      this.rootJobId = original.rootJobId;
133    }
134    public override IDeepCloneable Clone(Cloner cloner) {
135      return new HiveExperiment(this, cloner);
136    }
137
138    public void UpdateFromDto(HiveExperimentDto hiveExperimentDto) {
139      this.HiveExperimentId = hiveExperimentDto.Id;
140      this.Name = hiveExperimentDto.Name;
141      this.Description = hiveExperimentDto.Description;
142      this.ResourceIds = hiveExperimentDto.ResourceIds;
143      this.rootJobId = hiveExperimentDto.RootJobId;
144    }
145
146    public HiveExperimentDto ToHiveExperimentDto() {
147      return new HiveExperimentDto() {
148        Id = this.HiveExperimentId,
149        Name = this.Name,
150        Description = this.Description,
151        ResourceIds = this.ResourceIds,
152        RootJobId = this.rootJobId
153      };
154    }
155
156    public void SetExperiment(Optimization.Experiment experiment) {
157      this.HiveJob = new HiveJob(experiment);
158      // do not prepare job, because it should be possible to resume paused jobs on hive
159    }
160
161    private void RegisterHiveJobEvents() {
162      if (HiveJob != null) {
163        HiveJob.JobStateChanged += new EventHandler(HiveJob_JobStateChanged);
164      }
165    }
166
167    private void DeregisterHiveJobEvents() {
168      if (HiveJob != null) {
169        HiveJob.JobStateChanged -= new EventHandler(HiveJob_JobStateChanged);
170      }
171    }
172
173    /// <summary>
174    /// Returns the experiment from the root HiveJob
175    /// </summary>
176    public Optimization.Experiment GetExperiment() {
177      if (this.HiveJob != null) {
178        return HiveJob.Job.OptimizerAsExperiment;
179      }
180      return null;
181    }
182
183    #region IExecutable Members
184    private Core.ExecutionState executionState;
185    public ExecutionState ExecutionState {
186      get { return executionState; }
187      private set {
188        if (executionState != value) {
189          executionState = value;
190          OnExecutionStateChanged();
191        }
192      }
193    }
194
195    private TimeSpan executionTime;
196    public TimeSpan ExecutionTime {
197      get { return executionTime; }
198      private set {
199        if (executionTime != value) {
200          executionTime = value;
201          OnExecutionTimeChanged();
202        }
203      }
204    }
205
206    public void Pause() {
207      throw new NotSupportedException();
208    }
209
210    public void Prepare() {
211      // do nothing
212    }
213
214    public void Start() {
215      OnStarted();
216      ExecutionTime = new TimeSpan();
217      lastUpdateTime = DateTime.Now;
218      this.ExecutionState = Core.ExecutionState.Started;
219
220      Thread t = new Thread(RunUploadExperiment);
221      t.Name = "RunUploadExperimentThread";
222      t.Start();
223    }
224
225    private void RunUploadExperiment() {
226      try {
227        this.progress = new Progress("Connecting to server...");
228        IsProgressing = true;
229        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
230          IEnumerable<string> groups = ToResourceIdList(this.ResourceIds);
231          this.HiveJob.SetIndexInParentOptimizerList(null);
232
233          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
234          int jobCount = 0;
235
236          this.progress.Status = "Uploading jobs...";
237          UploadJobWithChildren(service.Obj, this.HiveJob, null, groups, ref jobCount, totalJobCount);
238          this.rootJobId = this.HiveJob.JobDto.Id;
239          LogMessage("Finished sending jobs to hive");
240
241          // insert or update HiveExperiment
242          this.progress.Status = "Uploading HiveExperiment...";
243          ResponseObject<HiveExperimentDto> resp = service.Obj.UpdateHiveExperiment(this.ToHiveExperimentDto());
244          this.UpdateFromDto(resp.Obj);
245
246          StartResultPolling();
247        }
248      }
249      catch (Exception e) {
250        OnExceptionOccured(e);
251      }
252      finally {
253        IsProgressing = false;
254      }
255    }
256
257    /// <summary>
258    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
259    ///
260    /// </summary>
261    /// <param name="service"></param>
262    /// <param name="hiveJob"></param>
263    /// <param name="parentHiveJob">shall be null if its the root job</param>
264    /// <param name="groups"></param>
265    private void UploadJobWithChildren(IClientFacade service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<string> groups, ref int jobCount, int totalJobCount) {
266      jobCount++;
267      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
268      SerializedJob serializedJob;
269      if (hiveJob.Job.ComputeInParallel &&
270        (hiveJob.Job.Optimizer is Optimization.Experiment || hiveJob.Job.Optimizer is Optimization.BatchRun)) {
271        hiveJob.JobDto.State = JobState.WaitForChildJobs;
272        hiveJob.Job.CollectChildJobs = false; // don't collect child-jobs on slaves
273        serializedJob = hiveJob.GetAsSerializedJob(true);
274      } else {
275        serializedJob = hiveJob.GetAsSerializedJob(false);
276      }
277
278      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, serializedJob.SerializedJobData.Count() / 1024);
279      this.progress.ProgressValue = (double)jobCount / totalJobCount;
280      ResponseObject<JobDto> response;
281      if (parentHiveJob != null) {
282        response = service.AddChildJob(parentHiveJob.JobDto.Id, serializedJob);
283      } else {
284        response = service.AddJobWithGroupStrings(serializedJob, groups);
285      }
286
287      if (response.StatusMessage == ResponseStatus.Ok) {
288        LogMessage(response.Obj.Id, "Job sent to Hive");
289        hiveJob.JobDto = response.Obj;
290
291        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
292          UploadJobWithChildren(service, child, hiveJob, groups, ref jobCount, totalJobCount);
293        }
294      } else {
295        throw new AddJobToHiveException(response.StatusMessage.ToString());
296      }
297    }
298
299    /// <summary>
300    /// Converts a string which can contain Ids separated by ';' to a enumerable
301    /// </summary>
302    private IEnumerable<string> ToResourceIdList(string resourceGroups) {
303      if (!string.IsNullOrEmpty(resourceGroups)) {
304        return resourceIds.Split(';');
305      } else {
306        return new List<string>();
307      }
308    }
309
310    public void Stop() {
311      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
312        foreach (HiveJob hj in HiveJob.GetAllHiveJobs()) {
313          service.Obj.AbortJob(hj.JobDto.Id);
314        }
315      }
316    }
317
318    #endregion
319
320    public void StartResultPolling() {
321      if (!jobResultPoller.IsPolling) {
322        jobResultPoller.Start();
323      } else {
324        throw new JobResultPollingException("Result polling already running");
325      }
326    }
327
328    public void StopResultPolling() {
329      if (jobResultPoller.IsPolling) {
330        jobResultPoller.Stop();
331      } else {
332        throw new JobResultPollingException("Result polling not running");
333      }
334    }
335
336    #region HiveJob Events
337    void HiveJob_JobStateChanged(object sender, EventArgs e) {
338      if (HiveJob != null) {
339        rootJobId = HiveJob.JobDto.Id;
340      }
341    }
342    #endregion
343
344    #region Eventhandler
345
346    public event EventHandler ExecutionTimeChanged;
347    private void OnExecutionTimeChanged() {
348      EventHandler handler = ExecutionTimeChanged;
349      if (handler != null) handler(this, EventArgs.Empty);
350    }
351
352    public event EventHandler ExecutionStateChanged;
353    private void OnExecutionStateChanged() {
354      LogMessage("ExecutionState changed to " + executionState.ToString());
355      EventHandler handler = ExecutionStateChanged;
356      if (handler != null) handler(this, EventArgs.Empty);
357    }
358
359    public event EventHandler Started;
360    private void OnStarted() {
361      LogMessage("Started");
362      timer.Start();
363      EventHandler handler = Started;
364      if (handler != null) handler(this, EventArgs.Empty);
365    }
366
367    public event EventHandler Stopped;
368    private void OnStopped() {
369      LogMessage("Stopped");
370      timer.Stop();
371      EventHandler handler = Stopped;
372      if (handler != null) handler(this, EventArgs.Empty);
373    }
374
375    public event EventHandler Paused;
376    private void OnPaused() {
377      LogMessage("Paused");
378      EventHandler handler = Paused;
379      if (handler != null) handler(this, EventArgs.Empty);
380    }
381
382    public event EventHandler Prepared;
383    protected virtual void OnPrepared() {
384      LogMessage("Prepared");
385      EventHandler handler = Prepared;
386      if (handler != null) handler(this, EventArgs.Empty);
387    }
388
389    public event EventHandler ResourceIdsChanged;
390    protected virtual void OnResourceIdsChanged() {
391      EventHandler handler = ResourceIdsChanged;
392      if (handler != null) handler(this, EventArgs.Empty);
393    }
394
395    public event EventHandler IsResultsPollingChanged;
396    private void OnIsPollingResultsChanged() {
397      if (this.IsPollingResults) {
398        LogMessage("Results Polling Started");
399      } else {
400        LogMessage("Results Polling Stopped");
401      }
402      EventHandler handler = IsResultsPollingChanged;
403      if (handler != null) handler(this, EventArgs.Empty);
404    }
405
406    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
407    private void OnExceptionOccured(Exception e) {
408      var handler = ExceptionOccurred;
409      if (handler != null) handler(this, new EventArgs<Exception>(e));
410    }
411
412    public event EventHandler HiveJobChanged;
413    private void OnHiveJobChanged() {
414      if (jobResultPoller != null && jobResultPoller.IsPolling) {
415        jobResultPoller.Stop();
416        DeregisterResultPollingEvents();
417      }
418      if (HiveJob != null) {
419        jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.RESULT_POLLING_INTERVAL);
420        RegisterResultPollingEvents();
421      }
422      EventHandler handler = HiveJobChanged;
423      if (handler != null) handler(this, EventArgs.Empty);
424    }
425
426    public event EventHandler IsProgressingChanged;
427    private void OnIsProgressingChanged() {
428      var handler = IsProgressingChanged;
429      if (handler != null) handler(this, EventArgs.Empty);
430    }
431    #endregion
432
433    #region JobResultPoller Events
434    private void RegisterResultPollingEvents() {
435      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
436      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
437      jobResultPoller.PollingStarted += new EventHandler(jobResultPoller_PollingStarted);
438      jobResultPoller.PollingFinished += new EventHandler(jobResultPoller_PollingFinished);
439      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
440    }
441    private void DeregisterResultPollingEvents() {
442      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
443      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
444      jobResultPoller.PollingStarted -= new EventHandler(jobResultPoller_PollingStarted);
445      jobResultPoller.PollingFinished -= new EventHandler(jobResultPoller_PollingFinished);
446      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
447    }
448    void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
449      this.IsPollingResults = jobResultPoller.IsPolling;
450    }
451    void jobResultPoller_PollingFinished(object sender, EventArgs e) {
452      LogMessage("Polling results finished");
453    }
454    void jobResultPoller_PollingStarted(object sender, EventArgs e) {
455      LogMessage("Polling results started");
456    }
457    void jobResultPoller_JobResultReceived(object sender, EventArgs<JobResultList> e) {
458      foreach (JobResult jobResult in e.Value) {
459        HiveJob hj = hiveJob.GetHiveJobByJobId(jobResult.Id);
460        if (hj != null) {
461          hj.UpdateFromJobResult(jobResult);
462          if ((hj.JobDto.State == JobState.Aborted ||
463               hj.JobDto.State == JobState.Failed ||
464               hj.JobDto.State == JobState.Finished) &&
465              !hj.IsFinishedOptimizerDownloaded) {
466            LogMessage(hj.JobDto.Id, "Downloading optimizer for job");
467            OptimizerJob optimizerJob = LoadOptimizerJob(hj.JobDto.Id);
468            if (optimizerJob == null) {
469              // something bad happened to this job. set to finished to allow the rest beeing downloaded
470              hj.IsFinishedOptimizerDownloaded = true;
471            } else {
472              if (jobResult.ParentJobId.HasValue) {
473                HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(jobResult.ParentJobId.Value);
474                parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.JobDto.Id);
475              } else {
476                this.HiveJob.IsFinishedOptimizerDownloaded = true;
477              }
478            }
479          }
480        }
481      }
482      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
483      if (AllJobsFinished()) {
484        this.ExecutionState = Core.ExecutionState.Stopped;
485        StopResultPolling();
486        OnStopped();
487      }
488    }
489
490    private bool AllJobsFinished() {
491      return HiveJob.GetAllHiveJobs().All(hj => hj.IsFinishedOptimizerDownloaded);
492    }
493
494    void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
495      OnExceptionOccured(e.Value);
496    }
497    #endregion
498
499    #region Execution Time Timer
500    private void InitTimer() {
501      timer = new System.Timers.Timer(100);
502      timer.AutoReset = true;
503      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
504    }
505
506    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
507      DateTime now = DateTime.Now;
508      ExecutionTime += now - lastUpdateTime;
509      lastUpdateTime = now;
510    }
511    #endregion
512
513    #region Logging
514    private void LogMessage(string message) {
515      // HeuristicLab.Log is not Thread-Safe, so lock on every call
516      lock (locker) {
517        log.LogMessage(message);
518        Logger.Debug(message);
519      }
520    }
521
522    private void LogMessage(Guid jobId, string message) {
523      LogMessage(message + " (jobId: " + jobId + ")");
524    }
525
526    #endregion
527
528    /// <summary>
529    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem
530    /// </summary>
531    public void LoadHiveJob() {
532      progress = new Progress();
533      try {
534        IsProgressing = true;
535        int totalJobCount = 0;
536        JobResultList allResults;
537
538        progress.Status = "Connecting to Server...";
539        using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
540          progress.Status = "Downloading list of jobs...";
541          allResults = service.Obj.GetChildJobResults(rootJobId.Value, true, true).Obj;
542          totalJobCount = allResults.Count;
543        }
544
545        HiveJobDownloader downloader = new HiveJobDownloader(allResults.Select(x => x.Id));
546        downloader.StartAsync();
547
548        while (!downloader.IsFinished) {
549          progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
550          progress.Status = string.Format("Downloading/deserializing jobs... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
551          Thread.Sleep(500);
552        }
553        IDictionary<Guid, HiveJob> allHiveJobs = downloader.Results;
554
555        this.HiveJob = allHiveJobs[this.rootJobId.Value];
556
557        if (this.HiveJob.JobDto.DateFinished.HasValue) {
558          this.ExecutionTime = this.HiveJob.JobDto.DateFinished.Value - this.HiveJob.JobDto.DateCreated.Value;
559          this.lastUpdateTime = this.HiveJob.JobDto.DateFinished.Value;
560          this.ExecutionState = Core.ExecutionState.Stopped;
561          OnStopped();
562        } else {
563          this.ExecutionTime = DateTime.Now - this.HiveJob.JobDto.DateCreated.Value;
564          this.lastUpdateTime = DateTime.Now;
565          this.ExecutionState = Core.ExecutionState.Started;
566          OnStarted();
567        }
568
569        BuildHiveJobTree(this.HiveJob, allResults, allHiveJobs);
570        StartResultPolling();
571      }
572      catch (Exception e) {
573        OnExceptionOccured(e);
574      }
575      finally {
576        IsProgressing = false;
577      }
578    }
579
580    private void BuildHiveJobTree(HiveJob parentHiveJob, JobResultList allResults, IDictionary<Guid, HiveJob> allHiveJobs) {
581      IEnumerable<JobResult> childResults = from result in allResults
582                                            where result.ParentJobId.HasValue && result.ParentJobId.Value == parentHiveJob.JobDto.Id
583                                            orderby result.DateCreated ascending
584                                            select result;
585      foreach (JobResult jobResult in childResults) {
586        HiveJob childHiveJob = allHiveJobs[jobResult.Id];
587        parentHiveJob.AddChildHiveJob(childHiveJob);
588        BuildHiveJobTree(childHiveJob, allResults, allHiveJobs);
589      }
590    }
591
592    private OptimizerJob LoadOptimizerJob(Guid jobId) {
593      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
594        ResponseObject<SerializedJob> serializedJob = service.Obj.GetLastSerializedResult(jobId);
595        try {
596          return SerializedJob.Deserialize<OptimizerJob>(serializedJob.Obj.SerializedJobData);
597        }
598        catch {
599          return null;
600        }
601      }
602    }
603  }
604}
Note: See TracBrowser for help on using the repository browser.