source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4264

Last change on this file since 4264 was 4264, checked in by cneumuel, 12 years ago

Split up "State" to "JobState" and "SlaveState" (#1159)

File size: 33.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43using System.ServiceModel;
44using HeuristicLab.Hive.Contracts.ResponseObjects;
45
46namespace HeuristicLab.Hive.Experiment {
47  /// <summary>
48  /// An experiment which contains multiple batch runs of algorithms.
49  /// </summary>
50  [Item(itemName, itemDescription)]
51  [Creatable("Testing & Analysis")]
52  [StorableClass]
53  public class HiveExperiment : NamedItem, IExecutable {
54    private const string itemName = "Hive Experiment";
55    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
56    private const int resultPollingIntervalMs = 5000;
57    private const int snapshotPollingIntervalMs = 1000;
58    private const int maxSnapshotRetries = 20;
59    private object locker = new object();
60
61    private System.Timers.Timer timer;
62    private bool pausePending, stopPending;
63    private bool sendingJobsFinished = false;
64
65    // ensure that only 2 threads can fetch jobresults simultaniously
66    private Semaphore fetchJobSemaphore = new Semaphore(2, 2);
67
68    private bool stopResultsPollingPending = false;
69
70    private Thread resultPollingThread;
71
72    private bool isPollingResults;
73    public bool IsPollingResults {
74      get { return isPollingResults; }
75      private set {
76        if (isPollingResults != value) {
77          isPollingResults = value;
78          OnIsPollingResultsChanged();
79        }
80      }
81    }
82
83    public IEnumerable<string> ResourceGroups {
84      get {
85        if (!string.IsNullOrEmpty(resourceIds)) {
86          return resourceIds.Split(';');
87        } else {
88          return new List<string>();
89        }
90      }
91    }
92   
93    #region Storable Properties
94    [Storable]
95    private DateTime lastUpdateTime;
96
97    /// <summary>
98    /// Mapping from JobId to an optimizer.
99    /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection
100    /// </summary>
101    [Storable]
102    private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
103
104    /// <summary>
105    /// Stores a mapping from the child-optimizer to the parent optimizer.
106    /// Needed to replace a finished optimizer in the optimizer-tree.
107    /// Only pending optmizers are stored.
108    /// </summary>
109    [Storable]
110    private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
111
112    [Storable]
113    private JobItemList jobItems;
114    public JobItemList JobItems {
115      get { return jobItems; }
116    }
117
118
119    [Storable]
120    private string serverUrl;
121    public string ServerUrl {
122      get { return serverUrl; }
123      set {
124        if (serverUrl != value) {
125          serverUrl = value;
126          OnServerUrlChanged();
127        }
128      }
129    }
130
131    [Storable]
132    private string resourceIds;
133    public string ResourceIds {
134      get { return resourceIds; }
135      set {
136        if (resourceIds != value) {
137          resourceIds = value;
138          OnResourceIdsChanged();
139        }
140      }
141    }
142
143    [Storable]
144    private HeuristicLab.Optimization.Experiment experiment;
145    public HeuristicLab.Optimization.Experiment Experiment {
146      get { return experiment; }
147      set {
148        if (experiment != value) {
149          experiment = value;
150          OnExperimentChanged();
151        }
152      }
153    }
154
155    [Storable]
156    private ILog log;
157    public ILog Log {
158      get { return log; }
159    }
160
161    [Storable]
162    private Core.ExecutionState executionState;
163    public ExecutionState ExecutionState {
164      get { return executionState; }
165      private set {
166        if (executionState != value) {
167          executionState = value;
168          OnExecutionStateChanged();
169        }
170      }
171    }
172
173    [Storable]
174    private TimeSpan executionTime;
175    public TimeSpan ExecutionTime {
176      get { return executionTime; }
177      private set {
178        if (executionTime != value) {
179          executionTime = value;
180          OnExecutionTimeChanged();
181        }
182      }
183    }
184    #endregion
185
186    [StorableConstructor]
187    public HiveExperiment(bool deserializing)
188      : base(deserializing) {
189    }
190
191    public HiveExperiment()
192      : base(itemName, itemDescription) {
193      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
194      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
195      this.log = new Log();
196      pausePending = stopPending = false;
197      jobItems = new JobItemList();
198      isPollingResults = false;
199      RegisterJobItemListEvents();
200      InitTimer();
201    }
202
203    public override IDeepCloneable Clone(Cloner cloner) {
204      LogMessage("I am beeing cloned");
205      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
206      clone.resourceIds = this.resourceIds;
207      clone.serverUrl = this.serverUrl;
208      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
209      clone.executionState = this.executionState;
210      clone.executionTime = this.executionTime;
211      clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
212
213      foreach (var pair in this.pendingOptimizersByJobId)
214        clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
215
216      foreach (var pair in this.parentOptimizersByPendingOptimizer)
217        clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
218
219      clone.log = (ILog)cloner.Clone(log);
220      clone.stopPending = this.stopPending;
221      clone.pausePending = this.pausePending;
222      clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));
223      clone.lastUpdateTime = this.lastUpdateTime;
224      clone.isPollingResults = this.isPollingResults;
225      return clone;
226    }
227
228    [StorableHook(HookType.AfterDeserialization)]
229    private void AfterDeserialization() {
230      InitTimer();
231      this.IsPollingResults = false;
232      this.stopResultsPollingPending = false;
233      RegisterJobItemListEvents();
234      LogMessage("I was deserialized.");
235    }
236
237    #region Execution Time Timer
238    private void InitTimer() {
239      timer = new System.Timers.Timer(100);
240      timer.AutoReset = true;
241      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
242    }
243
244    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
245      DateTime now = DateTime.Now;
246      ExecutionTime += now - lastUpdateTime;
247      lastUpdateTime = now;
248    }
249    #endregion
250
251    #region IExecutable Methods
252    public void Pause() {
253      throw new NotSupportedException();
254    }
255
256    public void Prepare() {
257      if (experiment != null) {
258        StopResultPolling();
259        lock (pendingOptimizersByJobId) {
260          pendingOptimizersByJobId.Clear();
261        }
262        parentOptimizersByPendingOptimizer.Clear();
263        lock (jobItems) {
264          jobItems.Clear();
265        }
266        experiment.Prepare();
267        this.ExecutionState = Core.ExecutionState.Prepared;
268        OnPrepared();
269      }
270    }
271
272    public void Start() {
273      sendingJobsFinished = false;
274      OnStarted();
275      ExecutionTime = new TimeSpan();
276      lastUpdateTime = DateTime.Now;
277      this.ExecutionState = Core.ExecutionState.Started;
278      StartResultPolling();
279
280      Thread t = new Thread(() => {
281        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
282
283        try {
284          pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
285
286          LogMessage("Extracting jobs from Experiment");
287          parentOptimizersByPendingOptimizer = GetOptimizers(true);
288          LogMessage("Extraction of jobs from Experiment finished");
289
290          IEnumerable<string> groups = ResourceGroups;
291
292          foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
293            SerializedJob serializedJob = CreateSerializedJob(optimizer);
294            ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
295            lock (pendingOptimizersByJobId) {
296              pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
297            }
298
299            JobItem jobItem = new JobItem() {
300              JobDto = response.Obj,
301              LatestSnapshot = null,
302              Optimizer = optimizer
303            };
304            lock (jobItems) {
305              jobItems.Add(jobItem);
306            }
307            LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
308          }
309        } catch (Exception e) {
310          LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
311          this.ExecutionState = Core.ExecutionState.Stopped;
312          OnStopped();
313        }
314
315        sendingJobsFinished = true;
316      });
317      t.Start();
318    }
319
320    public void Stop() {
321      this.ExecutionState = Core.ExecutionState.Stopped;
322      foreach (JobItem jobItem in jobItems) {
323        AbortJob(jobItem.JobDto.Id);
324      }
325      OnStopped();
326    }
327    #endregion
328
329    #region Optimizier Management
330    /// <summary>
331    /// Returns all optimizers in the current Experiment
332    /// </summary>
333    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
334    /// <returns></returns>
335    private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
336      if (!flatout) {
337        var optimizers = new Dictionary<IOptimizer, IOptimizer>();
338        foreach (IOptimizer opt in experiment.Optimizers) {
339          optimizers.Add(experiment, opt);
340        }
341        return optimizers;
342      } else {
343        return FlatOptimizerTree(null, experiment, "");
344      }
345    }
346
347    /// <summary>
348    /// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
349    ///
350    /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
351    /// interface IParallelizable {
352    ///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
353    /// }
354    /// </summary>
355    /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns>
356    private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
357      IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
358      if (optimizer is HeuristicLab.Optimization.Experiment) {
359        HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment;
360        if (this.experiment != experiment) {
361          prepend += experiment.Name + "/"; // don't prepend for top-level optimizers
362        }
363        foreach (IOptimizer opt in experiment.Optimizers) {
364          AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend));
365        }
366      } else if (optimizer is BatchRun) {
367        BatchRun batchRun = optimizer as BatchRun;
368        prepend += batchRun.Name + "/";
369        for (int i = 0; i < batchRun.Repetitions; i++) {
370          IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
371          opt.Name += " [" + i + "]";
372          IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend);
373          AddRange(optimizers, batchOptimizers);
374        }
375      } else if (optimizer is EngineAlgorithm) {
376        optimizer.Name = prepend + optimizer.Name;
377        optimizers.Add(optimizer, parent);
378        LogMessage("Optimizer extracted: " + optimizer.Name);
379      } else {
380        Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown");
381        optimizer.Name = prepend + optimizer.Name;
382        optimizers.Add(optimizer, parent);
383        LogMessage("Optimizer extracted: " + optimizer.Name);
384      }
385      return optimizers;
386    }
387
388    private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
389      lock (locker) {
390        if (parentOptimizer is HeuristicLab.Optimization.Experiment) {
391          HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer;
392          int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer);
393          exp.Optimizers[originalOptimizerIndex] = newOptimizer;
394        } else if (parentOptimizer is BatchRun) {
395          BatchRun batchRun = (BatchRun)parentOptimizer;
396          if (newOptimizer is IAlgorithm) {
397            batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer));
398          } else {
399            throw new NotSupportedException("Only IAlgorithm types supported");
400          }
401        } else {
402          throw new NotSupportedException("Invalid parentOptimizer");
403        }
404      }
405    }
406
407    private bool NoMorePendingOptimizers() {
408      lock (pendingOptimizersByJobId) {
409        return pendingOptimizersByJobId.Count == 0;
410      }
411    }
412
413    /// <summary>
414    /// Removes optimizers from
415    ///  - parentOptimizersByPendingOptimizer
416    ///  - pendingOptimizersByJobId
417    /// </summary>
418    /// <param name="jobId"></param>
419    private void DisposeOptimizerMappings(Guid jobId) {
420      lock (pendingOptimizersByJobId) {
421        LogMessage(jobId, "Disposing Optimizer Mappings");
422        parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]);
423        pendingOptimizersByJobId.Remove(jobId);
424      }
425    }
426
427    #endregion
428
429    #region Job Management
430    /// <summary>
431    /// Updates all JobItems with the results
432    /// </summary>
433    /// <param name="jobResultList"></param>
434    private void UpdateJobItems(JobResultList jobResultList) {
435      // use a Dict to avoid quadratic runtime complexity
436      IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId);
437      lock (jobItems) {
438        foreach (JobItem jobItem in JobItems) {
439          if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) {
440            jobItem.JobResult = jobResultDict[jobItem.JobDto.Id];
441          }
442        }
443      }
444    }
445
446    private void JobItem_JobStateChanged(object sender, EventArgs e) {
447      JobItem jobItem = (JobItem)sender;
448      Thread t = new Thread(() => {
449        if (jobItem.State == JobState.Finished) {
450          FetchAndUpdateJob(jobItem.JobDto.Id);
451          DisposeOptimizerMappings(jobItem.JobDto.Id);
452        } else if (jobItem.State == JobState.Failed) {
453          DisposeOptimizerMappings(jobItem.JobDto.Id);
454        }
455
456        if (NoMorePendingOptimizers()) {
457          StopResultPolling();
458          this.ExecutionState = Core.ExecutionState.Stopped;
459          OnStopped();
460        }
461      });
462      t.Start();
463    }
464
465    /// <summary>
466    /// Fetches the finished job from the server and updates the jobItem
467    /// </summary>
468    private void FetchAndUpdateJob(Guid jobId) {
469      LogMessage(jobId, "FetchAndUpdateJob started");
470      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
471      IOptimizer originalOptimizer;
472      lock (pendingOptimizersByJobId) {
473        originalOptimizer = pendingOptimizersByJobId[jobId];
474      }
475
476      fetchJobSemaphore.WaitOne();
477      ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
478      IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
479      IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
480
481      ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
482      fetchJobSemaphore.Release();
483      LogMessage(jobId, "FetchAndUpdateJob ended");
484    }
485
486    private void UpdateJobItem(JobDto jobDto) {
487      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
488      jobItem.JobDto = jobDto;
489    }
490
491    public void AbortJob(Guid jobId) {
492      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
493      Response response = executionEngineFacade.AbortJob(jobId);
494      LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
495    }
496
497    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
498      IJob job = new OptimizerJob() {
499        Optimizer = optimizer
500      };
501
502      // serialize job
503      MemoryStream memStream = new MemoryStream();
504      XmlGenerator.Serialize(job, memStream);
505      byte[] jobByteArray = memStream.ToArray();
506      memStream.Dispose();
507
508      // find out which which plugins are needed for the given object
509      List<HivePluginInfoDto> pluginsNeeded = (
510        from p in GetDeclaringPlugins(job.GetType())
511        select new HivePluginInfoDto() {
512          Name = p.Name,
513          Version = p.Version
514        }).ToList();
515
516      JobDto jobDto = new JobDto() {
517        CoresNeeded = 1, // [chn] how to determine real cores needed?
518        PluginsNeeded = pluginsNeeded,
519        State = JobState.Offline,
520        MemoryNeeded = 0,
521        UserId = Guid.Empty // [chn] set real userid here!
522      };
523
524      SerializedJob serializedJob = new SerializedJob() {
525        JobInfo = jobDto,
526        SerializedJobData = jobByteArray
527      };
528
529      return serializedJob;
530    }
531
532    private JobItem GetJobItemById(Guid jobId) {
533      return jobItems.Single(x => x.JobDto.Id == jobId);
534    }
535    #endregion
536
537    #region Result Polling
538    public void StartResultPolling() {
539      this.stopResultsPollingPending = false;
540      this.IsPollingResults = true;
541      resultPollingThread = CreateResultPollingThread();
542      if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running)
543        resultPollingThread.Start();
544    }
545
546    public void StopResultPolling() {
547      this.stopResultsPollingPending = true;
548      resultPollingThread.Interrupt();
549      this.stopResultsPollingPending = false;
550    }
551
552    private Thread CreateResultPollingThread() {
553      return new Thread(() => {
554        try {
555          do {
556            IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
557            IEnumerable<Guid> jobIdsToQuery = from job in JobItems
558                                              where job.State != JobState.Finished &&
559                                              job.State != JobState.Failed
560                                              select job.JobDto.Id;
561            if (jobIdsToQuery.Count() > 0) {
562              LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs");
563              try {
564                ResponseObject<JobResultList> response = executionEngineFacade.GetJobResults(jobIdsToQuery);
565                if (response.StatusMessage == ResponseStatus.Ok) {
566                  JobResultList jobItemList = response.Obj;
567                  UpdateJobItems(jobItemList);
568
569                  LogMessage("Polling successfully finished");
570                } else {
571                  throw new Exception(response.StatusMessage.ToString());
572                }
573              } catch (Exception e) {
574                LogMessage("Polling results failed: " + e.Message);
575              }
576              Thread.Sleep(resultPollingIntervalMs);
577            } else {
578              if (sendingJobsFinished) {
579                // all the jobs have been sent to hive, but non are to query any more (all finished or failed)
580                this.stopResultsPollingPending = true;
581              }
582            }
583          } while (!this.stopResultsPollingPending);
584        } catch (ThreadInterruptedException exception) {
585          // thread has been interuppted
586        } finally {
587          this.IsPollingResults = false;
588        }
589      });
590    }
591
592    #endregion
593
594    #region Snapshots
595
596    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
597      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
598      jobItem.LatestSnapshot = response;
599    }
600
601    public void RequestSnapshot(Guid jobId) {
602      Thread t = new Thread(() => {
603        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
604        ResponseObject<SerializedJob> response;
605        int retryCount = 0;
606
607        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
608        if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
609          // job already finished
610          Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
611          response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
612          Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
613        } else {
614          // server sent snapshot request to client
615          // poll until snapshot is ready
616          do {
617            Thread.Sleep(snapshotPollingIntervalMs);
618            Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
619            response = executionEngineFacade.GetLastSerializedResult(jobId, false, true);
620            Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
621            retryCount++;
622            // loop while
623            // 1. problem with communication with server
624            // 2. job result not yet ready
625          } while (
626            (retryCount < maxSnapshotRetries) && (
627            response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere)
628            );
629        }
630        if (response.StatusMessage == ResponseStatus.Ok) {
631          LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
632          UpdateSnapshot(response);
633        } else {
634          LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
635        }
636      });
637      t.Start();
638    }
639
640    void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
641      JobItem jobItem = (JobItem)sender;
642      if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) {
643        RequestSnapshot(jobItem.JobDto.Id);
644      }
645    }
646
647    #endregion
648
649    #region Required Plugin Search
650    /// <summary>
651    /// Returns a list of plugins in which the type itself and all members
652    /// of the type are declared. Objectgraph is searched recursively.
653    /// </summary>
654    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
655      HashSet<Type> types = new HashSet<Type>();
656      FindTypes(type, types, "HeuristicLab.");
657      return GetDeclaringPlugins(types);
658    }
659
660    /// <summary>
661    /// Returns the plugins (including dependencies) in which the given types are declared
662    /// </summary>
663    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
664      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
665      foreach (Type t in types) {
666        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
667      }
668      return plugins;
669    }
670
671    /// <summary>
672    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
673    /// Also searches the dependencies recursively.
674    /// </summary>
675    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
676      if (!plugins.Contains(plugin)) {
677        plugins.Add(plugin);
678        foreach (IPluginDescription dependency in plugin.Dependencies) {
679          FindDeclaringPlugins(dependency, plugins);
680        }
681      }
682    }
683
684    /// <summary>
685    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
686    /// Be aware that search is not performed on attributes
687    /// </summary>
688    /// <param name="type">the type to be searched</param>
689    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
690    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
691    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
692      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
693        types.Add(type);
694
695        // constructors
696        foreach (ConstructorInfo info in type.GetConstructors()) {
697          foreach (ParameterInfo paramInfo in info.GetParameters()) {
698            FindTypes(paramInfo.ParameterType, types, namespaceStart);
699          }
700        }
701
702        // interfaces
703        foreach (Type t in type.GetInterfaces()) {
704          FindTypes(t, types, namespaceStart);
705        }
706
707        // events
708        foreach (EventInfo info in type.GetEvents()) {
709          FindTypes(info.EventHandlerType, types, namespaceStart);
710          FindTypes(info.DeclaringType, types, namespaceStart);
711        }
712
713        // properties
714        foreach (PropertyInfo info in type.GetProperties()) {
715          FindTypes(info.PropertyType, types, namespaceStart);
716        }
717
718        // fields
719        foreach (FieldInfo info in type.GetFields()) {
720          FindTypes(info.FieldType, types, namespaceStart);
721        }
722
723        // methods
724        foreach (MethodInfo info in type.GetMethods()) {
725          foreach (ParameterInfo paramInfo in info.GetParameters()) {
726            FindTypes(paramInfo.ParameterType, types, namespaceStart);
727          }
728          FindTypes(info.ReturnType, types, namespaceStart);
729        }
730      }
731    }
732    #endregion
733
734    #region Eventhandler
735
736    public event EventHandler ExecutionTimeChanged;
737    private void OnExecutionTimeChanged() {
738      EventHandler handler = ExecutionTimeChanged;
739      if (handler != null) handler(this, EventArgs.Empty);
740    }
741
742    public event EventHandler ExecutionStateChanged;
743    private void OnExecutionStateChanged() {
744      LogMessage("ExecutionState changed to " + executionState.ToString());
745      EventHandler handler = ExecutionStateChanged;
746      if (handler != null) handler(this, EventArgs.Empty);
747    }
748
749    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
750
751    public event EventHandler Started;
752    private void OnStarted() {
753      LogMessage("Started");
754      timer.Start();
755      EventHandler handler = Started;
756      if (handler != null) handler(this, EventArgs.Empty);
757    }
758
759    public event EventHandler Stopped;
760    private void OnStopped() {
761      timer.Stop();
762      LogMessage("Stopped");
763      EventHandler handler = Stopped;
764      if (handler != null) handler(this, EventArgs.Empty);
765    }
766
767    public event EventHandler Paused;
768    private void OnPaused() {
769      timer.Stop();
770      LogMessage("Paused");
771      EventHandler handler = Paused;
772      if (handler != null) handler(this, EventArgs.Empty);
773    }
774
775    public event EventHandler Prepared;
776    protected virtual void OnPrepared() {
777      LogMessage("Prepared");
778      EventHandler handler = Prepared;
779      if (handler != null) handler(this, EventArgs.Empty);
780    }
781
782    public event EventHandler ResourceIdsChanged;
783    protected virtual void OnResourceIdsChanged() {
784      EventHandler handler = ResourceIdsChanged;
785      if (handler != null) handler(this, EventArgs.Empty);
786    }
787
788    public event EventHandler ExperimentChanged;
789    protected virtual void OnExperimentChanged() {
790      LogMessage("Experiment changed");
791      EventHandler handler = ExperimentChanged;
792      if (handler != null) handler(this, EventArgs.Empty);
793    }
794
795    public event EventHandler ServerUrlChanged;
796    protected virtual void OnServerUrlChanged() {
797      EventHandler handler = ServerUrlChanged;
798      if (handler != null) handler(this, EventArgs.Empty);
799    }
800
801    public event EventHandler IsResultsPollingChanged;
802    private void OnIsPollingResultsChanged() {
803      if (this.IsPollingResults) {
804        LogMessage("Results Polling Started");
805        timer.Start();
806      } else {
807        LogMessage("Results Polling Stopped");
808        timer.Stop();
809      }
810      EventHandler handler = IsResultsPollingChanged;
811      if (handler != null) handler(this, EventArgs.Empty);
812    }
813
814    private void RegisterJobItemListEvents() {
815      jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
816      jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
817      jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
818      jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
819      foreach (JobItem jobItem in jobItems) {
820        RegisterJobItemEvents(jobItem);
821      }
822    }
823
824    private void DeregisterJobItemListEvents() {
825      jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
826      jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
827      jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
828      jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
829      foreach (JobItem jobItem in jobItems) {
830        DeregisterJobItemEvents(jobItem);
831      }
832    }
833
834    void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
835      UpdateJobItemEvents(e);
836    }
837
838    private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
839      if (e.OldItems != null) {
840        foreach (var item in e.OldItems) {
841          DeregisterJobItemEvents(item.Value);
842        }
843      }
844      if (e.Items != null) {
845        foreach (var item in e.Items) {
846          RegisterJobItemEvents(item.Value);
847        }
848      }
849    }
850
851    private void RegisterJobItemEvents(JobItem jobItem) {
852      jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
853      jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
854    }
855
856    private void DeregisterJobItemEvents(JobItem jobItem) {
857      jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
858      jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
859    }
860
861    void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
862      UpdateJobItemEvents(e);
863    }
864
865    void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
866      UpdateJobItemEvents(e);
867    }
868
869    void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
870      foreach (var item in e.OldItems) {
871        item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
872        item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
873      }
874    }
875    #endregion
876
877    #region Helper Functions
878    private IExecutionEngineFacade GetExecutionEngineFacade() {
879      IExecutionEngineFacade executionEngineFacade = null;
880      do {
881        try {
882          executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
883        } catch (EndpointNotFoundException exception) {
884          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
885          Thread.Sleep(resultPollingIntervalMs);
886        }
887      } while (executionEngineFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
888      return executionEngineFacade;
889    }
890
891
892    private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
893      foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
894        optimizers.Add(kvp);
895      }
896    }
897
898    #endregion
899
900    #region Logging
901    private void LogMessage(string message) {
902      // HeuristicLab.Log is not Thread-Safe, so lock on every call
903      lock (locker) {
904        log.LogMessage(message);
905      }
906    }
907
908    private void LogMessage(Guid jobId, string message) {
909      GetJobItemById(jobId).LogMessage(message);
910      LogMessage(message + " (jobId: " + jobId + ")");
911    }
912
913    #endregion
914  }
915}
Note: See TracBrowser for help on using the repository browser.