Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive/sources/HeuristicLab.Hive.New/HeuristicLab.Clients.Hive/3.3/HiveExperiment/HiveExperimentClient.cs @ 4796

Last change on this file since 4796 was 4796, checked in by cneumuel, 13 years ago

#1233 applied new cloning mechanism

File size: 22.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Common;
27using HeuristicLab.Core;
28using HeuristicLab.Tracing;
29using HeuristicLab.Services.Hive.Common;
30using HeuristicLab.Services.Hive.Common.ServiceContracts;
31
32namespace HeuristicLab.Clients.Hive {
33  using DT = HeuristicLab.Services.Hive.Common.DataTransfer;
34  using HeuristicLab.Services.Hive.Common.DataTransfer;
35  using HeuristicLab.Clients.Hive.Jobs;
36
37  /// <summary>
38  /// An experiment which contains multiple batch runs of algorithms.
39  /// </summary>
40  [Item(itemName, itemDescription)]
41  public class HiveExperimentClient : NamedItem, IExecutable, IProgressReporter {
42    private object locker = new object();
43    private const string itemName = "Hive Experiment";
44    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
45    private System.Timers.Timer timer;
46
47    private Guid hiveExperimentId;
48    public Guid HiveExperimentId {
49      get { return hiveExperimentId; }
50      set { hiveExperimentId = value; }
51    }
52
53    private HiveJob hiveJob;
/// <summary>
/// Gets or sets the root job of this experiment.
/// Assigning a different instance moves the JobStateChanged subscription from the
/// previous job to the new one and then raises <see cref="HiveJobChanged"/>.
/// Assigning the instance that is already set is a no-op.
/// </summary>
public HiveJob HiveJob {
  get { return hiveJob; }
  set {
    if (hiveJob != value) {
      // Detach only when the job is really replaced. Detaching before the comparison
      // left the current job without a handler when the same instance was assigned again.
      DeregisterHiveJobEvents();
      hiveJob = value;
      RegisterHiveJobEvents();
      OnHiveJobChanged();
    }
  }
}
65
66    private ILog log;
67    public ILog Log {
68      get { return log; }
69    }
70
71    private DateTime lastUpdateTime;
72
73    private string resourceIds;
/// <summary>
/// Gets or sets the resource ids as a single string; raises
/// <see cref="ResourceIdsChanged"/> when the value actually changes.
/// </summary>
public string ResourceIds {
  get { return resourceIds; }
  set {
    if (resourceIds == value) {
      return;
    }
    resourceIds = value;
    OnResourceIdsChanged();
  }
}
83
84    private Guid? rootJobId;
85
86    private bool isPollingResults;
/// <summary>
/// Gets whether results are currently being polled. The private setter raises
/// <see cref="IsResultsPollingChanged"/> when the value changes.
/// </summary>
public bool IsPollingResults {
  get { return isPollingResults; }
  private set {
    if (isPollingResults == value) {
      return;
    }
    isPollingResults = value;
    OnIsPollingResultsChanged();
  }
}
96
97    private bool isProgressing;
/// <summary>
/// Gets or sets whether a long running operation is in progress; raises
/// <see cref="IsProgressingChanged"/> when the value changes.
/// </summary>
public bool IsProgressing {
  get { return isProgressing; }
  set {
    if (isProgressing == value) {
      return;
    }
    isProgressing = value;
    OnIsProgressingChanged();
  }
}
107
108    private IProgress progress;
109    public IProgress Progress {
110      get { return progress; }
111    }
112
113    private JobResultPoller jobResultPoller;
114
115
116    public HiveExperimentClient()
117      : base(itemName, itemDescription) {
118      //this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
119      this.ResourceIds = "MyGroup";
120      this.log = new Log();
121      InitTimer();
122    }
123    public HiveExperimentClient(DT.HiveExperiment hiveExperimentDto)
124      : this() {
125      UpdateFromDto(hiveExperimentDto);
126    }
/// <summary>
/// Cloning constructor used by <see cref="Clone"/>.
/// Copies resource ids, execution state/time, the last update time and the root job id,
/// and deep-clones the log. The HiveJob, the experiment id and the result poller are not copied.
/// </summary>
/// <param name="original">The instance to copy from.</param>
/// <param name="cloner">The cloner that tracks already cloned objects.</param>
protected HiveExperimentClient(HiveExperimentClient original, Cloner cloner)
  : base(original, cloner) {
  this.resourceIds = original.resourceIds;
  this.executionState = original.executionState;
  this.executionTime = original.executionTime;
  this.log = cloner.Clone(original.log);
  this.lastUpdateTime = original.lastUpdateTime;
  this.rootJobId = original.rootJobId;
  // This constructor does not chain to the default constructor, so the timer must be
  // created here; otherwise it stays null and OnStarted()/OnStopped() throw on the clone.
  InitTimer();
}
136    public override IDeepCloneable Clone(Cloner cloner) {
137      return new HiveExperimentClient(this, cloner);
138    }
139
140
141    public void UpdateFromDto(DT.HiveExperiment hiveExperimentDto) {
142      this.HiveExperimentId = hiveExperimentDto.Id;
143      this.Name = hiveExperimentDto.Name;
144      this.Description = hiveExperimentDto.Description;
145      //this.ResourceIds = hiveExperimentDto.ResourceIds;
146      this.rootJobId = hiveExperimentDto.RootJobId;
147    }
148
/// <summary>
/// Creates a new data transfer object carrying this client's id, name, description and root job id.
/// The resource ids are not transferred.
/// </summary>
public DT.HiveExperiment ToHiveExperimentDto() {
  DT.HiveExperiment dto = new DT.HiveExperiment();
  dto.Id = this.HiveExperimentId;
  dto.Name = this.Name;
  dto.Description = this.Description;
  dto.RootJobId = this.rootJobId;
  return dto;
}
158
159    public void SetExperiment(Optimization.Experiment experiment) {
160      this.HiveJob = new HiveJob(experiment);
161      Prepare();
162    }
163
/// <summary>
/// Subscribes to JobStateChanged of the current HiveJob, if one is set.
/// </summary>
private void RegisterHiveJobEvents() {
  if (HiveJob == null) {
    return;
  }
  HiveJob.JobStateChanged += HiveJob_JobStateChanged;
}
169
/// <summary>
/// Unsubscribes from JobStateChanged of the current HiveJob, if one is set.
/// </summary>
private void DeregisterHiveJobEvents() {
  if (HiveJob == null) {
    return;
  }
  HiveJob.JobStateChanged -= HiveJob_JobStateChanged;
}
175
176    /// <summary>
177    /// Returns the experiment from the root HiveJob
178    /// </summary>
public Optimization.Experiment GetExperiment() {
  // Without a root job there is no experiment to hand out.
  if (this.HiveJob == null) {
    return null;
  }
  return HiveJob.OptimizerJob.OptimizerAsExperiment;
}
185
186    #region IExecutable Members
187    private Core.ExecutionState executionState;
/// <summary>
/// Gets the current execution state. The private setter raises
/// <see cref="ExecutionStateChanged"/> when the value changes.
/// </summary>
public ExecutionState ExecutionState {
  get { return executionState; }
  private set {
    if (executionState == value) {
      return;
    }
    executionState = value;
    OnExecutionStateChanged();
  }
}
197
198    private TimeSpan executionTime;
/// <summary>
/// Gets the accumulated execution time. The private setter raises
/// <see cref="ExecutionTimeChanged"/> when the value changes.
/// </summary>
public TimeSpan ExecutionTime {
  get { return executionTime; }
  private set {
    if (executionTime == value) {
      return;
    }
    executionTime = value;
    OnExecutionTimeChanged();
  }
}
208
209    public void Pause() {
210      throw new NotSupportedException();
211    }
212
213    public void Prepare() {
214      // do nothing
215    }
216
/// <summary>
/// Raises Started, resets the execution time, switches to the Started state and
/// launches the upload of the experiment on a separate thread.
/// </summary>
public void Start() {
  OnStarted();
  ExecutionTime = TimeSpan.Zero;
  lastUpdateTime = DateTime.Now;
  this.ExecutionState = Core.ExecutionState.Started;

  Thread uploadThread = new Thread(RunUploadExperiment) { Name = "RunUploadExperimentThread" };
  uploadThread.Start();
}
227
228    private void RunUploadExperiment() {
       // Body of the thread created in Start(): uploads the whole job tree, stores the
       // experiment on the server and then starts polling for results. Any exception is
       // reported through OnExceptionOccured instead of escaping the thread.
229      try {
230        this.progress = new Progress("Connecting to server...");
231        IsProgressing = true;
232        using (Disposable<IHiveService> service = ServiceLocator.Instance.ServicePool.GetService()) {
233          IEnumerable<string> groups = ToResourceIdList(this.ResourceIds);
234          this.HiveJob.SetIndexInParentOptimizerList(null);
235
           // total is computed once up front; jobCount is advanced by UploadJobWithChildren via ref
236          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
237          int jobCount = 0;
238
239          this.progress.Status = "Uploading jobs...";
           // null parent marks the root job
240          UploadJobWithChildren(service.Obj, this.HiveJob, null, groups, ref jobCount, totalJobCount);
           // the root job id is only known after the upload assigned it
241          this.rootJobId = this.HiveJob.Job.Id;
242          LogMessage("Finished sending jobs to hive");
243
244          // insert or update HiveExperiment
245          this.progress.Status = "Uploading HiveExperiment...";
246
           // add the experiment, then read it back by the returned id and take over its values
247          DT.HiveExperiment he = service.Obj.GetHiveExperiment(service.Obj.AddHiveExperiment(this.ToHiveExperimentDto()));
248          this.UpdateFromDto(he);
249
250          StartResultPolling();
251        }
252      }
253      catch (Exception e) {
254        OnExceptionOccured(e);
255      }
256      finally {
         // always clear the progress flag, also after a failure
257        IsProgressing = false;
258      }
259    }
260
261    /// <summary>
262    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
263    ///
264    /// </summary>
265    /// <param name="service"></param>
266    /// <param name="hiveJob"></param>
267    /// <param name="parentHiveJob">shall be null if its the root job</param>
268    /// <param name="groups"></param>
private void UploadJobWithChildren(IHiveService service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<string> groups, ref int jobCount, int totalJobCount) {
  jobCount++;
  this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);

  // Experiments and batch runs that compute in parallel are uploaded as containers
  // that wait for their child jobs.
  bool waitsForChildren = hiveJob.OptimizerJob.ComputeInParallel &&
    (hiveJob.OptimizerJob.Optimizer is Optimization.Experiment || hiveJob.OptimizerJob.Optimizer is Optimization.BatchRun);
  if (waitsForChildren) {
    hiveJob.Job.JobState = JobState.WaitingForChildJobs;
    hiveJob.OptimizerJob.CollectChildJobs = false; // don't collect child-jobs on slaves
  }
  JobData serializedJob = hiveJob.GetAsSerializedJob(waitsForChildren);

  this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, serializedJob.Data.Count() / 1024);
  this.progress.ProgressValue = (double)jobCount / totalJobCount;

  // The root job has no parent; every other job is attached to its parent's id.
  if (parentHiveJob == null) {
    hiveJob.Job.Id = service.AddJob(hiveJob.Job, serializedJob);
  } else {
    hiveJob.Job.Id = service.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, serializedJob);
  }

  LogMessage(hiveJob.Job.Id, "Job sent to Hive");

  // Depth-first: children are uploaded after their parent received its id.
  foreach (HiveJob childJob in hiveJob.ChildHiveJobs) {
    UploadJobWithChildren(service, childJob, hiveJob, groups, ref jobCount, totalJobCount);
  }
}
299
300    /// <summary>
301    /// Converts a string which can contain Ids separated by ';' to a enumerable
302    /// </summary>
private static IEnumerable<string> ToResourceIdList(string resourceGroups) {
  // null or empty input yields an empty list
  if (string.IsNullOrEmpty(resourceGroups)) {
    return new List<string>();
  }
  // Split the argument itself; the previous code split the resourceIds field
  // and ignored the value that had just been checked.
  return resourceGroups.Split(';');
}
310
/// <summary>
/// Requests an abort on the server for every job in the HiveJob tree.
/// Does nothing when no HiveJob is set.
/// </summary>
public void Stop() {
  HiveJob rootJob = this.HiveJob;
  if (rootJob == null) {
    // nothing was uploaded or loaded yet, so there is nothing to abort
    return;
  }
  using (Disposable<IHiveService> service = ServiceLocator.Instance.ServicePool.GetService()) {
    foreach (HiveJob hj in rootJob.GetAllHiveJobs()) {
      service.Obj.AbortJob(hj.Job.Id);
    }
  }
}
318
319    #endregion
320
/// <summary>
/// Starts the result poller of the current HiveJob.
/// </summary>
/// <exception cref="JobResultPollingException">
/// Thrown when no poller exists (no HiveJob has been set) or polling is already running.
/// </exception>
public void StartResultPolling() {
  // The poller is only created once a HiveJob is assigned; report that state with the
  // polling exception type instead of failing with a NullReferenceException.
  if (jobResultPoller == null) {
    throw new JobResultPollingException("Result polling cannot be started without a HiveJob");
  }
  if (jobResultPoller.IsPolling) {
    throw new JobResultPollingException("Result polling already running");
  }
  jobResultPoller.Start();
}
328
/// <summary>
/// Stops the result poller of the current HiveJob.
/// </summary>
/// <exception cref="JobResultPollingException">
/// Thrown when polling is not running, including the case that no poller exists yet.
/// </exception>
public void StopResultPolling() {
  // A missing poller means polling cannot be running; report it the same way instead
  // of failing with a NullReferenceException.
  if (jobResultPoller == null || !jobResultPoller.IsPolling) {
    throw new JobResultPollingException("Result polling not running");
  }
  jobResultPoller.Stop();
}
336
337    #region HiveJob Events
// Keeps rootJobId in sync with the id of the current root job whenever its state changes.
void HiveJob_JobStateChanged(object sender, EventArgs e) {
  if (HiveJob == null) {
    return;
  }
  rootJobId = HiveJob.Job.Id;
}
343    #endregion
344
345    #region Eventhandler
346
347    public event EventHandler ExecutionTimeChanged;
348    private void OnExecutionTimeChanged() {
349      EventHandler handler = ExecutionTimeChanged;
350      if (handler != null) handler(this, EventArgs.Empty);
351    }
352
353    public event EventHandler ExecutionStateChanged;
// Logs the new state and raises ExecutionStateChanged.
private void OnExecutionStateChanged() {
  LogMessage("ExecutionState changed to " + executionState.ToString());
  EventHandler subscribers = ExecutionStateChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
359
360    public event EventHandler Started;
// Logs the start, starts the execution time timer and raises Started.
private void OnStarted() {
  LogMessage("Started");
  timer.Start();
  EventHandler subscribers = Started;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
367
368    public event EventHandler Stopped;
// Logs the stop, halts the execution time timer and raises Stopped.
private void OnStopped() {
  LogMessage("Stopped");
  timer.Stop();
  EventHandler subscribers = Stopped;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
375
376    public event EventHandler Paused;
377    private void OnPaused() {
378      LogMessage("Paused");
379      EventHandler handler = Paused;
380      if (handler != null) handler(this, EventArgs.Empty);
381    }
382
383    public event EventHandler Prepared;
384    protected virtual void OnPrepared() {
385      LogMessage("Prepared");
386      EventHandler handler = Prepared;
387      if (handler != null) handler(this, EventArgs.Empty);
388    }
389
390    public event EventHandler ResourceIdsChanged;
391    protected virtual void OnResourceIdsChanged() {
392      EventHandler handler = ResourceIdsChanged;
393      if (handler != null) handler(this, EventArgs.Empty);
394    }
395
396    public event EventHandler IsResultsPollingChanged;
// Logs whether polling started or stopped and raises IsResultsPollingChanged.
private void OnIsPollingResultsChanged() {
  string logText = this.IsPollingResults ? "Results Polling Started" : "Results Polling Stopped";
  LogMessage(logText);
  EventHandler subscribers = IsResultsPollingChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
406
407    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
// Raises ExceptionOccurred with the given exception wrapped in the event arguments.
private void OnExceptionOccured(Exception e) {
  EventHandler<EventArgs<Exception>> subscribers = ExceptionOccurred;
  if (subscribers != null) {
    subscribers(this, new EventArgs<Exception>(e));
  }
}
412
413    public event EventHandler HiveJobChanged;
/// <summary>
/// Replaces the result poller after the HiveJob changed and raises <see cref="HiveJobChanged"/>.
/// The poller of the previous job is stopped if it is polling and always detached;
/// a new poller is created only when a HiveJob is set.
/// </summary>
private void OnHiveJobChanged() {
  if (jobResultPoller != null) {
    // Stop first so the still-registered IsPollingChanged handler can update IsPollingResults.
    if (jobResultPoller.IsPolling) {
      jobResultPoller.Stop();
    }
    // Detach unconditionally: the handlers used to be removed only from a polling poller,
    // so an idle one kept referencing this instance after it was replaced.
    DeregisterResultPollingEvents();
    // Drop the old poller so it cannot be started for a job that is no longer set.
    jobResultPoller = null;
  }
  if (HiveJob != null) {
    jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.ResultPollingInterval);
    RegisterResultPollingEvents();
  }
  EventHandler handler = HiveJobChanged;
  if (handler != null) handler(this, EventArgs.Empty);
}
426
427    public event EventHandler IsProgressingChanged;
428    private void OnIsProgressingChanged() {
429      var handler = IsProgressingChanged;
430      if (handler != null) handler(this, EventArgs.Empty);
431    }
432    #endregion
433
434    #region JobResultPoller Events
// Subscribes this instance to all events of the current result poller.
private void RegisterResultPollingEvents() {
  jobResultPoller.ExceptionOccured += jobResultPoller_ExceptionOccured;
  jobResultPoller.JobResultsReceived += jobResultPoller_JobResultReceived;
  jobResultPoller.PollingStarted += jobResultPoller_PollingStarted;
  jobResultPoller.PollingFinished += jobResultPoller_PollingFinished;
  jobResultPoller.IsPollingChanged += jobResultPoller_IsPollingChanged;
}
// Removes the subscriptions added by RegisterResultPollingEvents from the current result poller.
private void DeregisterResultPollingEvents() {
  jobResultPoller.ExceptionOccured -= jobResultPoller_ExceptionOccured;
  jobResultPoller.JobResultsReceived -= jobResultPoller_JobResultReceived;
  jobResultPoller.PollingStarted -= jobResultPoller_PollingStarted;
  jobResultPoller.PollingFinished -= jobResultPoller_PollingFinished;
  jobResultPoller.IsPollingChanged -= jobResultPoller_IsPollingChanged;
}
449    void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
450      this.IsPollingResults = jobResultPoller.IsPolling;
451    }
452    void jobResultPoller_PollingFinished(object sender, EventArgs e) {
453      LogMessage("Polling results finished");
454    }
455    void jobResultPoller_PollingStarted(object sender, EventArgs e) {
456      LogMessage("Polling results started");
457    }
/// <summary>
/// Applies a batch of polled job results to the local HiveJob tree. For every job that
/// reached Aborted, Failed or Finished and whose optimizer was not downloaded yet, the
/// optimizer is downloaded and handed to the parent job. When all jobs are done, the
/// experiment is switched to Stopped and polling ends.
/// </summary>
void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightJob>> e) {
  foreach (LightweightJob lightweightJob in e.Value) {
    HiveJob hj = hiveJob.GetHiveJobByJobId(lightweightJob.Id);
    if (hj == null) {
      continue; // result for a job that is not part of the local tree
    }
    hj.UpdateFromLightweightJob(lightweightJob);

    bool hasEnded = hj.Job.JobState == JobState.Aborted ||
                    hj.Job.JobState == JobState.Failed ||
                    hj.Job.JobState == JobState.Finished;
    if (!hasEnded || hj.IsFinishedOptimizerDownloaded) {
      continue;
    }

    // Resolve the parent before downloading. The lookup returns null for unknown ids (see
    // the check above), and it used to be dereferenced unchecked after the download.
    HiveJob parentHiveJob = null;
    if (lightweightJob.ParentJobId.HasValue) {
      parentHiveJob = HiveJob.GetHiveJobByJobId(lightweightJob.ParentJobId.Value);
      if (parentHiveJob == null) {
        LogMessage(hj.Job.Id, "Parent job not found, optimizer not downloaded for job");
        continue;
      }
    }

    LogMessage(hj.Job.Id, "Downloading optimizer for job");
    OptimizerJob optimizerJob = LoadOptimizerJob(hj.Job.Id);
    if (parentHiveJob != null) {
      parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.Job.Id);
    } else {
      // NOTE(review): for the root job the downloaded optimizer is not stored anywhere,
      // only the flag is set - confirm this is intended.
      this.HiveJob.IsFinishedOptimizerDownloaded = true;
    }
  }
  GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
  if (AllJobsFinished()) {
    this.ExecutionState = Core.ExecutionState.Stopped;
    StopResultPolling();
    OnStopped();
  }
}
485
// True when the finished optimizer of every job in the tree has been downloaded.
private bool AllJobsFinished() {
  foreach (HiveJob job in HiveJob.GetAllHiveJobs()) {
    if (!job.IsFinishedOptimizerDownloaded) {
      return false;
    }
  }
  return true;
}
489
490    void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
491      OnExceptionOccured(e.Value);
492    }
493    #endregion
494
495    #region Execution Time Timer
// Creates the auto-resetting 100 ms timer that drives the ExecutionTime updates.
private void InitTimer() {
  timer = new System.Timers.Timer(100) { AutoReset = true };
  timer.Elapsed += timer_Elapsed;
}
501
// Adds the time elapsed since the last update to ExecutionTime and remembers the new timestamp.
private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
  DateTime current = DateTime.Now;
  TimeSpan sinceLastUpdate = current - lastUpdateTime;
  ExecutionTime = ExecutionTime + sinceLastUpdate;
  lastUpdateTime = current;
}
507    #endregion
508
509    #region Logging
// Writes the message to this item's log and to the debug logger.
private void LogMessage(string message) {
  // the log is shared between threads and not thread-safe, hence one lock around both writes
  lock (locker) {
    this.log.LogMessage(message);
    Logger.Debug(message);
  }
}
517
// Logs the message with the job id appended in parentheses.
private void LogMessage(Guid jobId, string message) {
  string withJobId = string.Concat(message, " (jobId: ", jobId.ToString(), ")");
  LogMessage(withJobId);
}
522
523    #endregion
524
525    /// <summary>
526    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem
527    /// </summary>
528    public void LoadHiveJob() {
       // Rebuilds the local HiveJob tree from the server for the job identified by rootJobId,
       // derives execution time/state from the root job's dates and starts result polling.
       // Failures are reported through OnExceptionOccured.
529      progress = new Progress();
530      try {
531        IsProgressing = true;
532        int totalJobCount = 0;
533        int jobCount = 0;
534        progress.Status = "Connecting to Server...";
535        using (var service = ServiceLocator.Instance.ServicePool.GetService()) {
536          // fetch all Job objects to create the full tree of HiveJob objects
537          progress.Status = "Downloading list of jobs...";
           // rootJobId.Value throws when no root job id is set; the catch below reports it
           // NOTE(review): the root job is looked up in this list further down, so the call
           // presumably returns the root itself as well - confirm the meaning of the two flags
538          IEnumerable<LightweightJob> allResults = service.Obj.GetLightweightChildJobs(rootJobId.Value, true, true);
539          totalJobCount = allResults.Count();
540
541          // download them first
542          IDictionary<Guid, Job> allJobs = new Dictionary<Guid, Job>();
543          IDictionary<Guid, JobData> allSerializedJobs = new Dictionary<Guid, JobData>();
544          foreach (LightweightJob lightweightJob in allResults) {
545            jobCount++;
546            progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
547            allJobs.Add(lightweightJob.Id, service.Obj.GetJob(lightweightJob.Id));
548            allSerializedJobs.Add(lightweightJob.Id, service.Obj.GetJobData(lightweightJob.Id));
549            progress.ProgressValue = (double)jobCount / totalJobCount;
550          }
551
           // the counter restarts for the deserialization phase; the root job is number 1
552          jobCount = 1;
553          progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, allSerializedJobs[this.rootJobId.Value].Data.Count() / 1024);
           // assigning the property also creates the result poller for the new root job
554          this.HiveJob = new HiveJob(allJobs[this.rootJobId.Value], allSerializedJobs[this.rootJobId.Value], false);
555          allSerializedJobs.Remove(this.rootJobId.Value); // reduce memory footprint
556          allJobs.Remove(this.rootJobId.Value);
557          progress.ProgressValue = (double)jobCount / totalJobCount;
558
           // a finish date on the root job means the experiment is done; otherwise it is treated as running
559          if (this.HiveJob.Job.DateFinished.HasValue) {
560            this.ExecutionTime = this.HiveJob.Job.DateFinished.Value - this.HiveJob.Job.DateCreated.Value;
561            this.lastUpdateTime = this.HiveJob.Job.DateFinished.Value;
562            this.ExecutionState = Core.ExecutionState.Stopped;
563            OnStopped();
564          } else {
565            this.ExecutionTime = DateTime.Now - this.HiveJob.Job.DateCreated.Value;
566            this.lastUpdateTime = DateTime.Now;
567            this.ExecutionState = Core.ExecutionState.Started;
             // OnStarted also starts the execution time timer
568            OnStarted();
569          }
570
571          // build child-job tree
572          LoadChildResults(service.Obj, this.HiveJob, allResults, allJobs, allSerializedJobs, progress, totalJobCount, ref jobCount);
           // polling is started in both cases, also when the root job is already finished
573          StartResultPolling();
574        }
575      }
576      catch (Exception e) {
577        OnExceptionOccured(e);
578      }
579      finally {
         // always clear the progress flag, also after a failure
580        IsProgressing = false;
581      }
582    }
583
/// <summary>
/// Recursively attaches the direct children of <paramref name="parentHiveJob"/> (oldest first)
/// by deserializing their job data, and removes every processed entry from the two
/// dictionaries to keep memory usage down.
/// </summary>
private void LoadChildResults(IHiveService service, HiveJob parentHiveJob, IEnumerable<LightweightJob> allResults, IDictionary<Guid, Job> allJobs, IDictionary<Guid, JobData> allSerializedJobs, IProgress progress, int totalJobCount, ref int jobCount) {
  IEnumerable<LightweightJob> directChildren = allResults
    .Where(result => result.ParentJobId.HasValue && result.ParentJobId.Value == parentHiveJob.Job.Id)
    .OrderBy(result => result.DateCreated);

  foreach (LightweightJob child in directChildren) {
    Guid childId = child.Id;
    jobCount++;
    progress.Status = string.Format("Deserializing {0} of {1} jobs ({2} kb)...", jobCount, totalJobCount, allSerializedJobs[childId].Data.Count() / 1024);
    OptimizerJob deserialized = PersistenceUtil.Deserialize<OptimizerJob>(allSerializedJobs[childId].Data);
    progress.ProgressValue = (double)jobCount / totalJobCount;

    HiveJob childHiveJob = new HiveJob(deserialized, false);
    parentHiveJob.AddChildHiveJob(childHiveJob);
    childHiveJob.Job = allJobs[childId];

    // reduce memory footprint
    allSerializedJobs.Remove(childId);
    allJobs.Remove(childId);
    if (jobCount % 10 == 0) {
      GC.Collect(); // this is needed or otherwise HL takes over the system when the number of jobs is high
    }

    LoadChildResults(service, childHiveJob, allResults, allJobs, allSerializedJobs, progress, totalJobCount, ref jobCount);
  }
}
603
// Downloads the job data of the given job and deserializes it into an OptimizerJob.
private OptimizerJob LoadOptimizerJob(Guid jobId) {
  using (var hiveService = ServiceLocator.Instance.ServicePool.GetService()) {
    JobData downloaded = hiveService.Obj.GetJobData(jobId);
    OptimizerJob result = PersistenceUtil.Deserialize<OptimizerJob>(downloaded.Data);
    return result;
  }
}
610  }
611}
Note: See TracBrowser for help on using the repository browser.