Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4248

Last change on this file since 4248 was 4173, checked in by cneumuel, 14 years ago
  • reorganized HiveExperiment code
  • disabled snapshot-functionality... this needs more refactoring serverside
  • added short documentation which explains how to use hive
  • some minor changes
File size: 33.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43using System.ServiceModel;
44
45namespace HeuristicLab.Hive.Experiment {
46  /// <summary>
47  /// An experiment which contains multiple batch runs of algorithms.
48  /// </summary>
49  [Item(itemName, itemDescription)]
50  [Creatable("Testing & Analysis")]
51  [StorableClass]
52  public class HiveExperiment : NamedItem, IExecutable {
53    private const string itemName = "Hive Experiment";
54    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
55    private const int resultPollingIntervalMs = 5000;
56    private const int snapshotPollingIntervalMs = 1000;
57    private const int maxSnapshotRetries = 20;
58    private object locker = new object();
59
60    private System.Timers.Timer timer;
61    private bool pausePending, stopPending;
62    private bool sendingJobsFinished = false;
63
64    // ensure that only 2 threads can fetch jobresults simultaniously
65    private Semaphore fetchJobSemaphore = new Semaphore(2, 2);
66
67    private bool stopResultsPollingPending = false;
68
69    private Thread resultPollingThread;
70
71    private bool isPollingResults;
72    public bool IsPollingResults {
73      get { return isPollingResults; }
74      private set {
75        if (isPollingResults != value) {
76          isPollingResults = value;
77          OnIsPollingResultsChanged();
78        }
79      }
80    }
81
82    public IEnumerable<string> ResourceGroups {
83      get {
84        if (!string.IsNullOrEmpty(resourceIds)) {
85          return resourceIds.Split(';');
86        } else {
87          return new List<string>();
88        }
89      }
90    }
91   
92    #region Storable Properties
93    [Storable]
94    private DateTime lastUpdateTime;
95
96    /// <summary>
97    /// Mapping from JobId to an optimizer.
98    /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection
99    /// </summary>
100    [Storable]
101    private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
102
103    /// <summary>
104    /// Stores a mapping from the child-optimizer to the parent optimizer.
105    /// Needed to replace a finished optimizer in the optimizer-tree.
106    /// Only pending optmizers are stored.
107    /// </summary>
108    [Storable]
109    private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
110
111    [Storable]
112    private JobItemList jobItems;
113    public JobItemList JobItems {
114      get { return jobItems; }
115    }
116
117
118    [Storable]
119    private string serverUrl;
120    public string ServerUrl {
121      get { return serverUrl; }
122      set {
123        if (serverUrl != value) {
124          serverUrl = value;
125          OnServerUrlChanged();
126        }
127      }
128    }
129
130    [Storable]
131    private string resourceIds;
132    public string ResourceIds {
133      get { return resourceIds; }
134      set {
135        if (resourceIds != value) {
136          resourceIds = value;
137          OnResourceIdsChanged();
138        }
139      }
140    }
141
142    [Storable]
143    private HeuristicLab.Optimization.Experiment experiment;
144    public HeuristicLab.Optimization.Experiment Experiment {
145      get { return experiment; }
146      set {
147        if (experiment != value) {
148          experiment = value;
149          OnExperimentChanged();
150        }
151      }
152    }
153
154    [Storable]
155    private ILog log;
156    public ILog Log {
157      get { return log; }
158    }
159
160    [Storable]
161    private Core.ExecutionState executionState;
162    public ExecutionState ExecutionState {
163      get { return executionState; }
164      private set {
165        if (executionState != value) {
166          executionState = value;
167          OnExecutionStateChanged();
168        }
169      }
170    }
171
172    [Storable]
173    private TimeSpan executionTime;
174    public TimeSpan ExecutionTime {
175      get { return executionTime; }
176      private set {
177        if (executionTime != value) {
178          executionTime = value;
179          OnExecutionTimeChanged();
180        }
181      }
182    }
183    #endregion
184
185    [StorableConstructor]
186    public HiveExperiment(bool deserializing)
187      : base(deserializing) {
188    }
189
190    public HiveExperiment()
191      : base(itemName, itemDescription) {
192      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
193      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
194      this.log = new Log();
195      pausePending = stopPending = false;
196      jobItems = new JobItemList();
197      isPollingResults = false;
198      RegisterJobItemListEvents();
199      InitTimer();
200    }
201
202    public override IDeepCloneable Clone(Cloner cloner) {
203      LogMessage("I am beeing cloned");
204      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
205      clone.resourceIds = this.resourceIds;
206      clone.serverUrl = this.serverUrl;
207      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
208      clone.executionState = this.executionState;
209      clone.executionTime = this.executionTime;
210      clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
211
212      foreach (var pair in this.pendingOptimizersByJobId)
213        clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
214
215      foreach (var pair in this.parentOptimizersByPendingOptimizer)
216        clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
217
218      clone.log = (ILog)cloner.Clone(log);
219      clone.stopPending = this.stopPending;
220      clone.pausePending = this.pausePending;
221      clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));
222      clone.lastUpdateTime = this.lastUpdateTime;
223      clone.isPollingResults = this.isPollingResults;
224      return clone;
225    }
226
227    [StorableHook(HookType.AfterDeserialization)]
228    private void AfterDeserialization() {
229      InitTimer();
230      this.IsPollingResults = false;
231      this.stopResultsPollingPending = false;
232      RegisterJobItemListEvents();
233      LogMessage("I was deserialized.");
234    }
235
236    #region Execution Time Timer
237    private void InitTimer() {
238      timer = new System.Timers.Timer(100);
239      timer.AutoReset = true;
240      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
241    }
242
243    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
244      DateTime now = DateTime.Now;
245      ExecutionTime += now - lastUpdateTime;
246      lastUpdateTime = now;
247    }
248    #endregion
249
250    #region IExecutable Methods
251    public void Pause() {
252      throw new NotSupportedException();
253    }
254
255    public void Prepare() {
256      if (experiment != null) {
257        StopResultPolling();
258        lock (pendingOptimizersByJobId) {
259          pendingOptimizersByJobId.Clear();
260        }
261        parentOptimizersByPendingOptimizer.Clear();
262        lock (jobItems) {
263          jobItems.Clear();
264        }
265        experiment.Prepare();
266        this.ExecutionState = Core.ExecutionState.Prepared;
267        OnPrepared();
268      }
269    }
270
271    public void Start() {
272      sendingJobsFinished = false;
273      OnStarted();
274      ExecutionTime = new TimeSpan();
275      lastUpdateTime = DateTime.Now;
276      this.ExecutionState = Core.ExecutionState.Started;
277      StartResultPolling();
278
279      Thread t = new Thread(() => {
280        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
281
282        try {
283          pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
284
285          LogMessage("Extracting jobs from Experiment");
286          parentOptimizersByPendingOptimizer = GetOptimizers(true);
287          LogMessage("Extraction of jobs from Experiment finished");
288
289          IEnumerable<string> groups = ResourceGroups;
290
291          foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
292            SerializedJob serializedJob = CreateSerializedJob(optimizer);
293            ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
294            lock (pendingOptimizersByJobId) {
295              pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
296            }
297
298            JobItem jobItem = new JobItem() {
299              JobDto = response.Obj,
300              LatestSnapshot = null,
301              Optimizer = optimizer
302            };
303            lock (jobItems) {
304              jobItems.Add(jobItem);
305            }
306            LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
307          }
308        } catch (Exception e) {
309          LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
310          this.ExecutionState = Core.ExecutionState.Stopped;
311          OnStopped();
312        }
313
314        sendingJobsFinished = true;
315      });
316      t.Start();
317    }
318
319    public void Stop() {
320      this.ExecutionState = Core.ExecutionState.Stopped;
321      foreach (JobItem jobItem in jobItems) {
322        AbortJob(jobItem.JobDto.Id);
323      }
324      OnStopped();
325    }
326    #endregion
327
328    #region Optimizier Management
329    /// <summary>
330    /// Returns all optimizers in the current Experiment
331    /// </summary>
332    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
333    /// <returns></returns>
334    private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
335      if (!flatout) {
336        var optimizers = new Dictionary<IOptimizer, IOptimizer>();
337        foreach (IOptimizer opt in experiment.Optimizers) {
338          optimizers.Add(experiment, opt);
339        }
340        return optimizers;
341      } else {
342        return FlatOptimizerTree(null, experiment, "");
343      }
344    }
345
346    /// <summary>
347    /// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
348    ///
349    /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
350    /// interface IParallelizable {
351    ///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
352    /// }
353    /// </summary>
354    /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns>
355    private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
356      IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
357      if (optimizer is HeuristicLab.Optimization.Experiment) {
358        HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment;
359        if (this.experiment != experiment) {
360          prepend += experiment.Name + "/"; // don't prepend for top-level optimizers
361        }
362        foreach (IOptimizer opt in experiment.Optimizers) {
363          AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend));
364        }
365      } else if (optimizer is BatchRun) {
366        BatchRun batchRun = optimizer as BatchRun;
367        prepend += batchRun.Name + "/";
368        for (int i = 0; i < batchRun.Repetitions; i++) {
369          IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
370          opt.Name += " [" + i + "]";
371          IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend);
372          AddRange(optimizers, batchOptimizers);
373        }
374      } else if (optimizer is EngineAlgorithm) {
375        optimizer.Name = prepend + optimizer.Name;
376        optimizers.Add(optimizer, parent);
377        LogMessage("Optimizer extracted: " + optimizer.Name);
378      } else {
379        Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown");
380        optimizer.Name = prepend + optimizer.Name;
381        optimizers.Add(optimizer, parent);
382        LogMessage("Optimizer extracted: " + optimizer.Name);
383      }
384      return optimizers;
385    }
386
387    private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
388      lock (locker) {
389        if (parentOptimizer is HeuristicLab.Optimization.Experiment) {
390          HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer;
391          int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer);
392          exp.Optimizers[originalOptimizerIndex] = newOptimizer;
393        } else if (parentOptimizer is BatchRun) {
394          BatchRun batchRun = (BatchRun)parentOptimizer;
395          if (newOptimizer is IAlgorithm) {
396            batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer));
397          } else {
398            throw new NotSupportedException("Only IAlgorithm types supported");
399          }
400        } else {
401          throw new NotSupportedException("Invalid parentOptimizer");
402        }
403      }
404    }
405
406    private bool NoMorePendingOptimizers() {
407      lock (pendingOptimizersByJobId) {
408        return pendingOptimizersByJobId.Count == 0;
409      }
410    }
411
412    /// <summary>
413    /// Removes optimizers from
414    ///  - parentOptimizersByPendingOptimizer
415    ///  - pendingOptimizersByJobId
416    /// </summary>
417    /// <param name="jobId"></param>
418    private void DisposeOptimizerMappings(Guid jobId) {
419      lock (pendingOptimizersByJobId) {
420        LogMessage(jobId, "Disposing Optimizer Mappings");
421        parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]);
422        pendingOptimizersByJobId.Remove(jobId);
423      }
424    }
425
426    #endregion
427
428    #region Job Management
429    /// <summary>
430    /// Updates all JobItems with the results
431    /// </summary>
432    /// <param name="jobResultList"></param>
433    private void UpdateJobItems(JobResultList jobResultList) {
434      // use a Dict to avoid quadratic runtime complexity
435      IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId);
436      lock (jobItems) {
437        foreach (JobItem jobItem in JobItems) {
438          if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) {
439            jobItem.JobResult = jobResultDict[jobItem.JobDto.Id];
440          }
441        }
442      }
443    }
444
445    private void JobItem_JobStateChanged(object sender, EventArgs e) {
446      JobItem jobItem = (JobItem)sender;
447      Thread t = new Thread(() => {
448        if (jobItem.State == State.Finished) {
449          FetchAndUpdateJob(jobItem.JobDto.Id);
450          DisposeOptimizerMappings(jobItem.JobDto.Id);
451        } else if (jobItem.State == State.Failed) {
452          DisposeOptimizerMappings(jobItem.JobDto.Id);
453        }
454
455        if (NoMorePendingOptimizers()) {
456          StopResultPolling();
457          this.ExecutionState = Core.ExecutionState.Stopped;
458          OnStopped();
459        }
460      });
461      t.Start();
462    }
463
464    /// <summary>
465    /// Fetches the finished job from the server and updates the jobItem
466    /// </summary>
467    private void FetchAndUpdateJob(Guid jobId) {
468      LogMessage(jobId, "FetchAndUpdateJob started");
469      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
470      IOptimizer originalOptimizer;
471      lock (pendingOptimizersByJobId) {
472        originalOptimizer = pendingOptimizersByJobId[jobId];
473      }
474
475      fetchJobSemaphore.WaitOne();
476      ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
477      IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
478      IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
479
480      ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
481      fetchJobSemaphore.Release();
482      LogMessage(jobId, "FetchAndUpdateJob ended");
483    }
484
485    private void UpdateJobItem(JobDto jobDto) {
486      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
487      jobItem.JobDto = jobDto;
488    }
489
490    public void AbortJob(Guid jobId) {
491      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
492      Response response = executionEngineFacade.AbortJob(jobId);
493      LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
494    }
495
496    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
497      IJob job = new OptimizerJob() {
498        Optimizer = optimizer
499      };
500
501      // serialize job
502      MemoryStream memStream = new MemoryStream();
503      XmlGenerator.Serialize(job, memStream);
504      byte[] jobByteArray = memStream.ToArray();
505      memStream.Dispose();
506
507      // find out which which plugins are needed for the given object
508      List<HivePluginInfoDto> pluginsNeeded = (
509        from p in GetDeclaringPlugins(job.GetType())
510        select new HivePluginInfoDto() {
511          Name = p.Name,
512          Version = p.Version
513        }).ToList();
514
515      JobDto jobDto = new JobDto() {
516        CoresNeeded = 1, // [chn] how to determine real cores needed?
517        PluginsNeeded = pluginsNeeded,
518        State = State.Offline,
519        MemoryNeeded = 0,
520        UserId = Guid.Empty // [chn] set real userid here!
521      };
522
523      SerializedJob serializedJob = new SerializedJob() {
524        JobInfo = jobDto,
525        SerializedJobData = jobByteArray
526      };
527
528      return serializedJob;
529    }
530
531    private JobItem GetJobItemById(Guid jobId) {
532      return jobItems.Single(x => x.JobDto.Id == jobId);
533    }
534    #endregion
535
536    #region Result Polling
537    public void StartResultPolling() {
538      this.stopResultsPollingPending = false;
539      this.IsPollingResults = true;
540      resultPollingThread = CreateResultPollingThread();
541      if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running)
542        resultPollingThread.Start();
543    }
544
545    public void StopResultPolling() {
546      this.stopResultsPollingPending = true;
547      resultPollingThread.Interrupt();
548      this.stopResultsPollingPending = false;
549    }
550
551    private Thread CreateResultPollingThread() {
552      return new Thread(() => {
553        try {
554          do {
555            IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
556            IEnumerable<Guid> jobIdsToQuery = from job in JobItems
557                                              where job.State != State.Finished &&
558                                              job.State != State.Failed
559                                              select job.JobDto.Id;
560            if (jobIdsToQuery.Count() > 0) {
561              LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs");
562              try {
563                ResponseObject<JobResultList> response = executionEngineFacade.GetAllJobResults(jobIdsToQuery);
564                if (response.Success) {
565                  JobResultList jobItemList = response.Obj;
566                  UpdateJobItems(jobItemList);
567
568                  LogMessage("Polling successfully finished");
569                }
570              } catch (Exception e) {
571                LogMessage("Polling results failed: " + e.Message);
572              }
573              Thread.Sleep(resultPollingIntervalMs);
574            } else {
575              if (sendingJobsFinished) {
576                // all the jobs have been sent to hive, but non are to query any more (all finished or failed)
577                this.stopResultsPollingPending = true;
578              }
579            }
580          } while (!this.stopResultsPollingPending);
581        } catch (ThreadInterruptedException exception) {
582          // thread has been interuppted
583        } finally {
584          this.IsPollingResults = false;
585        }
586      });
587    }
588
589    #endregion
590
591    #region Snapshots
592
593    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
594      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
595      jobItem.LatestSnapshot = response;
596    }
597
598    public void RequestSnapshot(Guid jobId) {
599      Thread t = new Thread(() => {
600        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
601        ResponseObject<SerializedJob> response;
602        int retryCount = 0;
603
604        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
605        if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
606          // job already finished
607          Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
608          response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
609          Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
610        } else {
611          // server sent snapshot request to client
612          // poll until snapshot is ready
613          do {
614            Thread.Sleep(snapshotPollingIntervalMs);
615            Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
616            response = executionEngineFacade.GetLastSerializedResult(jobId, false, true);
617            Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
618            retryCount++;
619            // loop while
620            // 1. problem with communication with server
621            // 2. job result not yet ready
622          } while (
623            (retryCount < maxSnapshotRetries) && (
624            !response.Success ||
625            response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
626            );
627        }
628        if (response.Success) {
629          LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
630          UpdateSnapshot(response);
631        } else {
632          LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
633        }
634      });
635      t.Start();
636    }
637
638    void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
639      JobItem jobItem = (JobItem)sender;
640      if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) {
641        RequestSnapshot(jobItem.JobDto.Id);
642      }
643    }
644
645    #endregion
646
647    #region Required Plugin Search
648    /// <summary>
649    /// Returns a list of plugins in which the type itself and all members
650    /// of the type are declared. Objectgraph is searched recursively.
651    /// </summary>
652    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
653      HashSet<Type> types = new HashSet<Type>();
654      FindTypes(type, types, "HeuristicLab.");
655      return GetDeclaringPlugins(types);
656    }
657
658    /// <summary>
659    /// Returns the plugins (including dependencies) in which the given types are declared
660    /// </summary>
661    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
662      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
663      foreach (Type t in types) {
664        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
665      }
666      return plugins;
667    }
668
669    /// <summary>
670    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
671    /// Also searches the dependencies recursively.
672    /// </summary>
673    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
674      if (!plugins.Contains(plugin)) {
675        plugins.Add(plugin);
676        foreach (IPluginDescription dependency in plugin.Dependencies) {
677          FindDeclaringPlugins(dependency, plugins);
678        }
679      }
680    }
681
682    /// <summary>
683    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
684    /// Be aware that search is not performed on attributes
685    /// </summary>
686    /// <param name="type">the type to be searched</param>
687    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
688    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
689    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
690      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
691        types.Add(type);
692
693        // constructors
694        foreach (ConstructorInfo info in type.GetConstructors()) {
695          foreach (ParameterInfo paramInfo in info.GetParameters()) {
696            FindTypes(paramInfo.ParameterType, types, namespaceStart);
697          }
698        }
699
700        // interfaces
701        foreach (Type t in type.GetInterfaces()) {
702          FindTypes(t, types, namespaceStart);
703        }
704
705        // events
706        foreach (EventInfo info in type.GetEvents()) {
707          FindTypes(info.EventHandlerType, types, namespaceStart);
708          FindTypes(info.DeclaringType, types, namespaceStart);
709        }
710
711        // properties
712        foreach (PropertyInfo info in type.GetProperties()) {
713          FindTypes(info.PropertyType, types, namespaceStart);
714        }
715
716        // fields
717        foreach (FieldInfo info in type.GetFields()) {
718          FindTypes(info.FieldType, types, namespaceStart);
719        }
720
721        // methods
722        foreach (MethodInfo info in type.GetMethods()) {
723          foreach (ParameterInfo paramInfo in info.GetParameters()) {
724            FindTypes(paramInfo.ParameterType, types, namespaceStart);
725          }
726          FindTypes(info.ReturnType, types, namespaceStart);
727        }
728      }
729    }
730    #endregion
731
732    #region Eventhandler
733
734    public event EventHandler ExecutionTimeChanged;
735    private void OnExecutionTimeChanged() {
736      EventHandler handler = ExecutionTimeChanged;
737      if (handler != null) handler(this, EventArgs.Empty);
738    }
739
740    public event EventHandler ExecutionStateChanged;
741    private void OnExecutionStateChanged() {
742      LogMessage("ExecutionState changed to " + executionState.ToString());
743      EventHandler handler = ExecutionStateChanged;
744      if (handler != null) handler(this, EventArgs.Empty);
745    }
746
747    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
748
749    public event EventHandler Started;
750    private void OnStarted() {
751      LogMessage("Started");
752      timer.Start();
753      EventHandler handler = Started;
754      if (handler != null) handler(this, EventArgs.Empty);
755    }
756
757    public event EventHandler Stopped;
758    private void OnStopped() {
759      timer.Stop();
760      LogMessage("Stopped");
761      EventHandler handler = Stopped;
762      if (handler != null) handler(this, EventArgs.Empty);
763    }
764
765    public event EventHandler Paused;
766    private void OnPaused() {
767      timer.Stop();
768      LogMessage("Paused");
769      EventHandler handler = Paused;
770      if (handler != null) handler(this, EventArgs.Empty);
771    }
772
773    public event EventHandler Prepared;
774    protected virtual void OnPrepared() {
775      LogMessage("Prepared");
776      EventHandler handler = Prepared;
777      if (handler != null) handler(this, EventArgs.Empty);
778    }
779
780    public event EventHandler ResourceIdsChanged;
781    protected virtual void OnResourceIdsChanged() {
782      EventHandler handler = ResourceIdsChanged;
783      if (handler != null) handler(this, EventArgs.Empty);
784    }
785
786    public event EventHandler ExperimentChanged;
787    protected virtual void OnExperimentChanged() {
788      LogMessage("Experiment changed");
789      EventHandler handler = ExperimentChanged;
790      if (handler != null) handler(this, EventArgs.Empty);
791    }
792
793    public event EventHandler ServerUrlChanged;
794    protected virtual void OnServerUrlChanged() {
795      EventHandler handler = ServerUrlChanged;
796      if (handler != null) handler(this, EventArgs.Empty);
797    }
798
799    public event EventHandler IsResultsPollingChanged;
800    private void OnIsPollingResultsChanged() {
801      if (this.IsPollingResults) {
802        LogMessage("Results Polling Started");
803        timer.Start();
804      } else {
805        LogMessage("Results Polling Stopped");
806        timer.Stop();
807      }
808      EventHandler handler = IsResultsPollingChanged;
809      if (handler != null) handler(this, EventArgs.Empty);
810    }
811
812    private void RegisterJobItemListEvents() {
813      jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
814      jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
815      jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
816      jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
817      foreach (JobItem jobItem in jobItems) {
818        RegisterJobItemEvents(jobItem);
819      }
820    }
821
822    private void DeregisterJobItemListEvents() {
823      jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
824      jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
825      jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
826      jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
827      foreach (JobItem jobItem in jobItems) {
828        DeregisterJobItemEvents(jobItem);
829      }
830    }
831
832    void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
833      UpdateJobItemEvents(e);
834    }
835
836    private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
837      if (e.OldItems != null) {
838        foreach (var item in e.OldItems) {
839          DeregisterJobItemEvents(item.Value);
840        }
841      }
842      if (e.Items != null) {
843        foreach (var item in e.Items) {
844          RegisterJobItemEvents(item.Value);
845        }
846      }
847    }
848
849    private void RegisterJobItemEvents(JobItem jobItem) {
850      jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
851      jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
852    }
853
854    private void DeregisterJobItemEvents(JobItem jobItem) {
855      jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
856      jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
857    }
858
859    void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
860      UpdateJobItemEvents(e);
861    }
862
863    void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
864      UpdateJobItemEvents(e);
865    }
866
867    void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
868      foreach (var item in e.OldItems) {
869        item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
870        item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
871      }
872    }
873    #endregion
874
875    #region Helper Functions
876    private IExecutionEngineFacade GetExecutionEngineFacade() {
877      IExecutionEngineFacade executionEngineFacade = null;
878      do {
879        try {
880          executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
881        } catch (EndpointNotFoundException exception) {
882          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
883          Thread.Sleep(resultPollingIntervalMs);
884        }
885      } while (executionEngineFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
886      return executionEngineFacade;
887    }
888
889
890    private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
891      foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
892        optimizers.Add(kvp);
893      }
894    }
895
896    #endregion
897
898    #region Logging
899    private void LogMessage(string message) {
900      // HeuristicLab.Log is not Thread-Safe, so lock on every call
901      lock (locker) {
902        log.LogMessage(message);
903      }
904    }
905
906    private void LogMessage(Guid jobId, string message) {
907      GetJobItemById(jobId).LogMessage(message);
908      LogMessage(message + " (jobId: " + jobId + ")");
909    }
910
911    #endregion
912  }
913}
Note: See TracBrowser for help on using the repository browser.