Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4316

Last change on this file since 4316 was 4316, checked in by cneumuel, 12 years ago

made streaming wcf-services work with Transport-Security and net.tcp but with Message-Level Credentials (#1168)

File size: 34.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43using System.ServiceModel;
44using HeuristicLab.Hive.Contracts.ResponseObjects;
45using HeuristicLab.Hive.Experiment.Properties;
46
47namespace HeuristicLab.Hive.Experiment {
48  /// <summary>
49  /// An experiment which contains multiple batch runs of algorithms.
50  /// </summary>
51  [Item(itemName, itemDescription)]
52  [Creatable("Testing & Analysis")]
53  [StorableClass]
54  public class HiveExperiment : NamedItem, IExecutable {
private const string itemName = "Hive Experiment";
private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";

// delay between two result polls and between two snapshot polls (milliseconds)
private const int resultPollingIntervalMs = 5000;
private const int snapshotPollingIntervalMs = 1000;
// upper bound for the number of snapshot polls in RequestSnapshot
private const int maxSnapshotRetries = 20;

// guards ReplaceOptimizer; never reassigned so every thread locks the same object
private readonly object locker = new object();

// drives the ExecutionTime updates (see InitTimer / timer_Elapsed)
private System.Timers.Timer timer;
private bool pausePending, stopPending;

// written by the job-sending thread and read by the result polling thread,
// therefore volatile so that the polling loop always observes the latest value
private volatile bool sendingJobsFinished = false;

// ensure that only 2 threads can fetch job results simultaneously
private readonly Semaphore fetchJobSemaphore = new Semaphore(2, 2);

// set by StopResultPolling and polled by the result polling loop on another thread
private volatile bool stopResultsPollingPending = false;

private Thread resultPollingThread;
72
private bool isPollingResults;
/// <summary>
/// Indicates whether results are currently being polled.
/// Assigning a different value raises the change notification via OnIsPollingResultsChanged.
/// </summary>
public bool IsPollingResults {
  get { return isPollingResults; }
  private set {
    if (isPollingResults == value) {
      return;
    }
    isPollingResults = value;
    OnIsPollingResultsChanged();
  }
}
83
/// <summary>
/// The resource ids split at ';'. Yields an empty list if no resource ids are set.
/// </summary>
public IEnumerable<string> ResourceGroups {
  get {
    if (string.IsNullOrEmpty(resourceIds)) {
      return new List<string>();
    }
    return resourceIds.Split(';');
  }
}
93
94    #region Storable Properties
// Time stamp of the most recent execution time update (see timer_Elapsed).
[Storable]
private DateTime lastUpdateTime;

/// <summary>
/// Maps a job id to its optimizer.
/// Holds all pending optimizers; an optimizer is removed from this collection once it is finished.
/// </summary>
[Storable]
private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();

/// <summary>
/// Maps a child optimizer to its parent optimizer.
/// Required to replace a finished optimizer in the optimizer tree.
/// Only pending optimizers are stored.
/// </summary>
[Storable]
private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
112
[Storable]
private JobItemList jobItems;
/// <summary>
/// The list of job items of this experiment (read-only access to the list reference).
/// </summary>
public JobItemList JobItems {
  get {
    return jobItems;
  }
}
118
[Storable]
private string resourceIds;
/// <summary>
/// The resource ids as a single string; <see cref="ResourceGroups"/> splits it at ';'.
/// Assigning a different value raises ResourceIdsChanged.
/// </summary>
public string ResourceIds {
  get { return resourceIds; }
  set {
    if (resourceIds == value) {
      return;
    }
    resourceIds = value;
    OnResourceIdsChanged();
  }
}
130
[Storable]
private HeuristicLab.Optimization.Experiment experiment;
/// <summary>
/// The experiment whose optimizers are sent to the Hive.
/// Assigning a different instance raises ExperimentChanged.
/// </summary>
public HeuristicLab.Optimization.Experiment Experiment {
  get { return experiment; }
  set {
    if (experiment == value) {
      return;
    }
    experiment = value;
    OnExperimentChanged();
  }
}
142
[Storable]
private ILog log;
/// <summary>
/// The log of this experiment.
/// </summary>
public ILog Log {
  get {
    return log;
  }
}
148
[Storable]
private Core.ExecutionState executionState;
/// <summary>
/// The current execution state. Assigning a different value raises ExecutionStateChanged.
/// </summary>
public ExecutionState ExecutionState {
  get { return executionState; }
  private set {
    if (executionState == value) {
      return;
    }
    executionState = value;
    OnExecutionStateChanged();
  }
}
160
[Storable]
private TimeSpan executionTime;
/// <summary>
/// The accumulated execution time. Assigning a different value raises ExecutionTimeChanged.
/// </summary>
public TimeSpan ExecutionTime {
  get { return executionTime; }
  private set {
    if (executionTime == value) {
      return;
    }
    executionTime = value;
    OnExecutionTimeChanged();
  }
}
172    #endregion
173
/// <summary>
/// Constructor used by the persistence mechanism; only forwards to the base class.
/// </summary>
[StorableConstructor]
public HiveExperiment(bool deserializing) : base(deserializing) { }
178
/// <summary>
/// Creates an empty Hive experiment: resource ids are taken from the settings,
/// log and job item list are created, and the execution time timer is set up.
/// </summary>
public HiveExperiment()
  : base(itemName, itemDescription) {
  ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
  log = new Log();
  pausePending = false;
  stopPending = false;
  jobItems = new JobItemList();
  isPollingResults = false;
  RegisterJobItemListEvents();
  InitTimer();
}
189
/// <summary>
/// Creates a deep clone of this experiment. Value state is copied, while the experiment,
/// the optimizer mappings, the log and the job items are cloned through the given cloner.
/// </summary>
/// <param name="cloner">the cloner used for all nested clone operations</param>
/// <returns>the cloned HiveExperiment</returns>
public override IDeepCloneable Clone(Cloner cloner) {
  LogMessage("I am beeing cloned");
  HiveExperiment copy = (HiveExperiment)base.Clone(cloner);

  // plain value state
  copy.resourceIds = resourceIds;
  copy.executionState = executionState;
  copy.executionTime = executionTime;
  copy.lastUpdateTime = lastUpdateTime;
  copy.stopPending = stopPending;
  copy.pausePending = pausePending;
  copy.isPollingResults = isPollingResults;

  // state that has to be cloned through the cloner
  copy.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);

  copy.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
  foreach (KeyValuePair<Guid, IOptimizer> entry in pendingOptimizersByJobId) {
    copy.pendingOptimizersByJobId[entry.Key] = (IOptimizer)cloner.Clone(entry.Value);
  }

  foreach (KeyValuePair<IOptimizer, IOptimizer> entry in parentOptimizersByPendingOptimizer) {
    IOptimizer clonedChild = (IOptimizer)cloner.Clone(entry.Key);
    IOptimizer clonedParent = (IOptimizer)cloner.Clone(entry.Value);
    copy.parentOptimizersByPendingOptimizer[clonedChild] = clonedParent;
  }

  copy.log = (ILog)cloner.Clone(log);
  copy.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));
  return copy;
}
213
/// <summary>
/// Restores the non-persistent state after deserialization: a fresh timer,
/// reset polling flags and re-registered job item events.
/// </summary>
[StorableHook(HookType.AfterDeserialization)]
private void AfterDeserialization() {
  InitTimer();
  IsPollingResults = false;
  stopResultsPollingPending = false;
  RegisterJobItemListEvents();
  LogMessage("I was deserialized.");
}
222
223    #region Execution Time Timer
/// <summary>
/// Creates the auto-resetting 100 ms timer whose Elapsed event is handled by timer_Elapsed.
/// The timer is not started here.
/// </summary>
private void InitTimer() {
  timer = new System.Timers.Timer(100) { AutoReset = true };
  timer.Elapsed += timer_Elapsed;
}
229
/// <summary>
/// Adds the time elapsed since the last update to ExecutionTime and remembers the new time stamp.
/// </summary>
private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
  DateTime current = DateTime.Now;
  TimeSpan elapsed = current - lastUpdateTime;
  ExecutionTime = ExecutionTime + elapsed;
  lastUpdateTime = current;
}
235    #endregion
236
237    #region IExecutable Methods
/// <summary>
/// Pausing is not supported by a Hive experiment.
/// </summary>
/// <exception cref="NotSupportedException">always thrown</exception>
public void Pause() {
  throw new NotSupportedException();
}
241
/// <summary>
/// Resets the experiment: stops result polling, clears the optimizer mappings and the job items,
/// prepares the wrapped experiment and switches to the Prepared state.
/// Does nothing if no experiment is set.
/// </summary>
public void Prepare() {
  if (experiment == null) {
    return;
  }

  StopResultPolling();
  lock (pendingOptimizersByJobId) {
    pendingOptimizersByJobId.Clear();
  }
  parentOptimizersByPendingOptimizer.Clear();
  lock (jobItems) {
    jobItems.Clear();
  }
  experiment.Prepare();
  ExecutionState = Core.ExecutionState.Prepared;
  OnPrepared();
}
257
/// <summary>
/// Starts the experiment. Result polling is started immediately; the optimizers are extracted
/// from the experiment and sent to the Hive as jobs on a separate thread.
/// If sending fails, the error is logged and the experiment switches to the Stopped state.
/// </summary>
public void Start() {
  sendingJobsFinished = false;
  OnStarted();
  ExecutionTime = new TimeSpan();
  lastUpdateTime = DateTime.Now;
  this.ExecutionState = Core.ExecutionState.Started;
  StartResultPolling();

  Thread t = new Thread(() => {
    IClientFacade clientFacade = null;
    try {
      // created inside the try block: a connection failure must be logged
      // instead of terminating the process from this worker thread
      clientFacade = CreateStreamedClientFacade();
      pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();

      LogMessage("Extracting jobs from Experiment");
      parentOptimizersByPendingOptimizer = GetOptimizers(true);
      LogMessage("Extraction of jobs from Experiment finished");

      IEnumerable<string> groups = ResourceGroups;

      // iterate over a copy: job state handlers may remove finished optimizers
      // from parentOptimizersByPendingOptimizer while jobs are still being sent
      List<IOptimizer> optimizersToSend = new List<IOptimizer>(parentOptimizersByPendingOptimizer.Keys);
      foreach (IOptimizer optimizer in optimizersToSend) {
        SerializedJob serializedJob = CreateSerializedJob(optimizer);
        ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
        if (response == null || response.Obj == null) {
          throw new InvalidOperationException(
            "Server returned no job for optimizer '" + optimizer.Name + "'" +
            (response != null ? " (" + response.StatusMessage + ")" : ""));
        }
        lock (pendingOptimizersByJobId) {
          pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
        }

        JobItem jobItem = new JobItem() {
          JobDto = response.Obj,
          LatestSnapshot = null,
          Optimizer = optimizer
        };
        lock (jobItems) {
          jobItems.Add(jobItem);
        }
        LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
      }
    }
    catch (Exception e) {
      LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
      this.ExecutionState = Core.ExecutionState.Stopped;
      OnStopped();
    }
    finally {
      try {
        if (clientFacade != null) {
          ServiceLocator.DisposeClientFacade(clientFacade);
        }
      }
      finally {
        // always signal the polling thread that no further jobs will be added
        sendingJobsFinished = true;
      }
    }
  });
  t.Start();
}
309
/// <summary>
/// Switches to the Stopped state and requests an abort for every job of this experiment.
/// </summary>
public void Stop() {
  this.ExecutionState = Core.ExecutionState.Stopped;

  // copy under the lock: the sending thread adds job items while holding the same lock,
  // enumerating the live list could fail with a "collection was modified" exception
  List<JobItem> itemsToAbort;
  lock (jobItems) {
    itemsToAbort = new List<JobItem>(jobItems);
  }
  foreach (JobItem jobItem in itemsToAbort) {
    AbortJob(jobItem.JobDto.Id);
  }
  OnStopped();
}
317    #endregion
318
319    #region Optimizier Management
/// <summary>
/// Returns all optimizers in the current Experiment
/// </summary>
/// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
/// <returns>a dictionary mapping from each (child) optimizer to its parent optimizer</returns>
private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
  if (flatout) {
    return FlatOptimizerTree(null, experiment, "");
  }

  var optimizers = new Dictionary<IOptimizer, IOptimizer>();
  foreach (IOptimizer opt in experiment.Optimizers) {
    // key = child, value = parent (same direction as FlatOptimizerTree);
    // using the experiment as key would throw on the second optimizer
    optimizers.Add(opt, experiment);
  }
  return optimizers;
}
336
/// <summary>
/// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
/// Experiments are descended into, batch runs are expanded into one clone of their
/// algorithm per repetition, every other optimizer is returned as a leaf whose name
/// is prefixed with the names of the enclosing containers.
///
/// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
/// interface IParallelizable {
///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
/// }
/// </summary>
/// <param name="parent">the optimizer containing <paramref name="optimizer"/> (null for the root)</param>
/// <param name="optimizer">the optimizer to be processed</param>
/// <param name="prepend">name prefix collected from the enclosing containers</param>
/// <returns>a dictionary mapping from each leaf (child) optimizer to its parent optimizer</returns>
private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
  IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
  if (optimizer is HeuristicLab.Optimization.Experiment) {
    HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)optimizer;
    if (this.experiment != exp) {
      prepend += exp.Name + "/"; // don't prepend for top-level optimizers
    }
    foreach (IOptimizer opt in exp.Optimizers) {
      AddRange(optimizers, FlatOptimizerTree(exp, opt, prepend));
    }
  } else if (optimizer is BatchRun) {
    BatchRun batchRun = (BatchRun)optimizer;
    prepend += batchRun.Name + "/";
    for (int i = 0; i < batchRun.Repetitions; i++) {
      // every repetition becomes its own job, therefore the algorithm is cloned
      IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
      opt.Name += " [" + i + "]";
      AddRange(optimizers, FlatOptimizerTree(batchRun, opt, prepend));
    }
  } else {
    if (!(optimizer is EngineAlgorithm)) {
      // report the type of the unknown optimizer (not the type of the result dictionary)
      Logger.Warn("Optimizer of type " + optimizer.GetType().ToString() + " unknown");
    }
    optimizer.Name = prepend + optimizer.Name;
    optimizers.Add(optimizer, parent);
    LogMessage("Optimizer extracted: " + optimizer.Name);
  }
  return optimizers;
}
377
/// <summary>
/// Puts a restored optimizer into its parent. In an Experiment the original optimizer is
/// replaced at its index; for a BatchRun a new Run is added for the restored algorithm.
/// </summary>
/// <param name="parentOptimizer">the Experiment or BatchRun that contains the optimizer</param>
/// <param name="originalOptimizer">the optimizer that was sent to the Hive</param>
/// <param name="newOptimizer">the optimizer restored from the job result</param>
/// <exception cref="NotSupportedException">
/// if the parent is neither an Experiment nor a BatchRun, or if a BatchRun receives a non-IAlgorithm
/// </exception>
private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
  lock (locker) {
    HeuristicLab.Optimization.Experiment parentExperiment = parentOptimizer as HeuristicLab.Optimization.Experiment;
    if (parentExperiment != null) {
      int position = parentExperiment.Optimizers.IndexOf(originalOptimizer);
      parentExperiment.Optimizers[position] = newOptimizer;
      return;
    }

    BatchRun parentBatchRun = parentOptimizer as BatchRun;
    if (parentBatchRun == null) {
      throw new NotSupportedException("Invalid parentOptimizer");
    }

    IAlgorithm algorithm = newOptimizer as IAlgorithm;
    if (algorithm == null) {
      throw new NotSupportedException("Only IAlgorithm types supported");
    }
    parentBatchRun.Runs.Add(new Run(newOptimizer.Name, algorithm));
  }
}
396
/// <summary>
/// Checks (under the lock of the job id mapping) whether no pending optimizer is left.
/// </summary>
/// <returns>true if pendingOptimizersByJobId is empty</returns>
private bool NoMorePendingOptimizers() {
  bool nothingPending;
  lock (pendingOptimizersByJobId) {
    nothingPending = pendingOptimizersByJobId.Count == 0;
  }
  return nothingPending;
}
402
/// <summary>
/// Removes optimizers from
///  - parentOptimizersByPendingOptimizer
///  - pendingOptimizersByJobId
/// Does nothing (apart from logging) if the job id is not registered anymore,
/// e.g. because the mappings have already been removed or cleared.
/// </summary>
/// <param name="jobId">id of the job whose mappings are removed</param>
private void DisposeOptimizerMappings(Guid jobId) {
  lock (pendingOptimizersByJobId) {
    LogMessage(jobId, "Disposing Optimizer Mappings");
    IOptimizer pendingOptimizer;
    // TryGetValue instead of the indexer: an unknown id must not raise a KeyNotFoundException
    if (pendingOptimizersByJobId.TryGetValue(jobId, out pendingOptimizer)) {
      parentOptimizersByPendingOptimizer.Remove(pendingOptimizer);
      pendingOptimizersByJobId.Remove(jobId);
    }
  }
}
416
417    #endregion
418
419    #region Job Management
/// <summary>
/// Assigns every job item the job result with the matching job id; job items
/// without a matching result are left untouched.
/// </summary>
/// <param name="jobResultList">the job results received from the server</param>
private void UpdateJobItems(JobResultList jobResultList) {
  // index the results by job id so that each job item needs a single lookup
  IDictionary<Guid, JobResult> resultsByJobId = jobResultList.ToDictionary(result => result.JobId);
  lock (jobItems) {
    foreach (JobItem item in JobItems) {
      JobResult matchingResult;
      if (resultsByJobId.TryGetValue(item.JobDto.Id, out matchingResult)) {
        item.JobResult = matchingResult;
      }
    }
  }
}
435
/// <summary>
/// Reacts on a job state change on a separate thread: a finished job is fetched from the
/// server, finished and failed jobs are removed from the pending mappings, and the
/// experiment is stopped once no pending optimizer is left.
/// Errors are logged; they must not escape the worker thread because an unhandled
/// exception on a thread terminates the process.
/// </summary>
private void JobItem_JobStateChanged(object sender, EventArgs e) {
  JobItem jobItem = (JobItem)sender;
  Thread t = new Thread(() => {
    try {
      if (jobItem.State == JobState.Finished) {
        try {
          FetchAndUpdateJob(jobItem.JobDto.Id);
        }
        catch (Exception ex) {
          LogMessage(jobItem.JobDto.Id, "Error: Fetching the job result failed: " + ex.Message);
        }
        // remove the mappings even if fetching failed, otherwise the experiment could never stop
        DisposeOptimizerMappings(jobItem.JobDto.Id);
      } else if (jobItem.State == JobState.Failed) {
        DisposeOptimizerMappings(jobItem.JobDto.Id);
      }

      if (NoMorePendingOptimizers()) {
        StopResultPolling();
        this.ExecutionState = Core.ExecutionState.Stopped;
        OnStopped();
      }
    }
    catch (Exception ex) {
      LogMessage(jobItem.JobDto.Id, "Error: Handling the job state change failed: " + ex.Message);
    }
  });
  t.Start();
}
454
/// <summary>
/// Fetches the finished job from the server, restores its optimizer and puts it
/// into the optimizer tree in place of the original optimizer.
/// At most two fetches run simultaneously (see fetchJobSemaphore).
/// </summary>
/// <param name="jobId">id of the finished job</param>
private void FetchAndUpdateJob(Guid jobId) {
  LogMessage(jobId, "FetchAndUpdateJob started");

  IOptimizer originalOptimizer;
  IOptimizer parentOptimizer;
  // both mappings are modified under this lock in DisposeOptimizerMappings
  lock (pendingOptimizersByJobId) {
    originalOptimizer = pendingOptimizersByJobId[jobId];
    parentOptimizer = parentOptimizersByPendingOptimizer[originalOptimizer];
  }

  fetchJobSemaphore.WaitOne();
  try {
    ResponseObject<SerializedJob> jobResponse;
    // the facade is created only after a semaphore slot was obtained
    IClientFacade clientFacade = CreateStreamedClientFacade();
    try {
      jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
    }
    finally {
      ServiceLocator.DisposeClientFacade(clientFacade);
    }

    IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
    IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;

    ReplaceOptimizer(parentOptimizer, originalOptimizer, restoredOptimizer);
  }
  finally {
    // release the slot even if the fetch failed, otherwise later fetches would block forever
    fetchJobSemaphore.Release();
  }
  LogMessage(jobId, "FetchAndUpdateJob ended");
}
476
/// <summary>
/// Replaces the JobDto of the job item that has the same job id as the given JobDto.
/// Exactly one such job item is expected (Single).
/// </summary>
private void UpdateJobItem(JobDto jobDto) {
  JobItem target = jobItems.Where(item => item.JobDto.Id == jobDto.Id).Single();
  target.JobDto = jobDto;
}
481
/// <summary>
/// Requests the abort of the given job at the server and logs the status of the response.
/// </summary>
/// <param name="jobId">id of the job to abort</param>
public void AbortJob(Guid jobId) {
  IClientFacade clientFacade = CreateClientFacade();
  try {
    Response response = clientFacade.AbortJob(jobId);
    LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
  }
  finally {
    // release the facade like every other caller in this class does
    ServiceLocator.DisposeClientFacade(clientFacade);
  }
}
487
/// <summary>
/// Wraps the optimizer into an OptimizerJob, serializes it to XML and combines the data
/// with a JobDto describing the required plugins.
/// </summary>
/// <param name="optimizer">the optimizer to be executed in the Hive</param>
/// <returns>the serialized job including its job information</returns>
private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
  IJob job = new OptimizerJob() {
    Optimizer = optimizer
  };

  // serialize job; the using block disposes the stream even if serialization fails
  byte[] jobByteArray;
  using (MemoryStream memStream = new MemoryStream()) {
    XmlGenerator.Serialize(job, memStream);
    jobByteArray = memStream.ToArray();
  }

  // find out which plugins are needed for the given object
  List<HivePluginInfoDto> pluginsNeeded = (
    from p in GetDeclaringPlugins(job.GetType())
    select new HivePluginInfoDto() {
      Name = p.Name,
      Version = p.Version
    }).ToList();

  JobDto jobDto = new JobDto() {
    CoresNeeded = 1, // [chn] how to determine real cores needed?
    PluginsNeeded = pluginsNeeded,
    State = JobState.Offline,
    MemoryNeeded = 0,
    UserId = Guid.Empty // [chn] set real userid here!
  };

  return new SerializedJob() {
    JobInfo = jobDto,
    SerializedJobData = jobByteArray
  };
}
522
/// <summary>
/// Returns the job item with the given job id. Exactly one such job item is expected (Single).
/// </summary>
private JobItem GetJobItemById(Guid jobId) {
  JobItem match = jobItems.Where(item => item.JobDto.Id == jobId).Single();
  return match;
}
526    #endregion
527
528    #region Result Polling
/// <summary>
/// Clears a pending stop request, marks the experiment as polling and starts a
/// newly created result polling thread.
/// </summary>
public void StartResultPolling() {
  stopResultsPollingPending = false;
  IsPollingResults = true;

  Thread poller = CreateResultPollingThread();
  resultPollingThread = poller;

  bool alreadyRunning = poller.ThreadState == System.Threading.ThreadState.Running;
  if (!alreadyRunning) {
    poller.Start();
  }
}
536
/// <summary>
/// Requests the result polling thread to stop and interrupts it if it exists.
/// Safe to call when polling was never started (e.g. from Prepare before the first Start).
/// </summary>
public void StopResultPolling() {
  // The flag stays set so that the polling loop sees it even if the interrupt arrives
  // while the thread is not blocked; StartResultPolling resets it before the next run.
  this.stopResultsPollingPending = true;

  Thread pollingThread = resultPollingThread;
  if (pollingThread != null) {
    pollingThread.Interrupt();
  }
}
542
/// <summary>
/// Creates (but does not start) the thread that polls the server for job results.
/// The loop queries all jobs that are neither finished nor failed, updates the job items
/// and sleeps for resultPollingIntervalMs. It ends when a stop is requested, when the thread
/// is interrupted, or when all jobs have been sent and none is left to query.
/// </summary>
private Thread CreateResultPollingThread() {
  // short wait used while jobs are still being sent and there is nothing to query yet
  const int idleWaitMs = 100;

  return new Thread(() => {
    try {
      do {
        // materialize the ids under the lock: the sending thread adds job items concurrently
        // and a deferred query would be re-evaluated on every enumeration
        List<Guid> jobIdsToQuery;
        lock (jobItems) {
          jobIdsToQuery = (from job in JobItems
                           where job.State != JobState.Finished &&
                           job.State != JobState.Failed
                           select job.JobDto.Id).ToList();
        }

        if (jobIdsToQuery.Count > 0) {
          LogMessage("Polling results for " + jobIdsToQuery.Count + " jobs");
          IClientFacade clientFacade = null;
          try {
            // created only when needed and inside the try block, so that it is always
            // disposed and a connection failure is logged instead of killing the thread
            clientFacade = CreateStreamedClientFacade();
            ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);
            if (response.StatusMessage == ResponseStatus.Ok) {
              JobResultList jobItemList = response.Obj;
              UpdateJobItems(jobItemList);

              LogMessage("Polling successfully finished");
            } else {
              throw new Exception(response.StatusMessage.ToString());
            }
          }
          catch (ThreadInterruptedException) {
            // a stop request must end the loop and not be reported as a failed poll
            throw;
          }
          catch (Exception e) {
            LogMessage("Polling results failed: " + e.Message);
          }
          finally {
            if (clientFacade != null) {
              ServiceLocator.DisposeClientFacade(clientFacade);
            }
          }
          Thread.Sleep(resultPollingIntervalMs);
        } else if (sendingJobsFinished) {
          // all the jobs have been sent to hive, but none are left to query (all finished or failed)
          this.stopResultsPollingPending = true;
        } else {
          // jobs are still being sent: wait briefly instead of spinning
          Thread.Sleep(idleWaitMs);
        }
      } while (!this.stopResultsPollingPending);
    }
    catch (ThreadInterruptedException) {
      // thread has been interrupted by StopResultPolling
    }
    finally {
      this.IsPollingResults = false;
    }
  });
}
588
589    #endregion
590
591    #region Snapshots
592
/// <summary>
/// Stores the response as latest snapshot of the job item whose job id matches
/// the job information of the response. Exactly one such job item is expected (Single).
/// </summary>
private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
  JobItem target = jobItems.Where(item => item.JobDto.Id == response.Obj.JobInfo.Id).Single();
  target.LatestSnapshot = response;
}
597
/// <summary>
/// Requests a snapshot of the given job on a separate thread. If the job is not being
/// calculated anymore the last result is fetched directly, otherwise the server is polled
/// (at most maxSnapshotRetries times) until the snapshot is available.
/// Failures are logged; they must not escape the worker thread because an unhandled
/// exception on a thread terminates the process.
/// </summary>
/// <param name="jobId">id of the job to take the snapshot from</param>
public void RequestSnapshot(Guid jobId) {
  Thread t = new Thread(() => {
    IClientFacade clientFacade = null;
    try {
      clientFacade = CreateStreamedClientFacade();
      ResponseObject<SerializedJob> response;
      int retryCount = 0;

      Response snapShotResponse = clientFacade.RequestSnapshot(jobId);
      if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
        // job already finished
        Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
        response = clientFacade.GetLastSerializedResult(jobId, false, false);
        Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
      } else {
        // server sent snapshot request to client
        // poll until snapshot is ready
        do {
          Thread.Sleep(snapshotPollingIntervalMs);
          Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
          response = clientFacade.GetLastSerializedResult(jobId, false, true);
          Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
          retryCount++;
          // keep polling while the job result is not yet there and retries are left
        } while (
          (retryCount < maxSnapshotRetries) && (
          response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere)
          );
      }
      if (response.StatusMessage == ResponseStatus.Ok) {
        LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
        UpdateSnapshot(response);
      } else {
        LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
      }
    }
    catch (Exception e) {
      LogMessage(jobId, "Error: Requesting snapshot failed for job " + jobId + ": " + e.Message);
    }
    finally {
      if (clientFacade != null) {
        ServiceLocator.DisposeClientFacade(clientFacade);
      }
    }
  });
  t.Start();
}
641
/// <summary>
/// Requests a snapshot from the server as soon as a job item switches to the Requested state.
/// </summary>
void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
  JobItem source = (JobItem)sender;
  if (source.SnapshotRequestedState != SnapshotRequestedState.Requested) {
    return;
  }
  RequestSnapshot(source.JobDto.Id);
}
648
649    #endregion
650
651    #region Required Plugin Search
/// <summary>
/// Collects the types reachable from the given type (restricted to namespaces starting
/// with "HeuristicLab.", see FindTypes) and returns the plugins declaring them.
/// </summary>
private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
  HashSet<Type> reachableTypes = new HashSet<Type>();
  FindTypes(type, reachableTypes, "HeuristicLab.");
  IEnumerable<IPluginDescription> declaringPlugins = GetDeclaringPlugins(reachableTypes);
  return declaringPlugins;
}
661
/// <summary>
/// Returns the set of plugins in which the given types are declared, together with
/// the dependencies of those plugins.
/// </summary>
private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
  HashSet<IPluginDescription> collected = new HashSet<IPluginDescription>();
  foreach (Type current in types) {
    IPluginDescription declaringPlugin = ApplicationManager.Manager.GetDeclaringPlugin(current);
    FindDeclaringPlugins(declaringPlugin, collected);
  }
  return collected;
}
672
/// <summary>
/// Adds the given plugin and, recursively, all of its dependencies to the hashset.
/// A plugin that is already contained is not visited again.
/// </summary>
private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
  // Add returns false if the plugin is already present
  if (!plugins.Add(plugin)) {
    return;
  }
  foreach (IPluginDescription required in plugin.Dependencies) {
    FindDeclaringPlugins(required, plugins);
  }
}
685
/// <summary>
/// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
/// (ordinal comparison). Types without a namespace are ignored.
/// Be aware that search is not performed on attributes
/// </summary>
/// <param name="type">the type to be searched</param>
/// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
/// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
  if (type == null || types.Contains(type)) {
    return;
  }
  // Namespace is null for types declared in the global namespace (e.g. anonymous types)
  string typeNamespace = type.Namespace;
  if (typeNamespace == null || !typeNamespace.StartsWith(namespaceStart, StringComparison.Ordinal)) {
    return;
  }

  types.Add(type);

  // constructors
  foreach (ConstructorInfo info in type.GetConstructors()) {
    foreach (ParameterInfo paramInfo in info.GetParameters()) {
      FindTypes(paramInfo.ParameterType, types, namespaceStart);
    }
  }

  // interfaces
  foreach (Type t in type.GetInterfaces()) {
    FindTypes(t, types, namespaceStart);
  }

  // events
  foreach (EventInfo info in type.GetEvents()) {
    FindTypes(info.EventHandlerType, types, namespaceStart);
    FindTypes(info.DeclaringType, types, namespaceStart);
  }

  // properties
  foreach (PropertyInfo info in type.GetProperties()) {
    FindTypes(info.PropertyType, types, namespaceStart);
  }

  // fields
  foreach (FieldInfo info in type.GetFields()) {
    FindTypes(info.FieldType, types, namespaceStart);
  }

  // methods
  foreach (MethodInfo info in type.GetMethods()) {
    foreach (ParameterInfo paramInfo in info.GetParameters()) {
      FindTypes(paramInfo.ParameterType, types, namespaceStart);
    }
    FindTypes(info.ReturnType, types, namespaceStart);
  }
}
734    #endregion
735
736    #region Eventhandler
737
/// <summary>
/// Raised by OnExecutionTimeChanged.
/// </summary>
public event EventHandler ExecutionTimeChanged;
private void OnExecutionTimeChanged() {
  // copy to a local so that the null check and the invocation use the same delegate
  EventHandler subscribers = ExecutionTimeChanged;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
743
/// <summary>
/// Raised by OnExecutionStateChanged after the new state has been logged.
/// </summary>
public event EventHandler ExecutionStateChanged;
private void OnExecutionStateChanged() {
  LogMessage("ExecutionState changed to " + executionState.ToString());
  EventHandler subscribers = ExecutionStateChanged;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
750
// NOTE(review): no code visible in this part of the file raises this event - confirm whether it is raised elsewhere.
public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
752
/// <summary>
/// Raised by OnStarted after the start has been logged and the execution time timer was started.
/// </summary>
public event EventHandler Started;
private void OnStarted() {
  LogMessage("Started");
  timer.Start();
  EventHandler subscribers = Started;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
760
/// <summary>
/// Raised by OnStopped after the execution time timer was stopped and the stop has been logged.
/// </summary>
public event EventHandler Stopped;
private void OnStopped() {
  timer.Stop();
  LogMessage("Stopped");
  EventHandler subscribers = Stopped;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
768
/// <summary>
/// Raised by OnPaused after the execution time timer was stopped and the pause has been logged.
/// </summary>
public event EventHandler Paused;
private void OnPaused() {
  timer.Stop();
  LogMessage("Paused");
  EventHandler subscribers = Paused;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
776
/// <summary>
/// Raised by OnPrepared after the preparation has been logged.
/// </summary>
public event EventHandler Prepared;
protected virtual void OnPrepared() {
  LogMessage("Prepared");
  EventHandler subscribers = Prepared;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
783
/// <summary>
/// Raised by OnResourceIdsChanged.
/// </summary>
public event EventHandler ResourceIdsChanged;
protected virtual void OnResourceIdsChanged() {
  EventHandler subscribers = ResourceIdsChanged;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
789
/// <summary>
/// Raised by OnExperimentChanged after the change has been logged.
/// </summary>
public event EventHandler ExperimentChanged;
protected virtual void OnExperimentChanged() {
  LogMessage("Experiment changed");
  EventHandler subscribers = ExperimentChanged;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
796
/// <summary>
/// Raised by OnServerUrlChanged.
/// </summary>
public event EventHandler ServerUrlChanged;
protected virtual void OnServerUrlChanged() {
  EventHandler subscribers = ServerUrlChanged;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
802
/// <summary>
/// Raised by OnIsPollingResultsChanged, which also logs the new polling state and
/// starts or stops the execution time timer accordingly.
/// </summary>
public event EventHandler IsResultsPollingChanged;
private void OnIsPollingResultsChanged() {
  bool polling = this.IsPollingResults;
  LogMessage(polling ? "Results Polling Started" : "Results Polling Stopped");
  if (polling) {
    timer.Start();
  } else {
    timer.Stop();
  }

  EventHandler subscribers = IsResultsPollingChanged;
  if (subscribers == null) {
    return;
  }
  subscribers.Invoke(this, EventArgs.Empty);
}
815
/// <summary>
/// Subscribes to the change events of the job item list and to the events of
/// every job item currently contained in it.
/// </summary>
private void RegisterJobItemListEvents() {
  jobItems.CollectionReset += jobItems_CollectionReset;
  jobItems.ItemsAdded += jobItems_ItemsAdded;
  jobItems.ItemsRemoved += jobItems_ItemsRemoved;
  jobItems.ItemsReplaced += jobItems_ItemsReplaced;

  foreach (JobItem item in jobItems) {
    RegisterJobItemEvents(item);
  }
}
825
826    private void DeregisterJobItemListEvents() {
827      jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
828      jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
829      jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
830      jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
831      foreach (JobItem jobItem in jobItems) {
832        DeregisterJobItemEvents(jobItem);
833      }
834    }
835
836    void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
837      UpdateJobItemEvents(e);
838    }
839
840    private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
841      if (e.OldItems != null) {
842        foreach (var item in e.OldItems) {
843          DeregisterJobItemEvents(item.Value);
844        }
845      }
846      if (e.Items != null) {
847        foreach (var item in e.Items) {
848          RegisterJobItemEvents(item.Value);
849        }
850      }
851    }
852
853    private void RegisterJobItemEvents(JobItem jobItem) {
854      jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
855      jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
856    }
857
858    private void DeregisterJobItemEvents(JobItem jobItem) {
859      jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
860      jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
861    }
862
863    void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
864      UpdateJobItemEvents(e);
865    }
866
867    void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
868      UpdateJobItemEvents(e);
869    }
870
871    void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
872      foreach (var item in e.OldItems) {
873        item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
874        item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
875      }
876    }
877    #endregion
878
879    #region Helper Functions
880    private IClientFacade CreateClientFacade() {
881      IClientFacade clientFacade = null;
882      do {
883        try {
884          //clientFacade = ServiceLocator.CreateClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientServiceName));
885          clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp);
886
887        } catch (EndpointNotFoundException exception) {
888          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
889          Thread.Sleep(resultPollingIntervalMs);
890        }
891      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
892      return clientFacade;
893    }
894
895    private IClientFacade CreateStreamedClientFacade() {
896      IClientFacade clientFacade = null;
897      do {
898        try {
899          //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName));
900          clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp);
901        } catch (EndpointNotFoundException exception) {
902          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
903          Thread.Sleep(resultPollingIntervalMs);
904        }
905      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
906      return clientFacade;
907    }
908
909    private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
910      foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
911        optimizers.Add(kvp);
912      }
913    }
914
915    #endregion
916
917    #region Logging
918    private void LogMessage(string message) {
919      // HeuristicLab.Log is not Thread-Safe, so lock on every call
920      lock (locker) {
921        log.LogMessage(message);
922      }
923    }
924
925    private void LogMessage(Guid jobId, string message) {
926      GetJobItemById(jobId).LogMessage(message);
927      LogMessage(message + " (jobId: " + jobId + ")");
928    }
929
930    #endregion
931  }
932}
Note: See TracBrowser for help on using the repository browser.