Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/HiveExperimentClient.cs @ 5526

Last change on this file since 5526 was 5526, checked in by cneumuel, 13 years ago

#1233

  • fixed handling of StateLog in DataLayer
  • extended unit tests
  • changed style of service calls to OKB-like style (using delegates)
  • added possibility that parent jobs can be finished immediately when child jobs are finished
File size: 29.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Threading;
26using HeuristicLab.Clients.Hive.Jobs;
27using HeuristicLab.Common;
28using HeuristicLab.Core;
29using HeuristicLab.Optimization;
30using HeuristicLab.Services.Hive.Common;
31using HeuristicLab.Services.Hive.Common.DataTransfer;
32using HeuristicLab.Services.Hive.Common.ServiceContracts;
33using HeuristicLab.Tracing;
34
35namespace HeuristicLab.Clients.Hive {
36  using System.Configuration;
37  using System.IO;
38  using HeuristicLab.PluginInfrastructure;
39  using DT = HeuristicLab.Services.Hive.Common.DataTransfer;
40
41  /// <summary>
42  /// An experiment which contains multiple batch runs of algorithms.
43  /// </summary>
44  [Item(itemName, itemDescription)]
45  public class HiveExperimentClient : NamedItem, IExecutable, IProgressReporter {
// Guards every call into the log (see LogMessage). Never reassigned, hence readonly.
private readonly object locker = new object();
// Name and description passed to the Item attribute and to the NamedItem base constructor.
private const string itemName = "Hive Experiment";
private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
// Periodically adds the elapsed wall-clock time to ExecutionTime (see InitTimer / timer_Elapsed).
private System.Timers.Timer timer;
// Point in time up to which ExecutionTime has already been accumulated.
private DateTime lastUpdateTime;
// Id of the root job on the server; exchanged with the HiveExperiment DTO and used by LoadHiveJob.
private Guid rootJobId;
// Polls job results for the current HiveJob; replaced whenever HiveJob changes.
private JobResultPoller jobResultPoller;
53
/// <summary>
/// Id of the HiveExperiment this client represents. It is copied from and into
/// the HiveExperiment DTO by UpdateFromDto and ToHiveExperimentDto.
/// </summary>
public Guid HiveExperimentId { get; set; }
59
private HiveJob hiveJob;
/// <summary>
/// The root job of this experiment. Assigning a different job moves the
/// JobStateChanged subscription from the old job to the new one and raises
/// <see cref="HiveJobChanged"/>. Assigning the current job again is a no-op.
/// </summary>
public HiveJob HiveJob {
  get { return hiveJob; }
  set {
    if (hiveJob != value) {
      // Detach only when the job is really replaced. Detaching before the
      // comparison dropped the handler for good when the same job was assigned again.
      DeregisterHiveJobEvents();
      hiveJob = value;
      RegisterHiveJobEvents();
      OnHiveJobChanged();
    }
  }
}
72
private ILog log;
/// <summary>
/// The log that receives the messages written through LogMessage.
/// </summary>
public ILog Log {
  get {
    return this.log;
  }
}
77
private string resourceNames;
/// <summary>
/// Names of the resources the jobs are uploaded for, separated by ';'.
/// Raises <see cref="ResourceNamesChanged"/> when the value actually changes.
/// </summary>
public string ResourceNames {
  get { return this.resourceNames; }
  set {
    if (this.resourceNames == value) return;
    this.resourceNames = value;
    OnResourceNamesChanged();
  }
}
88
private bool isPollingResults;
/// <summary>
/// Mirrors the polling state of the job result poller. A change is logged and
/// announced through <see cref="IsResultsPollingChanged"/>.
/// </summary>
public bool IsPollingResults {
  get { return this.isPollingResults; }
  private set {
    if (this.isPollingResults == value) return;
    this.isPollingResults = value;
    OnIsPollingResultsChanged();
  }
}
99
private bool isProgressing;
/// <summary>
/// True while an upload or download is in progress. Raises
/// <see cref="IsProgressingChanged"/> when the value actually changes.
/// </summary>
public bool IsProgressing {
  get { return this.isProgressing; }
  set {
    if (this.isProgressing == value) return;
    this.isProgressing = value;
    OnIsProgressingChanged();
  }
}
110
private IProgress progress;
/// <summary>
/// Progress object of the current or last upload/download. It is created anew
/// by RunUploadExperiment and LoadHiveJob.
/// </summary>
public IProgress Progress {
  get {
    return this.progress;
  }
}
115
private IEnumerable<Plugin> onlinePlugins;
/// <summary>
/// Plugins reported by the server via GetPlugins(); refreshed at the start of each upload.
/// </summary>
public IEnumerable<Plugin> OnlinePlugins {
  get {
    return this.onlinePlugins;
  }
  set {
    this.onlinePlugins = value;
  }
}
121
private List<Plugin> alreadyUploadedPlugins;
/// <summary>
/// Plugins uploaded during the current upload run; reset at the start of each upload.
/// </summary>
public List<Plugin> AlreadyUploadedPlugins {
  get {
    return this.alreadyUploadedPlugins;
  }
  set {
    this.alreadyUploadedPlugins = value;
  }
}
127
private bool useLocalPlugins;
/// <summary>
/// Passed to GetPluginDependencies: if true, needed plugins are uploaded from the
/// local installation even when the same plugin is already available online.
/// </summary>
public bool UseLocalPlugins {
  get {
    return this.useLocalPlugins;
  }
  set {
    this.useLocalPlugins = value;
  }
}
133
/// <summary>
/// Creates an empty client that targets the resource "HEAL", with a fresh log
/// and an initialized (not yet running) execution time timer.
/// </summary>
public HiveExperimentClient()
  : base(itemName, itemDescription) {
  ResourceNames = "HEAL";
  log = new Log();
  InitTimer();
}
/// <summary>
/// Creates a client via the default constructor and then takes over id, name,
/// description, resource names and root job id from the given DTO.
/// </summary>
/// <param name="hiveExperimentDto">The experiment as delivered by the Hive service.</param>
public HiveExperimentClient(DT.HiveExperiment hiveExperimentDto)
  : this() {
  this.UpdateFromDto(hiveExperimentDto);
}
/// <summary>
/// Cloning constructor. Copies resource names, execution state, execution time,
/// last update time and root job id, and clones the log. The HiveJob is not copied.
/// </summary>
/// <param name="original">The instance to copy from.</param>
/// <param name="cloner">The cloner that drives the deep-clone operation.</param>
protected HiveExperimentClient(HiveExperimentClient original, Cloner cloner)
  : base(original, cloner) {
  // The log and the timer have to exist before the clone is used. Previously the
  // ExecutionState setter ran while the log was still null, which threw a
  // NullReferenceException in LogMessage for every original that was not in the
  // Prepared state. The timer was never created, so Prepare() and Start() failed on clones.
  this.log = cloner.Clone(original.log);
  InitTimer();

  // Nobody can be subscribed during construction, so the backing fields are set
  // directly. This also keeps a spurious "ExecutionState changed" entry out of the cloned log.
  this.resourceNames = original.resourceNames;
  this.executionState = original.executionState;
  this.executionTime = original.executionTime;
  this.lastUpdateTime = original.lastUpdateTime;
  this.rootJobId = original.rootJobId;
}
/// <summary>
/// Creates a copy of this client through the cloning constructor.
/// </summary>
public override IDeepCloneable Clone(Cloner cloner) {
  HiveExperimentClient copy = new HiveExperimentClient(this, cloner);
  return copy;
}
156
/// <summary>
/// Takes over id, name, description, resource names and root job id from the DTO.
/// </summary>
/// <param name="hiveExperimentDto">The experiment as delivered by the Hive service.</param>
public void UpdateFromDto(DT.HiveExperiment hiveExperimentDto) {
  DT.HiveExperiment dto = hiveExperimentDto;
  HiveExperimentId = dto.Id;
  Name = dto.Name;
  Description = dto.Description;
  ResourceNames = dto.ResourceNames;
  rootJobId = dto.RootJobId;
}
164
/// <summary>
/// Builds a new DTO carrying the id, name, description, resource names and root
/// job id of this client.
/// </summary>
public DT.HiveExperiment ToHiveExperimentDto() {
  DT.HiveExperiment dto = new DT.HiveExperiment();
  dto.Id = HiveExperimentId;
  dto.Name = Name;
  dto.Description = Description;
  dto.ResourceNames = ResourceNames;
  dto.RootJobId = rootJobId;
  return dto;
}
174
/// <summary>
/// Wraps the experiment in a new root HiveJob and puts the client into the prepared state.
/// </summary>
public void SetExperiment(Experiment experiment) {
  HiveJob rootJob = new HiveJob(experiment);
  this.HiveJob = rootJob;
  this.Prepare();
}
179
/// <summary>
/// Subscribes to JobStateChanged of the current HiveJob, if there is one.
/// </summary>
private void RegisterHiveJobEvents() {
  if (HiveJob == null) return;
  HiveJob.JobStateChanged += HiveJob_JobStateChanged;
}
185
/// <summary>
/// Unsubscribes from JobStateChanged of the current HiveJob, if there is one.
/// </summary>
private void DeregisterHiveJobEvents() {
  if (HiveJob == null) return;
  HiveJob.JobStateChanged -= HiveJob_JobStateChanged;
}
191
/// <summary>
/// Returns the optimizer of the root HiveJob as an experiment, or null when no
/// HiveJob is set.
/// </summary>
public Experiment GetExperiment() {
  if (this.HiveJob == null) {
    return null;
  }
  return HiveJob.OptimizerJob.OptimizerAsExperiment;
}
201
202    #region IExecutable Members
private ExecutionState executionState;
/// <summary>
/// Current execution state. A change is logged and announced through
/// <see cref="ExecutionStateChanged"/>.
/// </summary>
public ExecutionState ExecutionState {
  get { return this.executionState; }
  private set {
    if (this.executionState == value) return;
    this.executionState = value;
    OnExecutionStateChanged();
  }
}
213
private TimeSpan executionTime;
/// <summary>
/// Accumulated execution time. Raises <see cref="ExecutionTimeChanged"/> when
/// the value actually changes.
/// </summary>
public TimeSpan ExecutionTime {
  get { return this.executionTime; }
  private set {
    if (this.executionTime == value) return;
    this.executionTime = value;
    OnExecutionTimeChanged();
  }
}
224
/// <summary>
/// Pausing is not supported by this client.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public void Pause() {
  NotSupportedException notSupported = new NotSupportedException();
  throw notSupported;
}
228
/// <summary>
/// Stops the execution time timer, switches to the Prepared state and resets
/// the execution time to zero.
/// </summary>
public void Prepare() {
  timer.Stop();
  ExecutionState = Core.ExecutionState.Prepared;
  ExecutionTime = TimeSpan.Zero;
}
234
/// <summary>
/// Raises Started, resets the execution time, switches to the Started state and
/// runs the upload of the experiment on a separate thread.
/// </summary>
public void Start() {
  OnStarted();
  this.ExecutionTime = TimeSpan.Zero;
  this.lastUpdateTime = DateTime.Now;
  ExecutionState = Core.ExecutionState.Started;

  Thread uploadThread = new Thread(RunUploadExperiment) { Name = "RunUploadExperimentThread" };
  uploadThread.Start();
}
245
/// <summary>
/// Thread body used by Start(). Resolves the resource names to ids, uploads the
/// configuration file, the needed plugins and the whole job tree, stores the
/// HiveExperiment on the server and finally starts result polling.
/// Any exception is reported through ExceptionOccurred and the client falls
/// back to the prepared state.
/// </summary>
private void RunUploadExperiment() {
  try {
    progress = new Progress("Connecting to server...");
    IsProgressing = true;
    ServiceLocator.Instance.CallHiveService(service => {
      // translate every configured resource name into its id; an unknown name aborts the upload
      List<Guid> resourceIds = new List<Guid>();
      foreach (string name in ToResourceNameList(this.ResourceNames)) {
        Guid id = service.GetResourceId(name);
        if (id == Guid.Empty) {
          throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", name));
        }
        resourceIds.Add(id);
      }

      HiveJob.SetIndexInParentOptimizerList(null);

      int totalJobCount = HiveJob.GetAllHiveJobs().Count();
      int uploadedJobs = 0;

      progress.Status = "Uploading plugins...";
      OnlinePlugins = service.GetPlugins();
      AlreadyUploadedPlugins = new List<Plugin>();
      Plugin configFilePlugin = UploadConfigurationFile(service);
      alreadyUploadedPlugins.Add(configFilePlugin);

      progress.Status = "Uploading jobs...";
      UploadJobWithChildren(service, HiveJob, null, resourceIds, ref uploadedJobs, totalJobCount, configFilePlugin.Id);
      rootJobId = HiveJob.Job.Id;
      LogMessage("Finished sending jobs to hive");

      // store the HiveExperiment and read it back so that server-side values are taken over
      progress.Status = "Uploading HiveExperiment...";

      Guid experimentId = service.AddHiveExperiment(ToHiveExperimentDto());
      DT.HiveExperiment stored = service.GetHiveExperiment(experimentId);
      UpdateFromDto(stored);

      StartResultPolling();
    });
  }
  catch (Exception e) {
    OnExceptionOccured(e);
    Prepare();
  }
  finally {
    IsProgressing = false;
  }
}
294
/// <summary>
/// Uploads the local configuration file of "HeuristicLab 3.3.exe" as a local
/// plugin named "Configuration" and returns that plugin with the id assigned
/// by the server.
/// </summary>
/// <param name="service">An active service proxy.</param>
/// <returns>The uploaded configuration plugin including its id.</returns>
private static Plugin UploadConfigurationFile(IHiveService service) {
  string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HeuristicLab 3.3.exe");
  // Keep the full path. Reading by bare file name resolved against the current
  // working directory, which is not necessarily the application directory.
  string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;

  Plugin configPlugin = new Plugin() {
    Name = "Configuration",
    IsLocal = true,
    Version = new Version()
  };
  PluginData configFile = new PluginData() {
    // only the file name is transferred, the content is read from the full path
    FileName = Path.GetFileName(configFilePath),
    Data = File.ReadAllBytes(configFilePath)
  };
  configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
  return configPlugin;
}
314
/// <summary>
/// Uploads the given job and, recursively, all of its child jobs. Children are
/// added below the already uploaded parent so that their parent ids are set properly.
/// </summary>
/// <param name="service">An active service proxy.</param>
/// <param name="hiveJob">The job to upload.</param>
/// <param name="parentHiveJob">Shall be null if hiveJob is the root job.</param>
/// <param name="groups">Resource ids handed to AddJob when the root job is uploaded.</param>
/// <param name="jobCount">Running number of jobs processed so far; incremented for every job.</param>
/// <param name="totalJobCount">Total number of jobs, used for progress reporting.</param>
/// <param name="configPluginId">Id of the configuration plugin, added to the needed plugins of every job.</param>
private void UploadJobWithChildren(IHiveService service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, ref int jobCount, int totalJobCount, Guid configPluginId) {
  jobCount++;
  this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);

  // experiments and batch runs that compute in parallel become parent jobs
  bool isParent = hiveJob.OptimizerJob.ComputeInParallel &&
    (hiveJob.OptimizerJob.Optimizer is Optimization.Experiment || hiveJob.OptimizerJob.Optimizer is Optimization.BatchRun);

  hiveJob.Job.IsParentJob = isParent;
  hiveJob.Job.FinishWhenChildJobsFinished = isParent;
  if (isParent) {
    hiveJob.OptimizerJob.CollectChildJobs = false; // don't collect child-jobs on slaves
  }
  List<IPluginDescription> plugins;
  JobData jobData = hiveJob.GetAsJobData(isParent, out plugins);

  hiveJob.Job.PluginsNeededIds = GetPluginDependencies(service, onlinePlugins, alreadyUploadedPlugins, plugins, useLocalPlugins);
  hiveJob.Job.PluginsNeededIds.Add(configPluginId);

  this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, jobData.Data.Count() / 1024);
  this.progress.ProgressValue = (double)jobCount / totalJobCount;

  hiveJob.Job.SetState(JobState.Transferring);
  if (parentHiveJob == null) {
    hiveJob.Job.Id = service.AddJob(hiveJob.Job, jobData, groups);
  } else {
    hiveJob.Job.Id = service.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData);
  }

  LogMessage(hiveJob.Job.Id, "Job sent to Hive");

  // the job now has its server id, so its children can be attached to it
  foreach (HiveJob childJob in hiveJob.ChildHiveJobs) {
    UploadJobWithChildren(service, childJob, hiveJob, groups, ref jobCount, totalJobCount, configPluginId);
  }
}
359
/// <summary>
/// Splits a string of resource names separated by ';' into the single names.
/// </summary>
/// <param name="resourceGroups">The ';'-separated names; may be null or empty.</param>
/// <returns>The names in their original order, or an empty list for null or empty input.</returns>
private static IEnumerable<string> ToResourceNameList(string resourceGroups) {
  if (string.IsNullOrEmpty(resourceGroups)) {
    return new List<string>();
  }
  // Split the argument itself. The previous code split the resourceNames field
  // and silently ignored what the caller passed in.
  return resourceGroups.Split(';');
}
370
/// <summary>
/// Asks the Hive service to stop every job of the current job tree.
/// </summary>
public void Stop() {
  ServiceLocator.Instance.CallHiveService(service => {
    foreach (Guid jobId in HiveJob.GetAllHiveJobs().Select(job => job.Job.Id)) {
      service.StopJob(jobId);
    }
  });
}
378
379    #endregion
380
381
382    #region HiveJob Events
/// <summary>
/// Takes over the id of the root job whenever the state of the root job changes.
/// </summary>
void HiveJob_JobStateChanged(object sender, EventArgs e) {
  if (HiveJob == null) return;
  rootJobId = HiveJob.Job.Id;
}
388    #endregion
389
390    #region Eventhandler
391
/// <summary>Raised when <see cref="ExecutionTime"/> has changed.</summary>
public event EventHandler ExecutionTimeChanged;
private void OnExecutionTimeChanged() {
  // copy the delegate so that it cannot become null between check and call
  EventHandler subscribers = ExecutionTimeChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
397
/// <summary>Raised when <see cref="ExecutionState"/> has changed.</summary>
public event EventHandler ExecutionStateChanged;
private void OnExecutionStateChanged() {
  string stateName = executionState.ToString();
  LogMessage("ExecutionState changed to " + stateName);
  EventHandler subscribers = ExecutionStateChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
404
/// <summary>Raised when the experiment has been started.</summary>
public event EventHandler Started;
private void OnStarted() {
  LogMessage("Started");
  // from now on the timer accumulates the execution time
  timer.Start();
  EventHandler subscribers = Started;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
412
/// <summary>Raised when the experiment has stopped.</summary>
public event EventHandler Stopped;
private void OnStopped() {
  LogMessage("Stopped");
  // the execution time must not advance any further
  timer.Stop();
  EventHandler subscribers = Stopped;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
420
/// <summary>Declared for pausing; Pause() itself throws NotSupportedException.</summary>
public event EventHandler Paused;
private void OnPaused() {
  LogMessage("Paused");
  EventHandler subscribers = Paused;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
427
/// <summary>Event for the transition into the prepared state.</summary>
public event EventHandler Prepared;
/// <summary>Logs "Prepared" and raises <see cref="Prepared"/>.</summary>
protected virtual void OnPrepared() {
  LogMessage("Prepared");
  EventHandler subscribers = Prepared;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
434
/// <summary>Raised when <see cref="ResourceNames"/> has changed.</summary>
public event EventHandler ResourceNamesChanged;
/// <summary>Raises <see cref="ResourceNamesChanged"/>.</summary>
protected virtual void OnResourceNamesChanged() {
  EventHandler subscribers = ResourceNamesChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
440
/// <summary>Raised when <see cref="IsPollingResults"/> has changed.</summary>
public event EventHandler IsResultsPollingChanged;
private void OnIsPollingResultsChanged() {
  string message = this.IsPollingResults ? "Results Polling Started" : "Results Polling Stopped";
  LogMessage(message);
  EventHandler subscribers = IsResultsPollingChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
451
/// <summary>Raised when an upload, a download or the result poller reported an exception.</summary>
public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
private void OnExceptionOccured(Exception e) {
  EventHandler<EventArgs<Exception>> subscribers = ExceptionOccurred;
  if (subscribers != null) {
    subscribers(this, new EventArgs<Exception>(e));
  }
}
457
/// <summary>Raised after <see cref="HiveJob"/> has been replaced.</summary>
public event EventHandler HiveJobChanged;
/// <summary>
/// Retires the poller of the previous job, creates a poller for the new job
/// (if there is one) and raises <see cref="HiveJobChanged"/>.
/// </summary>
private void OnHiveJobChanged() {
  if (jobResultPoller != null) {
    // Stop first, while the handlers are still attached, so that the state
    // change of the poller is still observed.
    if (jobResultPoller.IsPolling) {
      jobResultPoller.Stop();
    }
    // Always detach. Previously an idle poller kept its handlers (and with them
    // a reference to this client) when it was replaced or the job was cleared.
    DeregisterResultPollingEvents();
    jobResultPoller = null;
  }
  if (HiveJob != null) {
    jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.ResultPollingInterval);
    RegisterResultPollingEvents();
  }
  EventHandler handler = HiveJobChanged;
  if (handler != null) handler(this, EventArgs.Empty);
}
471
/// <summary>Raised when <see cref="IsProgressing"/> has changed.</summary>
public event EventHandler IsProgressingChanged;
private void OnIsProgressingChanged() {
  EventHandler subscribers = IsProgressingChanged;
  if (subscribers != null) {
    subscribers(this, EventArgs.Empty);
  }
}
477    #endregion
478
479    #region JobResultPoller Events
480
/// <summary>
/// Starts polling for job results.
/// </summary>
/// <exception cref="JobResultPollingException">Thrown if polling is already running.</exception>
public void StartResultPolling() {
  if (jobResultPoller.IsPolling) {
    throw new JobResultPollingException("Result polling already running");
  }
  jobResultPoller.Start();
}
488
/// <summary>
/// Stops polling for job results.
/// </summary>
/// <exception cref="JobResultPollingException">Thrown if polling is not running.</exception>
public void StopResultPolling() {
  if (!jobResultPoller.IsPolling) {
    throw new JobResultPollingException("Result polling not running");
  }
  jobResultPoller.Stop();
}
496
/// <summary>
/// Attaches the handlers of this client to all events of the current job result poller.
/// </summary>
private void RegisterResultPollingEvents() {
  jobResultPoller.ExceptionOccured += jobResultPoller_ExceptionOccured;
  jobResultPoller.JobResultsReceived += jobResultPoller_JobResultReceived;
  jobResultPoller.PollingStarted += jobResultPoller_PollingStarted;
  jobResultPoller.PollingFinished += jobResultPoller_PollingFinished;
  jobResultPoller.IsPollingChanged += jobResultPoller_IsPollingChanged;
}
/// <summary>
/// Detaches the handlers of this client from all events of the current job result poller.
/// </summary>
private void DeregisterResultPollingEvents() {
  jobResultPoller.ExceptionOccured -= jobResultPoller_ExceptionOccured;
  jobResultPoller.JobResultsReceived -= jobResultPoller_JobResultReceived;
  jobResultPoller.PollingStarted -= jobResultPoller_PollingStarted;
  jobResultPoller.PollingFinished -= jobResultPoller_PollingFinished;
  jobResultPoller.IsPollingChanged -= jobResultPoller_IsPollingChanged;
}
/// <summary>
/// Mirrors the polling state of the poller into <see cref="IsPollingResults"/>.
/// </summary>
private void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
  bool polling = jobResultPoller.IsPolling;
  IsPollingResults = polling;
}
/// <summary>
/// Writes a log entry when the poller reports that polling has finished.
/// </summary>
private void jobResultPoller_PollingFinished(object sender, EventArgs e) {
  const string message = "Polling results finished";
  LogMessage(message);
}
/// <summary>
/// Writes a log entry when the poller reports that polling has started.
/// </summary>
private void jobResultPoller_PollingStarted(object sender, EventArgs e) {
  const string message = "Polling results started";
  LogMessage(message);
}
/// <summary>
/// Applies the received lightweight jobs to the local job tree. For every job
/// that is aborted, failed or finished and whose optimizer has not been
/// downloaded yet, the optimizer is downloaded. Once all jobs are marked as
/// downloaded, the client switches to Stopped and polling is stopped.
/// </summary>
private void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightJob>> e) {
  foreach (LightweightJob received in e.Value) {
    HiveJob localJob = hiveJob.GetHiveJobByJobId(received.Id);
    if (localJob == null) continue;

    localJob.UpdateFromLightweightJob(received);

    bool ended = localJob.Job.State == JobState.Aborted
              || localJob.Job.State == JobState.Failed
              || localJob.Job.State == JobState.Finished;
    if (!ended || localJob.IsFinishedOptimizerDownloaded) continue;

    LogMessage(localJob.Job.Id, "Downloading optimizer for job");
    OptimizerJob downloaded = LoadOptimizerJob(localJob.Job.Id);
    if (downloaded == null) {
      // the job data could not be deserialized; mark the job as done so that the remaining jobs can still be downloaded
      localJob.IsFinishedOptimizerDownloaded = true;
    } else if (received.ParentJobId.HasValue) {
      // child job: hand the downloaded optimizer over to its parent
      HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(received.ParentJobId.Value);
      parentHiveJob.UpdateChildOptimizer(downloaded, localJob.Job.Id);
    } else {
      // a job without parent is the root job
      this.HiveJob.IsFinishedOptimizerDownloaded = true;
    }
  }
  GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
  if (AllJobsFinished()) {
    this.ExecutionState = Core.ExecutionState.Stopped;
    StopResultPolling();
    OnStopped();
  }
}
552
/// <summary>
/// True if every job of the tree is marked as "finished optimizer downloaded".
/// </summary>
private bool AllJobsFinished() {
  bool anyPending = HiveJob.GetAllHiveJobs().Any(job => !job.IsFinishedOptimizerDownloaded);
  return !anyPending;
}
556
/// <summary>
/// Forwards an exception reported by the poller through <see cref="ExceptionOccurred"/>.
/// </summary>
private void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
  Exception reported = e.Value;
  OnExceptionOccured(reported);
}
560    #endregion
561
562    #region Execution Time Timer
/// <summary>
/// Creates the auto-resetting 100 ms timer that drives the execution time. The
/// timer is not started here.
/// </summary>
private void InitTimer() {
  timer = new System.Timers.Timer(100) { AutoReset = true };
  timer.Elapsed += timer_Elapsed;
}
568
/// <summary>
/// Adds the time that has passed since the last update to the execution time.
/// </summary>
private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
  DateTime current = DateTime.Now;
  TimeSpan sinceLastUpdate = current - lastUpdateTime;
  ExecutionTime = ExecutionTime + sinceLastUpdate;
  lastUpdateTime = current;
}
574    #endregion
575
576    #region Logging
/// <summary>
/// Writes the message to the log of this client and to the debug logger.
/// </summary>
private void LogMessage(string message) {
  // the log is not thread-safe, therefore all writers are serialized here
  lock (this.locker) {
    this.log.LogMessage(message);
    Logger.Debug(message);
  }
}
584
/// <summary>
/// Writes the message to the log with the id of the concerned job appended.
/// </summary>
private void LogMessage(Guid jobId, string message) {
  string withJobId = string.Format("{0} (jobId: {1})", message, jobId);
  LogMessage(withJobId);
}
589
590    #endregion
591
592    #region Job Loading
/// <summary>
/// Downloads the job tree below the root job id, rebuilds the tree of HiveJob
/// objects from it, derives execution time and state from the root job and
/// starts result polling. Exceptions are reported through ExceptionOccurred.
/// </summary>
public void LoadHiveJob() {
  progress = new Progress();
  try {
    IsProgressing = true;
    int totalJobCount = 0;
    int jobCount = 0;
    progress.Status = "Connecting to Server...";
    ServiceLocator.Instance.CallHiveService(service => {
      // the lightweight jobs describe the structure of the whole job tree
      progress.Status = "Downloading list of jobs...";
      IEnumerable<LightweightJob> allResults = service.GetLightweightChildJobs(rootJobId, true, true);
      totalJobCount = allResults.Count();

      // download all jobs and their data before anything is deserialized
      IDictionary<Guid, Job> jobsById = new Dictionary<Guid, Job>();
      IDictionary<Guid, JobData> jobDatasById = new Dictionary<Guid, JobData>();
      foreach (LightweightJob result in allResults) {
        jobCount++;
        progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
        jobsById.Add(result.Id, service.GetJob(result.Id));
        jobDatasById.Add(result.Id, service.GetJobData(result.Id));
        progress.ProgressValue = (double)jobCount / totalJobCount;
      }

      // the root job is deserialized first
      jobCount = 1;
      JobData rootData = jobDatasById[this.rootJobId];
      progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, rootData.Data.Count() / 1024);
      this.HiveJob = new HiveJob(jobsById[this.rootJobId], rootData, false);
      jobDatasById.Remove(this.rootJobId); // reduce memory footprint
      jobsById.Remove(this.rootJobId);
      progress.ProgressValue = (double)jobCount / totalJobCount;

      Job rootJob = this.HiveJob.Job;
      if (rootJob.DateFinished.HasValue) {
        // finished on the server: the execution time is fixed
        this.ExecutionTime = rootJob.DateFinished.Value - rootJob.DateCreated;
        this.lastUpdateTime = rootJob.DateFinished.Value;
        this.ExecutionState = Core.ExecutionState.Stopped;
        OnStopped();
      } else {
        // not finished yet: continue counting from the creation date
        this.ExecutionTime = DateTime.Now - rootJob.DateCreated;
        this.lastUpdateTime = DateTime.Now;
        this.ExecutionState = Core.ExecutionState.Started;
        OnStarted();
      }

      // build child-job tree
      LoadChildResults(service, this.HiveJob, allResults, jobsById, jobDatasById, progress, totalJobCount, ref jobCount);
      StartResultPolling();
    });
  }
  catch (Exception e) {
    OnExceptionOccured(e);
  }
  finally {
    IsProgressing = false;
  }
}
651
/// <summary>
/// Recursively attaches all child jobs of parentHiveJob, ordered by creation
/// date. Job data that cannot be deserialized yields a child without optimizer job.
/// Processed entries are removed from both dictionaries.
/// </summary>
/// <param name="service">An active service proxy; only passed on to the recursive calls.</param>
/// <param name="parentHiveJob">The job whose children are attached.</param>
/// <param name="allResults">Lightweight jobs of the whole tree.</param>
/// <param name="allJobs">Downloaded jobs by id.</param>
/// <param name="allJobDatas">Downloaded job data by id.</param>
/// <param name="progress">Receives status text and progress value.</param>
/// <param name="totalJobCount">Total number of jobs, used for progress reporting.</param>
/// <param name="jobCount">Running number of processed jobs; incremented for every child.</param>
private void LoadChildResults(IHiveService service, HiveJob parentHiveJob, IEnumerable<LightweightJob> allResults, IDictionary<Guid, Job> allJobs, IDictionary<Guid, JobData> allJobDatas, IProgress progress, int totalJobCount, ref int jobCount) {
  IEnumerable<LightweightJob> children = allResults
    .Where(r => r.ParentJobId.HasValue && r.ParentJobId.Value == parentHiveJob.Job.Id)
    .OrderBy(r => r.DateCreated);

  foreach (LightweightJob child in children) {
    jobCount++;
    progress.Status = string.Format("Deserializing {0} of {1} jobs ({2} kb)...", jobCount, totalJobCount, allJobDatas[child.Id].Data.Count() / 1024);

    OptimizerJob optimizerJob;
    try {
      optimizerJob = PersistenceUtil.Deserialize<OptimizerJob>(allJobDatas[child.Id].Data);
    }
    catch {
      // best effort: a job that cannot be deserialized is still added to the tree
      optimizerJob = null;
    }
    progress.ProgressValue = (double)jobCount / totalJobCount;

    HiveJob childHiveJob = new HiveJob(optimizerJob, false);
    parentHiveJob.AddChildHiveJob(childHiveJob);
    childHiveJob.Job = allJobs[child.Id];

    allJobDatas.Remove(child.Id); // reduce memory footprint
    allJobs.Remove(child.Id);
    if (jobCount % 10 == 0) {
      GC.Collect(); // this is needed or otherwise HL takes over the system when the number of jobs is high
    }
    LoadChildResults(service, childHiveJob, allResults, allJobs, allJobDatas, progress, totalJobCount, ref jobCount);
  }
}
677
/// <summary>
/// Downloads the data of the given job and deserializes it.
/// </summary>
/// <returns>The optimizer job, or null if the data could not be deserialized.</returns>
private OptimizerJob LoadOptimizerJob(Guid jobId) {
  return ServiceLocator.Instance.CallHiveService(service => {
    JobData downloaded = service.GetJobData(jobId);
    OptimizerJob optimizerJob = null;
    try {
      optimizerJob = PersistenceUtil.Deserialize<OptimizerJob>(downloaded.Data);
    }
    catch {
      // deserialization failures are reported to the caller as null
    }
    return optimizerJob;
  });
}
689    #endregion
690
691    #region Plugin Management
/// <summary>
/// Determines the server-side ids of all needed plugins. A plugin that was
/// already uploaded during this upload run is reused. Otherwise an online
/// plugin with the same name and version is used, unless useLocalPlugins is
/// set or no such plugin exists; in that case the local plugin is uploaded.
/// </summary>
/// <param name="service">An active service-proxy</param>
/// <param name="onlinePlugins">List of plugins which are available online</param>
/// <param name="alreadyUploadedPlugins">List of plugins which have been uploaded from this HiveExperiment; newly uploaded plugins are appended</param>
/// <param name="neededPlugins">List of plugins which are needed by the job</param>
/// <param name="useLocalPlugins">If true, the plugins which are already online are ignored. All local plugins are uploaded, but only once.</param>
/// <returns>One id per needed plugin, in the order of neededPlugins.</returns>
private static List<Guid> GetPluginDependencies(IHiveService service, IEnumerable<Plugin> onlinePlugins, List<Plugin> alreadyUploadedPlugins, IEnumerable<IPluginDescription> neededPlugins, bool useLocalPlugins) {
  var pluginIds = new List<Guid>();
  foreach (var neededPlugin in neededPlugins) {
    Plugin foundPlugin = alreadyUploadedPlugins.SingleOrDefault(p => p.Name == neededPlugin.Name && p.Version == neededPlugin.Version);
    if (foundPlugin == null) {
      Plugin onlinePlugin = onlinePlugins.SingleOrDefault(p => p.Name == neededPlugin.Name && p.Version == neededPlugin.Version);
      if (useLocalPlugins || onlinePlugin == null) {
        foundPlugin = CreatePlugin(neededPlugin, useLocalPlugins);
        foundPlugin.Id = service.AddPlugin(foundPlugin, CreatePluginDatas(neededPlugin));
        alreadyUploadedPlugins.Add(foundPlugin);
      } else {
        foundPlugin = onlinePlugin;
      }
    }
    // Every needed plugin contributes its id, including one uploaded just now.
    // Previously the id of a fresh upload was dropped, so the first job that
    // needed the plugin did not list it as a dependency.
    pluginIds.Add(foundPlugin.Id);
  }
  return pluginIds;
}
721
/// <summary>
/// Creates the plugin DTO for a local plugin description; IsLocal is set to
/// the value of useLocalPlugins.
/// </summary>
private static Plugin CreatePlugin(IPluginDescription plugin, bool useLocalPlugins) {
  Plugin dto = new Plugin();
  dto.Name = plugin.Name;
  dto.Version = plugin.Version;
  dto.IsLocal = useLocalPlugins;
  return dto;
}
725
/// <summary>
/// Reads every file of the plugin from disk and wraps its content together
/// with its bare file name in a PluginData object.
/// </summary>
private static List<PluginData> CreatePluginDatas(IPluginDescription plugin) {
  var result = new List<PluginData>();
  foreach (IPluginFile file in plugin.Files) {
    result.Add(new PluginData() {
      Data = File.ReadAllBytes(file.Name),
      FileName = Path.GetFileName(file.Name)
    });
  }
  return result;
}
738
739    /// <summary>
740    /// Gets the Ids of all plugins needed for executing the job.
741    /// All loaded plugins are assumed to be necessary.
742    /// If a plugin with the same name and version is already online, it is used. Otherwise the local plugin is uploaded.
743    /// If useLocalPlugins is true, all local plugins are uploaded regardless of the existence of the same plugin online.
744    /// </summary>
745    //public static List<Guid> GetPluginsNeededIds(bool useLocalPlugins) {
746    //  IEnumerable<IPluginDescription> localPlugins = ApplicationManager.Manager.Plugins;
747    //  List<Guid> pluginsNeededIds = new List<Guid>();
748
749    //  using (var service = ServiceLocator.Instance.GetService()) {
750    //    IEnumerable<Plugin> onlinePlugins = service.Obj.GetPlugins();
751
752    //    foreach (IPluginDescription localPlugin in localPlugins) {
753    //      Plugin found = onlinePlugins.Where(onlinePlugin => onlinePlugin.Name == localPlugin.Name && onlinePlugin.Version == localPlugin.Version).SingleOrDefault();
754    //      if (!useLocalPlugins && found != null) {
755    //        // plugin is available online; reuse
756    //        pluginsNeededIds.Add(found.Id);
757    //      } else {
758    //        // upload the plugin
759    //        Plugin p = new Plugin() { Name = localPlugin.Name, Version = localPlugin.Version, IsLocal = useLocalPlugins };
760    //        List<PluginData> pluginDatas = new List<PluginData>();
761
762    //        foreach (IPluginFile pf in localPlugin.Files) {
763    //          PluginData pluginData = new PluginData();
764
765    //          pluginData.Data = File.ReadAllBytes(pf.Name);
766    //          pluginDatas.Add(pluginData);
767    //        }
768    //        pluginsNeededIds.Add(service.Obj.AddPlugin(p, pluginDatas));
769    //      }
770    //    }
771    //  }
772    //  return pluginsNeededIds;
773    //}
774    #endregion
775  }
776}
Note: See TracBrowser for help on using the repository browser.