Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4333

Last change on this file since 4333 was 4333, checked in by cneumuel, 14 years ago

added authorizationManager which checks for permission to specific jobs (#1168)

File size: 35.8 KB
RevLine 
[4116]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
[4119]40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
[4170]43using System.ServiceModel;
[4263]44using HeuristicLab.Hive.Contracts.ResponseObjects;
[4305]45using HeuristicLab.Hive.Experiment.Properties;
[4116]46
47namespace HeuristicLab.Hive.Experiment {
48  /// <summary>
49  /// An experiment which contains multiple batch runs of algorithms.
50  /// </summary>
51  [Item(itemName, itemDescription)]
52  [Creatable("Testing & Analysis")]
53  [StorableClass]
54  public class HiveExperiment : NamedItem, IExecutable {
55    private const string itemName = "Hive Experiment";
[4139]56    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
[4170]57    private const int resultPollingIntervalMs = 5000;
[4141]58    private const int snapshotPollingIntervalMs = 1000;
[4133]59    private const int maxSnapshotRetries = 20;
60    private object locker = new object();
[4119]61
[4120]62    private System.Timers.Timer timer;
63    private bool pausePending, stopPending;
[4170]64    private bool sendingJobsFinished = false;
[4133]65
[4170]66    // ensure that only 2 threads can fetch jobresults simultaniously
67    private Semaphore fetchJobSemaphore = new Semaphore(2, 2);
68
[4333]69    private static object pendingOptimizerMappingsLocker = new object();
70
[4173]71    private bool stopResultsPollingPending = false;
[4116]72
[4173]73    private Thread resultPollingThread;
74
[4133]75    private bool isPollingResults;
76    public bool IsPollingResults {
77      get { return isPollingResults; }
78      private set {
79        if (isPollingResults != value) {
80          isPollingResults = value;
81          OnIsPollingResultsChanged();
82        }
83      }
84    }
85
[4173]86    public IEnumerable<string> ResourceGroups {
87      get {
88        if (!string.IsNullOrEmpty(resourceIds)) {
89          return resourceIds.Split(';');
90        } else {
91          return new List<string>();
92        }
93      }
94    }
[4305]95
[4173]96    #region Storable Properties
97    [Storable]
98    private DateTime lastUpdateTime;
[4133]99
[4139]100    /// <summary>
101    /// Mapping from JobId to an optimizer.
102    /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection
103    /// </summary>
[4119]104    [Storable]
[4139]105    private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
[4119]106
[4139]107    /// <summary>
108    /// Stores a mapping from the child-optimizer to the parent optimizer.
109    /// Needed to replace a finished optimizer in the optimizer-tree.
110    /// Only pending optmizers are stored.
111    /// </summary>
[4120]112    [Storable]
[4139]113    private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
114
115    [Storable]
[4120]116    private JobItemList jobItems;
117    public JobItemList JobItems {
118      get { return jobItems; }
119    }
[4116]120
[4119]121    [Storable]
[4116]122    private string resourceIds;
123    public string ResourceIds {
124      get { return resourceIds; }
[4120]125      set {
126        if (resourceIds != value) {
[4133]127          resourceIds = value;
[4120]128          OnResourceIdsChanged();
129        }
130      }
[4116]131    }
132
[4119]133    [Storable]
[4116]134    private HeuristicLab.Optimization.Experiment experiment;
135    public HeuristicLab.Optimization.Experiment Experiment {
136      get { return experiment; }
[4120]137      set {
138        if (experiment != value) {
139          experiment = value;
140          OnExperimentChanged();
141        }
142      }
[4116]143    }
[4119]144
[4120]145    [Storable]
146    private ILog log;
147    public ILog Log {
148      get { return log; }
149    }
150
[4173]151    [Storable]
152    private Core.ExecutionState executionState;
153    public ExecutionState ExecutionState {
154      get { return executionState; }
155      private set {
156        if (executionState != value) {
157          executionState = value;
158          OnExecutionStateChanged();
159        }
160      }
161    }
162
163    [Storable]
164    private TimeSpan executionTime;
165    public TimeSpan ExecutionTime {
166      get { return executionTime; }
167      private set {
168        if (executionTime != value) {
169          executionTime = value;
170          OnExecutionTimeChanged();
171        }
172      }
173    }
174    #endregion
175
[4119]176    [StorableConstructor]
177    public HiveExperiment(bool deserializing)
178      : base(deserializing) {
[4116]179    }
180
181    public HiveExperiment()
182      : base(itemName, itemDescription) {
183      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
[4120]184      this.log = new Log();
185      pausePending = stopPending = false;
186      jobItems = new JobItemList();
[4133]187      isPollingResults = false;
[4170]188      RegisterJobItemListEvents();
[4120]189      InitTimer();
[4116]190    }
191
[4119]192    public override IDeepCloneable Clone(Cloner cloner) {
[4121]193      LogMessage("I am beeing cloned");
[4119]194      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
195      clone.resourceIds = this.resourceIds;
196      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
197      clone.executionState = this.executionState;
198      clone.executionTime = this.executionTime;
[4139]199      clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
[4141]200
[4333]201      lock (pendingOptimizerMappingsLocker) {
202        foreach (var pair in this.pendingOptimizersByJobId)
203          clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
[4141]204
[4333]205        foreach (var pair in this.parentOptimizersByPendingOptimizer)
206          clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
207      }
[4120]208      clone.log = (ILog)cloner.Clone(log);
209      clone.stopPending = this.stopPending;
210      clone.pausePending = this.pausePending;
[4170]211      clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));
[4133]212      clone.lastUpdateTime = this.lastUpdateTime;
213      clone.isPollingResults = this.isPollingResults;
[4119]214      return clone;
215    }
216
[4120]217    [StorableHook(HookType.AfterDeserialization)]
218    private void AfterDeserialization() {
219      InitTimer();
[4133]220      this.IsPollingResults = false;
221      this.stopResultsPollingPending = false;
[4170]222      RegisterJobItemListEvents();
[4121]223      LogMessage("I was deserialized.");
[4120]224    }
[4170]225
[4173]226    #region Execution Time Timer
[4120]227    private void InitTimer() {
228      timer = new System.Timers.Timer(100);
229      timer.AutoReset = true;
230      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
231    }
232
233    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
234      DateTime now = DateTime.Now;
235      ExecutionTime += now - lastUpdateTime;
236      lastUpdateTime = now;
237    }
[4173]238    #endregion
[4120]239
[4173]240    #region IExecutable Methods
[4116]241    public void Pause() {
242      throw new NotSupportedException();
243    }
244
245    public void Prepare() {
[4119]246      if (experiment != null) {
[4141]247        StopResultPolling();
[4333]248        lock (pendingOptimizerMappingsLocker) {
[4173]249          pendingOptimizersByJobId.Clear();
[4333]250          parentOptimizersByPendingOptimizer.Clear();
[4173]251        }
252        lock (jobItems) {
253          jobItems.Clear();
254        }
[4119]255        experiment.Prepare();
[4120]256        this.ExecutionState = Core.ExecutionState.Prepared;
[4119]257        OnPrepared();
258      }
[4116]259    }
260
261    public void Start() {
[4173]262      sendingJobsFinished = false;
[4120]263      OnStarted();
[4141]264      ExecutionTime = new TimeSpan();
[4120]265      lastUpdateTime = DateTime.Now;
[4119]266      this.ExecutionState = Core.ExecutionState.Started;
[4170]267      StartResultPolling();
268
[4120]269      Thread t = new Thread(() => {
[4305]270        IClientFacade clientFacade = CreateStreamedClientFacade();
[4173]271
[4170]272        try {
273          pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
[4120]274
[4170]275          LogMessage("Extracting jobs from Experiment");
276          parentOptimizersByPendingOptimizer = GetOptimizers(true);
277          LogMessage("Extraction of jobs from Experiment finished");
[4173]278
[4170]279          IEnumerable<string> groups = ResourceGroups;
[4333]280          lock (pendingOptimizerMappingsLocker) {
281            foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
282              SerializedJob serializedJob = CreateSerializedJob(optimizer);
283              ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
[4173]284              pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
[4170]285
[4333]286              JobItem jobItem = new JobItem() {
287                JobDto = response.Obj,
288                LatestSnapshot = null,
289                Optimizer = optimizer
290              };
291              lock (jobItems) {
292                jobItems.Add(jobItem);
293              }
294              LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
[4170]295            }
296          }
[4305]297        }
298        catch (Exception e) {
[4170]299          LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
300          this.ExecutionState = Core.ExecutionState.Stopped;
301          OnStopped();
[4120]302        }
[4316]303        finally {
304          ServiceLocator.DisposeClientFacade(clientFacade);
305        }
[4139]306
[4173]307        sendingJobsFinished = true;
[4120]308      });
309      t.Start();
310    }
311
[4135]312    public void Stop() {
313      this.ExecutionState = Core.ExecutionState.Stopped;
314      foreach (JobItem jobItem in jobItems) {
315        AbortJob(jobItem.JobDto.Id);
316      }
317      OnStopped();
318    }
[4173]319    #endregion
[4135]320
[4173]321    #region Optimizier Management
[4120]322    /// <summary>
323    /// Returns all optimizers in the current Experiment
324    /// </summary>
325    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
326    /// <returns></returns>
[4139]327    private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
[4120]328      if (!flatout) {
[4139]329        var optimizers = new Dictionary<IOptimizer, IOptimizer>();
330        foreach (IOptimizer opt in experiment.Optimizers) {
331          optimizers.Add(experiment, opt);
332        }
333        return optimizers;
[4120]334      } else {
[4144]335        return FlatOptimizerTree(null, experiment, "");
[4116]336      }
337    }
338
[4139]339    /// <summary>
340    /// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
341    ///
342    /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
343    /// interface IParallelizable {
344    ///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
345    /// }
346    /// </summary>
347    /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns>
[4144]348    private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
[4139]349      IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
350      if (optimizer is HeuristicLab.Optimization.Experiment) {
351        HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment;
[4144]352        if (this.experiment != experiment) {
[4170]353          prepend += experiment.Name + "/"; // don't prepend for top-level optimizers
[4144]354        }
[4139]355        foreach (IOptimizer opt in experiment.Optimizers) {
[4144]356          AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend));
[4139]357        }
358      } else if (optimizer is BatchRun) {
359        BatchRun batchRun = optimizer as BatchRun;
[4170]360        prepend += batchRun.Name + "/";
[4139]361        for (int i = 0; i < batchRun.Repetitions; i++) {
362          IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
[4144]363          opt.Name += " [" + i + "]";
364          IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend);
365          AddRange(optimizers, batchOptimizers);
[4139]366        }
367      } else if (optimizer is EngineAlgorithm) {
[4144]368        optimizer.Name = prepend + optimizer.Name;
[4139]369        optimizers.Add(optimizer, parent);
[4173]370        LogMessage("Optimizer extracted: " + optimizer.Name);
[4139]371      } else {
372        Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown");
[4144]373        optimizer.Name = prepend + optimizer.Name;
[4139]374        optimizers.Add(optimizer, parent);
[4173]375        LogMessage("Optimizer extracted: " + optimizer.Name);
[4139]376      }
377      return optimizers;
378    }
379
380    private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
[4121]381      lock (locker) {
[4139]382        if (parentOptimizer is HeuristicLab.Optimization.Experiment) {
383          HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer;
384          int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer);
385          exp.Optimizers[originalOptimizerIndex] = newOptimizer;
386        } else if (parentOptimizer is BatchRun) {
387          BatchRun batchRun = (BatchRun)parentOptimizer;
388          if (newOptimizer is IAlgorithm) {
[4170]389            batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer));
[4139]390          } else {
391            throw new NotSupportedException("Only IAlgorithm types supported");
392          }
393        } else {
394          throw new NotSupportedException("Invalid parentOptimizer");
395        }
[4121]396      }
[4120]397    }
398
[4173]399    private bool NoMorePendingOptimizers() {
[4333]400      lock (pendingOptimizerMappingsLocker) {
[4173]401        return pendingOptimizersByJobId.Count == 0;
402      }
[4133]403    }
[4135]404
[4173]405    /// <summary>
406    /// Removes optimizers from
407    ///  - parentOptimizersByPendingOptimizer
408    ///  - pendingOptimizersByJobId
409    /// </summary>
410    /// <param name="jobId"></param>
411    private void DisposeOptimizerMappings(Guid jobId) {
[4333]412      LogMessage(jobId, "Disposing Optimizer Mappings");
413      lock (pendingOptimizerMappingsLocker) {
[4173]414        parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]);
415        pendingOptimizersByJobId.Remove(jobId);
416      }
417    }
418
[4116]419    #endregion
420
[4173]421    #region Job Management
422    /// <summary>
423    /// Updates all JobItems with the results
424    /// </summary>
425    /// <param name="jobResultList"></param>
426    private void UpdateJobItems(JobResultList jobResultList) {
427      // use a Dict to avoid quadratic runtime complexity
428      IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId);
429      lock (jobItems) {
430        foreach (JobItem jobItem in JobItems) {
431          if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) {
432            jobItem.JobResult = jobResultDict[jobItem.JobDto.Id];
433          }
[4170]434        }
[4173]435      }
[4133]436    }
437
[4173]438    private void JobItem_JobStateChanged(object sender, EventArgs e) {
439      JobItem jobItem = (JobItem)sender;
[4333]440
[4173]441      Thread t = new Thread(() => {
[4333]442        try {
443          if (jobItem.State == JobState.Finished) {
444            FetchAndUpdateJob(jobItem.JobDto.Id);
445            DisposeOptimizerMappings(jobItem.JobDto.Id);
446          } else if (jobItem.State == JobState.Failed) {
447            DisposeOptimizerMappings(jobItem.JobDto.Id);
448          }
[4173]449
[4333]450          if (NoMorePendingOptimizers()) {
451            StopResultPolling();
452            this.ExecutionState = Core.ExecutionState.Stopped;
453            OnStopped();
454          }
[4173]455        }
[4333]456        catch (Exception ex) {
457          Logger.Error("JobItem_JobStateChanged failed badly: " + ex.Message);
458          LogMessage("JobItem_JobStateChanged failed badly: " + ex.Message);
459        }
[4173]460      });
461      t.Start();
462    }
463
464    /// <summary>
465    /// Fetches the finished job from the server and updates the jobItem
466    /// </summary>
467    private void FetchAndUpdateJob(Guid jobId) {
[4333]468      bool tryagain = false;
[4173]469      LogMessage(jobId, "FetchAndUpdateJob started");
[4333]470      if (fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) {
471        IClientFacade clientFacade = null;
472        try {
473          clientFacade = CreateStreamedClientFacade();
474          IOptimizer originalOptimizer;
475          originalOptimizer = pendingOptimizersByJobId[jobId];
476
477          ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
478          IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
479          IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
480          ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
481          LogMessage(jobId, "FetchAndUpdateJob ended");
482        }
483        catch (Exception e) {
484          LogMessage(jobId, "FetchAndUpdateJob failed: " + e.Message + ". Will try again!");
485          tryagain = true;
486        }
487        finally {
488          ServiceLocator.DisposeClientFacade(clientFacade);
489          fetchJobSemaphore.Release();
490        }
491      } else {
492        LogMessage(jobId, "FetchAndUpdateJob timed out. Will try again!");
493        tryagain = true;
[4173]494      }
495
[4333]496      if (tryagain) {
497        FetchAndUpdateJob(jobId);
498      }
[4173]499    }
500
501    private void UpdateJobItem(JobDto jobDto) {
502      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
503      jobItem.JobDto = jobDto;
504    }
505
506    public void AbortJob(Guid jobId) {
[4305]507      IClientFacade clientFacade = CreateClientFacade();
[4302]508      Response response = clientFacade.AbortJob(jobId);
[4173]509      LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
510    }
511
[4119]512    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
513      IJob job = new OptimizerJob() {
514        Optimizer = optimizer
515      };
516
517      // serialize job
[4116]518      MemoryStream memStream = new MemoryStream();
[4119]519      XmlGenerator.Serialize(job, memStream);
520      byte[] jobByteArray = memStream.ToArray();
521      memStream.Dispose();
[4116]522
523      // find out which which plugins are needed for the given object
524      List<HivePluginInfoDto> pluginsNeeded = (
[4170]525        from p in GetDeclaringPlugins(job.GetType())
[4116]526        select new HivePluginInfoDto() {
527          Name = p.Name,
[4119]528          Version = p.Version
[4116]529        }).ToList();
530
531      JobDto jobDto = new JobDto() {
532        CoresNeeded = 1, // [chn] how to determine real cores needed?
533        PluginsNeeded = pluginsNeeded,
[4264]534        State = JobState.Offline,
[4333]535        MemoryNeeded = 0
[4116]536      };
537
538      SerializedJob serializedJob = new SerializedJob() {
539        JobInfo = jobDto,
[4119]540        SerializedJobData = jobByteArray
[4116]541      };
542
543      return serializedJob;
544    }
545
[4173]546    private JobItem GetJobItemById(Guid jobId) {
547      return jobItems.Single(x => x.JobDto.Id == jobId);
548    }
549    #endregion
550
551    #region Result Polling
552    public void StartResultPolling() {
553      this.stopResultsPollingPending = false;
554      this.IsPollingResults = true;
555      resultPollingThread = CreateResultPollingThread();
556      if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running)
557        resultPollingThread.Start();
558    }
559
560    public void StopResultPolling() {
561      this.stopResultsPollingPending = true;
[4333]562      if (resultPollingThread != null && resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) {
563        resultPollingThread.Interrupt();
564      }
[4173]565      this.stopResultsPollingPending = false;
566    }
567
[4170]568    private Thread CreateResultPollingThread() {
[4133]569      return new Thread(() => {
570        try {
571          do {
[4305]572            IClientFacade clientFacade = CreateStreamedClientFacade();
[4170]573            IEnumerable<Guid> jobIdsToQuery = from job in JobItems
[4264]574                                              where job.State != JobState.Finished &&
575                                              job.State != JobState.Failed
[4170]576                                              select job.JobDto.Id;
577            if (jobIdsToQuery.Count() > 0) {
578              LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs");
579              try {
[4302]580                ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);
[4263]581                if (response.StatusMessage == ResponseStatus.Ok) {
[4170]582                  JobResultList jobItemList = response.Obj;
583                  UpdateJobItems(jobItemList);
[4144]584
[4170]585                  LogMessage("Polling successfully finished");
[4263]586                } else {
587                  throw new Exception(response.StatusMessage.ToString());
[4170]588                }
[4305]589              }
590              catch (Exception e) {
[4170]591                LogMessage("Polling results failed: " + e.Message);
592              }
[4316]593              finally {
594                ServiceLocator.DisposeClientFacade(clientFacade);
595              }
[4170]596              Thread.Sleep(resultPollingIntervalMs);
597            } else {
598              if (sendingJobsFinished) {
599                // all the jobs have been sent to hive, but non are to query any more (all finished or failed)
600                this.stopResultsPollingPending = true;
601              }
[4133]602            }
[4170]603          } while (!this.stopResultsPollingPending);
[4305]604        }
605        catch (ThreadInterruptedException exception) {
[4170]606          // thread has been interuppted
[4305]607        }
[4333]608        catch (Exception e) {
609          LogMessage("Result Polling Thread failed badly: " + e.Message);
610          Logger.Error("Result Polling Thread failed badly: " + e.Message);
611        }
[4305]612        finally {
[4170]613          this.IsPollingResults = false;
614        }
615      });
616    }
[4121]617
[4173]618    #endregion
[4121]619
[4173]620    #region Snapshots
[4133]621
[4121]622    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
623      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
624      jobItem.LatestSnapshot = response;
625    }
626
[4173]627    public void RequestSnapshot(Guid jobId) {
628      Thread t = new Thread(() => {
[4333]629        IClientFacade clientFacade = null;
[4316]630        try {
[4333]631          clientFacade = CreateStreamedClientFacade();
632
[4316]633          ResponseObject<SerializedJob> response;
634          int retryCount = 0;
[4133]635
[4316]636          Response snapShotResponse = clientFacade.RequestSnapshot(jobId);
637          if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
638            // job already finished
639            Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
640            response = clientFacade.GetLastSerializedResult(jobId, false, false);
[4263]641            Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
[4316]642          } else {
643            // server sent snapshot request to client
644            // poll until snapshot is ready
645            do {
646              Thread.Sleep(snapshotPollingIntervalMs);
647              Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
648              response = clientFacade.GetLastSerializedResult(jobId, false, true);
649              Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
650              retryCount++;
651              // loop while
652              // 1. problem with communication with server
653              // 2. job result not yet ready
654            } while (
655              (retryCount < maxSnapshotRetries) && (
656              response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere)
657              );
658          }
659          if (response.StatusMessage == ResponseStatus.Ok) {
660            LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
661            UpdateSnapshot(response);
662          } else {
663            LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
664          }
[4173]665        }
[4333]666        catch (Exception e) {
667          LogMessage("RequestSnapshot Thread failed badly: " + e.Message);
668          Logger.Error("RequestSnapshot Thread failed badly: " + e.Message);
669        }
[4316]670        finally {
671          ServiceLocator.DisposeClientFacade(clientFacade);
[4173]672        }
673      });
674      t.Start();
[4170]675    }
676
[4145]677    void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
678      JobItem jobItem = (JobItem)sender;
679      if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) {
680        RequestSnapshot(jobItem.JobDto.Id);
681      }
682    }
683
[4173]684    #endregion
[4141]685
[4116]686    #region Required Plugin Search
687    /// <summary>
688    /// Returns a list of plugins in which the type itself and all members
689    /// of the type are declared. Objectgraph is searched recursively.
690    /// </summary>
691    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
692      HashSet<Type> types = new HashSet<Type>();
693      FindTypes(type, types, "HeuristicLab.");
694      return GetDeclaringPlugins(types);
695    }
696
697    /// <summary>
698    /// Returns the plugins (including dependencies) in which the given types are declared
699    /// </summary>
700    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
701      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
702      foreach (Type t in types) {
703        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
704      }
705      return plugins;
706    }
707
708    /// <summary>
709    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
710    /// Also searches the dependencies recursively.
711    /// </summary>
712    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
713      if (!plugins.Contains(plugin)) {
714        plugins.Add(plugin);
715        foreach (IPluginDescription dependency in plugin.Dependencies) {
716          FindDeclaringPlugins(dependency, plugins);
717        }
718      }
719    }
720
721    /// <summary>
722    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
723    /// Be aware that search is not performed on attributes
724    /// </summary>
725    /// <param name="type">the type to be searched</param>
726    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
727    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
728    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
729      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
730        types.Add(type);
731
732        // constructors
733        foreach (ConstructorInfo info in type.GetConstructors()) {
734          foreach (ParameterInfo paramInfo in info.GetParameters()) {
735            FindTypes(paramInfo.ParameterType, types, namespaceStart);
736          }
737        }
738
739        // interfaces
740        foreach (Type t in type.GetInterfaces()) {
741          FindTypes(t, types, namespaceStart);
742        }
743
744        // events
745        foreach (EventInfo info in type.GetEvents()) {
746          FindTypes(info.EventHandlerType, types, namespaceStart);
747          FindTypes(info.DeclaringType, types, namespaceStart);
748        }
749
750        // properties
751        foreach (PropertyInfo info in type.GetProperties()) {
752          FindTypes(info.PropertyType, types, namespaceStart);
753        }
754
755        // fields
756        foreach (FieldInfo info in type.GetFields()) {
757          FindTypes(info.FieldType, types, namespaceStart);
758        }
759
760        // methods
761        foreach (MethodInfo info in type.GetMethods()) {
762          foreach (ParameterInfo paramInfo in info.GetParameters()) {
763            FindTypes(paramInfo.ParameterType, types, namespaceStart);
764          }
765          FindTypes(info.ReturnType, types, namespaceStart);
766        }
767      }
768    }
769    #endregion
[4119]770
771    #region Eventhandler
772
773    public event EventHandler ExecutionTimeChanged;
774    private void OnExecutionTimeChanged() {
775      EventHandler handler = ExecutionTimeChanged;
776      if (handler != null) handler(this, EventArgs.Empty);
777    }
778
779    public event EventHandler ExecutionStateChanged;
780    private void OnExecutionStateChanged() {
[4121]781      LogMessage("ExecutionState changed to " + executionState.ToString());
[4119]782      EventHandler handler = ExecutionStateChanged;
783      if (handler != null) handler(this, EventArgs.Empty);
784    }
785
786    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
787
788    public event EventHandler Started;
789    private void OnStarted() {
[4121]790      LogMessage("Started");
[4120]791      timer.Start();
[4119]792      EventHandler handler = Started;
793      if (handler != null) handler(this, EventArgs.Empty);
794    }
795
796    public event EventHandler Stopped;
797    private void OnStopped() {
[4120]798      timer.Stop();
[4121]799      LogMessage("Stopped");
[4119]800      EventHandler handler = Stopped;
801      if (handler != null) handler(this, EventArgs.Empty);
802    }
803
804    public event EventHandler Paused;
805    private void OnPaused() {
[4120]806      timer.Stop();
[4121]807      LogMessage("Paused");
[4119]808      EventHandler handler = Paused;
809      if (handler != null) handler(this, EventArgs.Empty);
810    }
811
812    public event EventHandler Prepared;
813    protected virtual void OnPrepared() {
[4121]814      LogMessage("Prepared");
[4119]815      EventHandler handler = Prepared;
816      if (handler != null) handler(this, EventArgs.Empty);
817    }
818
819    public event EventHandler ResourceIdsChanged;
820    protected virtual void OnResourceIdsChanged() {
821      EventHandler handler = ResourceIdsChanged;
822      if (handler != null) handler(this, EventArgs.Empty);
823    }
824
825    public event EventHandler ExperimentChanged;
826    protected virtual void OnExperimentChanged() {
[4121]827      LogMessage("Experiment changed");
[4119]828      EventHandler handler = ExperimentChanged;
829      if (handler != null) handler(this, EventArgs.Empty);
830    }
831
832    public event EventHandler ServerUrlChanged;
833    protected virtual void OnServerUrlChanged() {
834      EventHandler handler = ServerUrlChanged;
835      if (handler != null) handler(this, EventArgs.Empty);
836    }
837
[4133]838    public event EventHandler IsResultsPollingChanged;
839    private void OnIsPollingResultsChanged() {
840      if (this.IsPollingResults) {
841        LogMessage("Results Polling Started");
842        timer.Start();
843      } else {
844        LogMessage("Results Polling Stopped");
845        timer.Stop();
846      }
847      EventHandler handler = IsResultsPollingChanged;
848      if (handler != null) handler(this, EventArgs.Empty);
849    }
[4145]850
[4170]851    private void RegisterJobItemListEvents() {
[4145]852      jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
853      jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
854      jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
855      jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
[4173]856      foreach (JobItem jobItem in jobItems) {
857        RegisterJobItemEvents(jobItem);
858      }
[4145]859    }
860
[4170]861    private void DeregisterJobItemListEvents() {
862      jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
863      jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
864      jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
865      jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
[4173]866      foreach (JobItem jobItem in jobItems) {
867        DeregisterJobItemEvents(jobItem);
868      }
[4145]869    }
870
871    void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
[4170]872      UpdateJobItemEvents(e);
[4145]873    }
874
[4170]875    private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
876      if (e.OldItems != null) {
877        foreach (var item in e.OldItems) {
[4173]878          DeregisterJobItemEvents(item.Value);
[4170]879        }
[4145]880      }
[4170]881      if (e.Items != null) {
882        foreach (var item in e.Items) {
[4173]883          RegisterJobItemEvents(item.Value);
[4170]884        }
[4145]885      }
886    }
887
[4173]888    private void RegisterJobItemEvents(JobItem jobItem) {
889      jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
890      jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
891    }
892
893    private void DeregisterJobItemEvents(JobItem jobItem) {
894      jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
895      jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
896    }
897
[4145]898    void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
[4170]899      UpdateJobItemEvents(e);
[4145]900    }
901
902    void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
[4170]903      UpdateJobItemEvents(e);
[4145]904    }
905
906    void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
907      foreach (var item in e.OldItems) {
[4170]908        item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
[4145]909        item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
910      }
911    }
[4119]912    #endregion
[4173]913
914    #region Helper Functions
[4305]915    private IClientFacade CreateClientFacade() {
[4302]916      IClientFacade clientFacade = null;
[4173]917      do {
918        try {
[4316]919          //clientFacade = ServiceLocator.CreateClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientServiceName));
920          clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp);
[4305]921
[4333]922        }
923        catch (EndpointNotFoundException exception) {
[4173]924          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
925          Thread.Sleep(resultPollingIntervalMs);
926        }
[4302]927      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
928      return clientFacade;
[4173]929    }
930
[4305]931    private IClientFacade CreateStreamedClientFacade() {
932      IClientFacade clientFacade = null;
933      do {
934        try {
[4316]935          //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName));
936          clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp);
[4333]937        }
938        catch (EndpointNotFoundException exception) {
[4305]939          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
940          Thread.Sleep(resultPollingIntervalMs);
941        }
942      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
943      return clientFacade;
944    }
945
[4173]946    private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
947      foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
948        optimizers.Add(kvp);
949      }
950    }
951
952    #endregion
953
954    #region Logging
955    private void LogMessage(string message) {
956      // HeuristicLab.Log is not Thread-Safe, so lock on every call
957      lock (locker) {
958        log.LogMessage(message);
959      }
960    }
961
962    private void LogMessage(Guid jobId, string message) {
963      GetJobItemById(jobId).LogMessage(message);
964      LogMessage(message + " (jobId: " + jobId + ")");
965    }
966
967    #endregion
[4116]968  }
969}
Note: See TracBrowser for help on using the repository browser.