Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4316

Last change on this file since 4316 was 4316, checked in by cneumuel, 14 years ago

made streaming wcf-services work with Transport-Security and net.tcp but with Message-Level Credentials (#1168)

File size: 34.4 KB
RevLine 
[4116]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
[4119]40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
[4170]43using System.ServiceModel;
[4263]44using HeuristicLab.Hive.Contracts.ResponseObjects;
[4305]45using HeuristicLab.Hive.Experiment.Properties;
[4116]46
47namespace HeuristicLab.Hive.Experiment {
48  /// <summary>
49  /// An experiment which contains multiple batch runs of algorithms.
50  /// </summary>
51  [Item(itemName, itemDescription)]
52  [Creatable("Testing & Analysis")]
53  [StorableClass]
54  public class HiveExperiment : NamedItem, IExecutable {
// Display name and description passed to the Item attribute and the base constructor.
private const string itemName = "Hive Experiment";
private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";

// Sleep time (ms) of the result polling thread between two polls.
private const int resultPollingIntervalMs = 5000;
// Sleep time (ms) between two attempts to fetch a requested snapshot.
private const int snapshotPollingIntervalMs = 1000;
// Upper bound for the number of snapshot fetch attempts.
private const int maxSnapshotRetries = 20;

// Lock object used while an optimizer is replaced in the optimizer tree.
private object locker = new object();

// Timer that drives the ExecutionTime updates.
private System.Timers.Timer timer;
private bool pausePending;
private bool stopPending;

// Set once the sender thread created by Start() has finished; read by the result polling thread.
private bool sendingJobsFinished;

// Ensures that only 2 threads can fetch job results simultaneously.
private Semaphore fetchJobSemaphore = new Semaphore(2, 2);

// Tells the result polling loop to end after its current iteration.
private bool stopResultsPollingPending;

// Thread that currently polls (or last polled) job results.
private Thread resultPollingThread;
72
private bool isPollingResults;
/// <summary>
/// Indicates whether results are being polled. Changing the value invokes
/// OnIsPollingResultsChanged; assigning the current value again does nothing.
/// </summary>
public bool IsPollingResults {
  get { return isPollingResults; }
  private set {
    if (isPollingResults == value) return;
    isPollingResults = value;
    OnIsPollingResultsChanged();
  }
}
83
/// <summary>
/// The entries of the ';'-separated resourceIds string,
/// or an empty list when resourceIds is null or empty.
/// </summary>
public IEnumerable<string> ResourceGroups {
  get {
    if (string.IsNullOrEmpty(resourceIds))
      return new List<string>();
    return resourceIds.Split(';');
  }
}
[4305]93
[4173]94    #region Storable Properties
/// <summary>
/// Point in time at which ExecutionTime was last advanced (see timer_Elapsed and Start).
/// </summary>
[Storable]
private DateTime lastUpdateTime;

/// <summary>
/// Maps a JobId to the optimizer that was sent as this job.
/// Holds pending optimizers only; entries are removed once their job is disposed.
/// </summary>
[Storable]
private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();

/// <summary>
/// Maps a pending (child) optimizer to its parent optimizer.
/// Required to put a finished optimizer back into the optimizer tree.
/// Holds pending optimizers only.
/// </summary>
[Storable]
private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();

[Storable]
private JobItemList jobItems;
/// <summary>
/// The list of job items of this experiment.
/// </summary>
public JobItemList JobItems {
  get {
    return jobItems;
  }
}
[4116]118
[Storable]
private string resourceIds;
/// <summary>
/// The resource ids as a single string (split at ';' by ResourceGroups).
/// Assigning a different value raises ResourceIdsChanged.
/// </summary>
public string ResourceIds {
  get { return resourceIds; }
  set {
    if (resourceIds == value) return;
    resourceIds = value;
    OnResourceIdsChanged();
  }
}
130
[Storable]
private HeuristicLab.Optimization.Experiment experiment;
/// <summary>
/// The experiment whose optimizers are sent to the Hive.
/// Assigning a different instance raises ExperimentChanged.
/// </summary>
public HeuristicLab.Optimization.Experiment Experiment {
  get { return experiment; }
  set {
    if (experiment == value) return;
    experiment = value;
    OnExperimentChanged();
  }
}
[4119]142
[Storable]
private ILog log;
/// <summary>
/// The log of this experiment.
/// </summary>
public ILog Log {
  get {
    return log;
  }
}
148
[Storable]
private Core.ExecutionState executionState;
/// <summary>
/// The current execution state. A change of the value invokes OnExecutionStateChanged.
/// </summary>
public ExecutionState ExecutionState {
  get { return executionState; }
  private set {
    if (executionState == value) return;
    executionState = value;
    OnExecutionStateChanged();
  }
}
160
[Storable]
private TimeSpan executionTime;
/// <summary>
/// The accumulated execution time. A change of the value invokes OnExecutionTimeChanged.
/// </summary>
public TimeSpan ExecutionTime {
  get { return executionTime; }
  private set {
    if (executionTime == value) return;
    executionTime = value;
    OnExecutionTimeChanged();
  }
}
172    #endregion
173
/// <summary>
/// Constructor marked for the persistence framework; only forwards the flag to the base class.
/// </summary>
[StorableConstructor]
public HiveExperiment(bool deserializing) : base(deserializing) { }
178
/// <summary>
/// Creates an empty hive experiment: takes the resource ids from the settings,
/// creates the log and the job item list, registers the list events and sets up the timer.
/// </summary>
public HiveExperiment()
  : base(itemName, itemDescription) {
  this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
  this.log = new Log();
  stopPending = false;
  pausePending = false;
  jobItems = new JobItemList();
  isPollingResults = false;
  RegisterJobItemListEvents();
  InitTimer();
}
189
/// <summary>
/// Creates a clone of this experiment. Experiment, log, job items and both optimizer
/// mappings are cloned through the given cloner; the remaining state is copied by value.
/// </summary>
public override IDeepCloneable Clone(Cloner cloner) {
  LogMessage("I am beeing cloned");
  HiveExperiment copy = (HiveExperiment)base.Clone(cloner);

  copy.resourceIds = resourceIds;
  copy.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
  copy.executionState = executionState;
  copy.executionTime = executionTime;

  // JobId -> optimizer mapping, filled into a fresh dictionary
  copy.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
  foreach (KeyValuePair<Guid, IOptimizer> entry in pendingOptimizersByJobId) {
    copy.pendingOptimizersByJobId[entry.Key] = (IOptimizer)cloner.Clone(entry.Value);
  }

  // child -> parent mapping, added to the dictionary the copy already owns
  foreach (KeyValuePair<IOptimizer, IOptimizer> entry in parentOptimizersByPendingOptimizer) {
    IOptimizer clonedChild = (IOptimizer)cloner.Clone(entry.Key);
    IOptimizer clonedParent = (IOptimizer)cloner.Clone(entry.Value);
    copy.parentOptimizersByPendingOptimizer[clonedChild] = clonedParent;
  }

  copy.log = (ILog)cloner.Clone(log);
  copy.stopPending = stopPending;
  copy.pausePending = pausePending;
  JobItemList clonedJobItems = (JobItemList)cloner.Clone(jobItems);
  copy.jobItems.AddRange(clonedJobItems);
  copy.lastUpdateTime = lastUpdateTime;
  copy.isPollingResults = isPollingResults;
  return copy;
}
213
/// <summary>
/// Hook executed after deserialization: recreates the timer, resets the polling state
/// and re-registers the job item list events.
/// </summary>
[StorableHook(HookType.AfterDeserialization)]
private void AfterDeserialization() {
  InitTimer();
  IsPollingResults = false;
  stopResultsPollingPending = false;
  RegisterJobItemListEvents();
  LogMessage("I was deserialized.");
}
[4170]222
[4173]223    #region Execution Time Timer
/// <summary>
/// Creates the auto-resetting timer (interval 100) and attaches timer_Elapsed to it.
/// The timer is not started here.
/// </summary>
private void InitTimer() {
  timer = new System.Timers.Timer(100) { AutoReset = true };
  timer.Elapsed += timer_Elapsed;
}
229
/// <summary>
/// Adds the time passed since lastUpdateTime to ExecutionTime and remembers the current time.
/// </summary>
private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
  DateTime current = DateTime.Now;
  TimeSpan passed = current - lastUpdateTime;
  ExecutionTime = ExecutionTime + passed;
  lastUpdateTime = current;
}
[4173]235    #endregion
[4120]236
[4173]237    #region IExecutable Methods
/// <summary>
/// Pausing is not supported by this experiment.
/// </summary>
/// <exception cref="NotSupportedException">always thrown</exception>
public void Pause() {
  throw new NotSupportedException();
}
241
/// <summary>
/// Stops result polling, clears both optimizer mappings and the job items, prepares the
/// experiment and switches to the Prepared state. Does nothing if no experiment is set.
/// </summary>
public void Prepare() {
  if (experiment == null) return;

  StopResultPolling();
  lock (pendingOptimizersByJobId) {
    pendingOptimizersByJobId.Clear();
  }
  parentOptimizersByPendingOptimizer.Clear();
  lock (jobItems) {
    jobItems.Clear();
  }
  experiment.Prepare();
  this.ExecutionState = Core.ExecutionState.Prepared;
  OnPrepared();
}
257
/// <summary>
/// Starts the experiment: resets the execution time, switches to the Started state,
/// starts result polling and launches a thread that extracts all optimizers from the
/// experiment and sends each of them to the Hive as one job.
/// If sending fails, the error is logged and the experiment switches to Stopped.
/// </summary>
public void Start() {
  sendingJobsFinished = false;
  OnStarted();
  ExecutionTime = new TimeSpan();
  lastUpdateTime = DateTime.Now;
  this.ExecutionState = Core.ExecutionState.Started;
  StartResultPolling();

  Thread t = new Thread(() => {
    IClientFacade clientFacade = null;
    try {
      // created inside the try block so that a failing connection is logged below
      // instead of ending up as an unhandled exception on this thread
      clientFacade = CreateStreamedClientFacade();

      // Clear in place instead of assigning a new dictionary: this instance is used as
      // lock object by other methods, replacing it would let them lock different objects.
      lock (pendingOptimizersByJobId) {
        pendingOptimizersByJobId.Clear();
      }

      LogMessage("Extracting jobs from Experiment");
      parentOptimizersByPendingOptimizer = GetOptimizers(true);
      LogMessage("Extraction of jobs from Experiment finished");

      IEnumerable<string> groups = ResourceGroups;

      foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
        SerializedJob serializedJob = CreateSerializedJob(optimizer);
        ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
        lock (pendingOptimizersByJobId) {
          pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
        }

        JobItem jobItem = new JobItem() {
          JobDto = response.Obj,
          LatestSnapshot = null,
          Optimizer = optimizer
        };
        lock (jobItems) {
          jobItems.Add(jobItem);
        }
        LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
      }
    }
    catch (Exception e) {
      LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
      this.ExecutionState = Core.ExecutionState.Stopped;
      OnStopped();
    }
    finally {
      if (clientFacade != null) {
        ServiceLocator.DisposeClientFacade(clientFacade);
      }
    }

    // tells the result polling thread that no further jobs will be added
    sendingJobsFinished = true;
  });
  t.Start();
}
309
/// <summary>
/// Switches to the Stopped state, requests an abort for every job item and raises Stopped.
/// </summary>
public void Stop() {
  this.ExecutionState = Core.ExecutionState.Stopped;
  foreach (Guid jobId in jobItems.Select(item => item.JobDto.Id)) {
    AbortJob(jobId);
  }
  OnStopped();
}
[4173]317    #endregion
[4135]318
[4173]319    #region Optimizier Management
/// <summary>
/// Returns all optimizers in the current Experiment.
/// </summary>
/// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flattened</param>
/// <returns>a dictionary mapping from each optimizer (key) to its parent optimizer (value)</returns>
private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
  if (flatout) {
    return FlatOptimizerTree(null, experiment, "");
  }

  var optimizers = new Dictionary<IOptimizer, IOptimizer>();
  foreach (IOptimizer opt in experiment.Optimizers) {
    // key = child, value = parent; same direction as FlatOptimizerTree.
    // (The experiment used as key would be a duplicate key from the second optimizer on.)
    optimizers.Add(opt, experiment);
  }
  return optimizers;
}
336
/// <summary>
/// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
/// Experiments contribute their optimizers, batch runs contribute one clone of their
/// algorithm per repetition; every other optimizer is returned itself.
/// The names of the returned optimizers are prefixed with the names of the nested
/// experiments and batch runs they were found in.
///
/// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
/// interface IParallelizable {
///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
/// }
/// </summary>
/// <param name="parent">the parent of <paramref name="optimizer"/> (null for the root)</param>
/// <param name="optimizer">the root of the (sub)tree to flatten</param>
/// <param name="prepend">name prefix collected so far</param>
/// <returns>a dictionary mapping from the child optimizer (key) to its parent optimizer (value)</returns>
private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
  IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
  if (optimizer is HeuristicLab.Optimization.Experiment) {
    HeuristicLab.Optimization.Experiment nestedExperiment = (HeuristicLab.Optimization.Experiment)optimizer;
    if (this.experiment != nestedExperiment) {
      prepend += nestedExperiment.Name + "/"; // don't prepend for top-level optimizers
    }
    foreach (IOptimizer opt in nestedExperiment.Optimizers) {
      AddRange(optimizers, FlatOptimizerTree(nestedExperiment, opt, prepend));
    }
  } else if (optimizer is BatchRun) {
    BatchRun batchRun = (BatchRun)optimizer;
    prepend += batchRun.Name + "/";
    for (int i = 0; i < batchRun.Repetitions; i++) {
      // each repetition becomes its own job, so the algorithm is cloned per repetition
      IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
      opt.Name += " [" + i + "]";
      AddRange(optimizers, FlatOptimizerTree(batchRun, opt, prepend));
    }
  } else {
    if (!(optimizer is EngineAlgorithm)) {
      // report the type of the optimizer itself (not of the result dictionary)
      Logger.Warn("Optimizer of type " + optimizer.GetType().ToString() + " unknown");
    }
    optimizer.Name = prepend + optimizer.Name;
    optimizers.Add(optimizer, parent);
    LogMessage("Optimizer extracted: " + optimizer.Name);
  }
  return optimizers;
}
377
/// <summary>
/// Puts a restored optimizer into its parent: an Experiment gets the original optimizer
/// replaced at its index, a BatchRun gets a new Run created from the restored algorithm.
/// </summary>
/// <exception cref="NotSupportedException">
/// if the parent is neither an Experiment nor a BatchRun, or if the parent is a BatchRun
/// and the new optimizer is not an IAlgorithm
/// </exception>
private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
  lock (locker) {
    HeuristicLab.Optimization.Experiment parentExperiment = parentOptimizer as HeuristicLab.Optimization.Experiment;
    if (parentExperiment != null) {
      int position = parentExperiment.Optimizers.IndexOf(originalOptimizer);
      parentExperiment.Optimizers[position] = newOptimizer;
      return;
    }

    BatchRun parentBatchRun = parentOptimizer as BatchRun;
    if (parentBatchRun == null) {
      throw new NotSupportedException("Invalid parentOptimizer");
    }

    IAlgorithm algorithm = newOptimizer as IAlgorithm;
    if (algorithm == null) {
      throw new NotSupportedException("Only IAlgorithm types supported");
    }
    parentBatchRun.Runs.Add(new Run(newOptimizer.Name, algorithm));
  }
}
396
/// <summary>
/// Returns true if pendingOptimizersByJobId contains no entries.
/// </summary>
private bool NoMorePendingOptimizers() {
  bool nothingPending;
  lock (pendingOptimizersByJobId) {
    nothingPending = pendingOptimizersByJobId.Count == 0;
  }
  return nothingPending;
}
[4135]402
/// <summary>
/// Removes the optimizer of the given job from
///  - parentOptimizersByPendingOptimizer
///  - pendingOptimizersByJobId
/// Does nothing (besides logging) if no optimizer is registered for the job,
/// e.g. because the mappings have already been removed or cleared.
/// </summary>
/// <param name="jobId">id of the job whose mappings are removed</param>
private void DisposeOptimizerMappings(Guid jobId) {
  lock (pendingOptimizersByJobId) {
    LogMessage(jobId, "Disposing Optimizer Mappings");
    IOptimizer pendingOptimizer;
    // TryGetValue instead of the indexer: this method runs on background threads,
    // where a KeyNotFoundException for an already removed job would be unhandled
    if (pendingOptimizersByJobId.TryGetValue(jobId, out pendingOptimizer)) {
      parentOptimizersByPendingOptimizer.Remove(pendingOptimizer);
      pendingOptimizersByJobId.Remove(jobId);
    }
  }
}
416
[4116]417    #endregion
418
[4173]419    #region Job Management
/// <summary>
/// Assigns each job item the job result with the same job id, if the list contains one.
/// </summary>
/// <param name="jobResultList">the job results to distribute to the job items</param>
private void UpdateJobItems(JobResultList jobResultList) {
  // index the results by job id so that each job item needs a single lookup only
  IDictionary<Guid, JobResult> resultsByJobId = jobResultList.ToDictionary(result => result.JobId);
  lock (jobItems) {
    foreach (JobItem item in JobItems) {
      JobResult match;
      if (resultsByJobId.TryGetValue(item.JobDto.Id, out match)) {
        item.JobResult = match;
      }
    }
  }
}
435
/// <summary>
/// Reacts to a state change of a job item on a new thread: a finished job is fetched and
/// its mappings are disposed, a failed job only has its mappings disposed. When no pending
/// optimizers remain, polling is stopped and the experiment switches to Stopped.
/// </summary>
private void JobItem_JobStateChanged(object sender, EventArgs e) {
  JobItem changedItem = (JobItem)sender;
  ThreadStart work = delegate {
    if (changedItem.State == JobState.Finished) {
      FetchAndUpdateJob(changedItem.JobDto.Id);
      DisposeOptimizerMappings(changedItem.JobDto.Id);
    } else if (changedItem.State == JobState.Failed) {
      DisposeOptimizerMappings(changedItem.JobDto.Id);
    }

    if (!NoMorePendingOptimizers()) return;

    StopResultPolling();
    this.ExecutionState = Core.ExecutionState.Stopped;
    OnStopped();
  };
  new Thread(work).Start();
}
454
/// <summary>
/// Fetches the last serialized result of the given job from the server, restores the
/// optimizer from it and puts it into the optimizer tree in place of the original one.
/// At most two threads fetch at the same time (fetchJobSemaphore).
/// </summary>
/// <param name="jobId">id of the job to fetch; must be present in pendingOptimizersByJobId</param>
private void FetchAndUpdateJob(Guid jobId) {
  LogMessage(jobId, "FetchAndUpdateJob started");
  IOptimizer originalOptimizer;
  lock (pendingOptimizersByJobId) {
    originalOptimizer = pendingOptimizersByJobId[jobId];
  }

  fetchJobSemaphore.WaitOne();
  try {
    ResponseObject<SerializedJob> jobResponse;
    IClientFacade clientFacade = CreateStreamedClientFacade();
    try {
      jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
    }
    finally {
      // dispose the facade even if the service call throws
      ServiceLocator.DisposeClientFacade(clientFacade);
    }

    IJob restoredObject;
    using (MemoryStream stream = new MemoryStream(jobResponse.Obj.SerializedJobData)) {
      restoredObject = XmlParser.Deserialize<IJob>(stream);
    }
    IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;

    ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
  }
  finally {
    // always hand the permit back; a lost permit would block later fetches forever
    fetchJobSemaphore.Release();
  }
  LogMessage(jobId, "FetchAndUpdateJob ended");
}
476
/// <summary>
/// Replaces the JobDto of the single job item that has the same job id as the given JobDto.
/// </summary>
private void UpdateJobItem(JobDto jobDto) {
  JobItem target = jobItems.Single(item => item.JobDto.Id == jobDto.Id);
  target.JobDto = jobDto;
}
481
/// <summary>
/// Asks the server to abort the given job and logs the status of the response.
/// </summary>
/// <param name="jobId">id of the job to abort</param>
public void AbortJob(Guid jobId) {
  IClientFacade clientFacade = CreateClientFacade();
  try {
    Response response = clientFacade.AbortJob(jobId);
    LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
  }
  finally {
    // release the facade like all other facade users in this class do
    ServiceLocator.DisposeClientFacade(clientFacade);
  }
}
487
/// <summary>
/// Wraps the optimizer in an OptimizerJob, serializes it to XML and combines the bytes
/// with a JobDto that lists the plugins needed by the job.
/// </summary>
private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
  IJob job = new OptimizerJob() { Optimizer = optimizer };

  // serialize the job into a byte array
  byte[] jobByteArray;
  using (MemoryStream buffer = new MemoryStream()) {
    XmlGenerator.Serialize(job, buffer);
    jobByteArray = buffer.ToArray();
  }

  // find out which plugins are needed for the given object
  List<HivePluginInfoDto> pluginsNeeded = GetDeclaringPlugins(job.GetType())
    .Select(p => new HivePluginInfoDto() { Name = p.Name, Version = p.Version })
    .ToList();

  JobDto jobInfo = new JobDto() {
    CoresNeeded = 1, // [chn] how to determine real cores needed?
    PluginsNeeded = pluginsNeeded,
    State = JobState.Offline,
    MemoryNeeded = 0,
    UserId = Guid.Empty // [chn] set real userid here!
  };

  return new SerializedJob() {
    JobInfo = jobInfo,
    SerializedJobData = jobByteArray
  };
}
522
/// <summary>
/// Returns the single job item whose JobDto has the given id.
/// </summary>
private JobItem GetJobItemById(Guid jobId) {
  JobItem match = jobItems.Single(item => item.JobDto.Id == jobId);
  return match;
}
526    #endregion
527
528    #region Result Polling
/// <summary>
/// Clears the stop request, marks polling as active and starts a newly created polling thread.
/// </summary>
public void StartResultPolling() {
  this.stopResultsPollingPending = false;
  this.IsPollingResults = true;
  Thread pollingThread = CreateResultPollingThread();
  resultPollingThread = pollingThread;
  if (pollingThread.ThreadState == System.Threading.ThreadState.Running) return;
  pollingThread.Start();
}
536
/// <summary>
/// Requests the result polling thread to stop and interrupts it in case it is sleeping.
/// Safe to call when no polling thread has been created yet.
/// </summary>
public void StopResultPolling() {
  // The flag is deliberately left set: the polling loop only checks it at the end of an
  // iteration, so resetting it right away could hide the request from a thread that is
  // not blocked at the moment. StartResultPolling clears the flag before the next run.
  this.stopResultsPollingPending = true;

  Thread pollingThread = resultPollingThread;
  if (pollingThread != null) {
    pollingThread.Interrupt();
  }
}
542
/// <summary>
/// Creates (but does not start) the thread that polls the server for the results of all
/// jobs which are neither finished nor failed. The loop ends when a stop is requested,
/// when the thread is interrupted, or when all jobs have been sent and none is left to query.
/// IsPollingResults is set to false when the thread ends.
/// </summary>
private Thread CreateResultPollingThread() {
  // wait time while no job is known yet but jobs are still being sent
  const int idleWaitMs = 500;

  return new Thread(() => {
    try {
      do {
        // take a snapshot of the ids under the lock; the sender thread adds job items concurrently
        List<Guid> jobIdsToQuery;
        lock (jobItems) {
          jobIdsToQuery = (from job in jobItems
                           where job.State != JobState.Finished &&
                                 job.State != JobState.Failed
                           select job.JobDto.Id).ToList();
        }

        if (jobIdsToQuery.Count > 0) {
          LogMessage("Polling results for " + jobIdsToQuery.Count + " jobs");
          // the facade is only created when there is something to query, and always disposed
          IClientFacade clientFacade = null;
          try {
            clientFacade = CreateStreamedClientFacade();
            ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);
            if (response.StatusMessage == ResponseStatus.Ok) {
              UpdateJobItems(response.Obj);
              LogMessage("Polling successfully finished");
            } else {
              LogMessage("Polling results failed: " + response.StatusMessage.ToString());
            }
          }
          catch (ThreadInterruptedException) {
            // a stop request must not be swallowed by the generic handler below
            throw;
          }
          catch (Exception e) {
            LogMessage("Polling results failed: " + e.Message);
          }
          finally {
            if (clientFacade != null) {
              ServiceLocator.DisposeClientFacade(clientFacade);
            }
          }
          Thread.Sleep(resultPollingIntervalMs);
        } else if (sendingJobsFinished) {
          // all the jobs have been sent to hive, but none are left to query (all finished or failed)
          this.stopResultsPollingPending = true;
        } else {
          // nothing to query yet; wait instead of spinning until the first job has been sent
          Thread.Sleep(idleWaitMs);
        }
      } while (!this.stopResultsPollingPending);
    }
    catch (ThreadInterruptedException) {
      // thread has been interrupted
    }
    finally {
      this.IsPollingResults = false;
    }
  });
}
[4121]588
[4173]589    #endregion
[4121]590
[4173]591    #region Snapshots
[4133]592
/// <summary>
/// Stores the response as latest snapshot of the single job item the response belongs to.
/// </summary>
private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
  JobItem owner = jobItems.Single(item => item.JobDto.Id == response.Obj.JobInfo.Id);
  owner.LatestSnapshot = response;
}
597
/// <summary>
/// Requests a snapshot of the given job on a new thread. If the server reports that the
/// job is not being calculated, the last result is fetched directly; otherwise the result
/// is polled until it is available or maxSnapshotRetries attempts have been made.
/// A result with status Ok is stored as latest snapshot, any other status is logged as error.
/// </summary>
/// <param name="jobId">id of the job to request the snapshot for</param>
public void RequestSnapshot(Guid jobId) {
  ThreadStart work = delegate {
    IClientFacade facade = CreateStreamedClientFacade();
    try {
      ResponseObject<SerializedJob> result;
      Response requestResult = facade.RequestSnapshot(jobId);

      if (requestResult.StatusMessage != ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
        // server sent snapshot request to client: poll until the snapshot is ready
        int attempts = 0;
        bool tryAgain;
        do {
          Thread.Sleep(snapshotPollingIntervalMs);
          Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
          result = facade.GetLastSerializedResult(jobId, false, true);
          Logger.Debug("HiveExperiment: Abort - Server: " + result.StatusMessage);
          attempts++;
          // continue only while retries are left and the job result is not there yet
          tryAgain = attempts < maxSnapshotRetries
            && result.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere;
        } while (tryAgain);
      } else {
        // job already finished
        Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
        result = facade.GetLastSerializedResult(jobId, false, false);
        Logger.Debug("HiveExperiment: Abort - Server: " + result.StatusMessage);
      }

      if (result.StatusMessage != ResponseStatus.Ok) {
        LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + result.StatusMessage);
      } else {
        LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
        UpdateSnapshot(result);
      }
    }
    finally {
      ServiceLocator.DisposeClientFacade(facade);
    }
  };
  new Thread(work).Start();
}
641
/// <summary>
/// Requests a snapshot for the sending job item once its state changes to Requested.
/// </summary>
private void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
  JobItem source = (JobItem)sender;
  if (source.SnapshotRequestedState != SnapshotRequestedState.Requested) return;
  RequestSnapshot(source.JobDto.Id);
}
648
[4173]649    #endregion
[4141]650
[4116]651    #region Required Plugin Search
/// <summary>
/// Collects the given type and all types used by its members (searched recursively,
/// restricted to namespaces starting with "HeuristicLab.") and returns the plugins
/// declaring them.
/// </summary>
private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
  var usedTypes = new HashSet<Type>();
  FindTypes(type, usedTypes, "HeuristicLab.");
  IEnumerable<IPluginDescription> declaringPlugins = GetDeclaringPlugins(usedTypes);
  return declaringPlugins;
}
661
/// <summary>
/// Returns the set of plugins that declare the given types, including the dependencies
/// of these plugins.
/// </summary>
private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
  var result = new HashSet<IPluginDescription>();
  foreach (Type current in types) {
    IPluginDescription declaringPlugin = ApplicationManager.Manager.GetDeclaringPlugin(current);
    FindDeclaringPlugins(declaringPlugin, result);
  }
  return result;
}
672
/// <summary>
/// Adds the given plugin and, recursively, all of its dependencies to the hashset.
/// A plugin that is already contained is not visited again.
/// </summary>
private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
  // Add returns false if the plugin is already contained
  if (!plugins.Add(plugin)) return;

  foreach (IPluginDescription required in plugin.Dependencies) {
    FindDeclaringPlugins(required, plugins);
  }
}
685
/// <summary>
/// Recursively finds all types used in type which are in a namespace which starts with namespaceStart.
/// Searched are constructor parameters, interfaces, events, properties, fields and methods
/// (parameters and return types) as returned by the parameterless reflection queries.
/// Be aware that search is not performed on attributes.
/// Types without a namespace (and null) are skipped.
/// </summary>
/// <param name="type">the type to be searched</param>
/// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
/// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
  if (type == null || types.Contains(type)) return;

  // Type.Namespace is null for types that are not declared in a namespace
  string typeNamespace = type.Namespace;
  if (typeNamespace == null || !typeNamespace.StartsWith(namespaceStart, StringComparison.Ordinal)) return;

  types.Add(type);

  // constructors
  foreach (ConstructorInfo info in type.GetConstructors()) {
    foreach (ParameterInfo paramInfo in info.GetParameters()) {
      FindTypes(paramInfo.ParameterType, types, namespaceStart);
    }
  }

  // interfaces
  foreach (Type t in type.GetInterfaces()) {
    FindTypes(t, types, namespaceStart);
  }

  // events
  foreach (EventInfo info in type.GetEvents()) {
    FindTypes(info.EventHandlerType, types, namespaceStart);
    FindTypes(info.DeclaringType, types, namespaceStart);
  }

  // properties
  foreach (PropertyInfo info in type.GetProperties()) {
    FindTypes(info.PropertyType, types, namespaceStart);
  }

  // fields
  foreach (FieldInfo info in type.GetFields()) {
    FindTypes(info.FieldType, types, namespaceStart);
  }

  // methods
  foreach (MethodInfo info in type.GetMethods()) {
    foreach (ParameterInfo paramInfo in info.GetParameters()) {
      FindTypes(paramInfo.ParameterType, types, namespaceStart);
    }
    FindTypes(info.ReturnType, types, namespaceStart);
  }
}
734    #endregion
[4119]735
736    #region Eventhandler
737
/// <summary>Raised by OnExecutionTimeChanged.</summary>
public event EventHandler ExecutionTimeChanged;
private void OnExecutionTimeChanged() {
  EventHandler subscribers = ExecutionTimeChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}

/// <summary>Raised by OnExecutionStateChanged after the new state has been logged.</summary>
public event EventHandler ExecutionStateChanged;
private void OnExecutionStateChanged() {
  LogMessage("ExecutionState changed to " + executionState.ToString());
  EventHandler subscribers = ExecutionStateChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
750
public event EventHandler<EventArgs<Exception>> ExceptionOccurred;

/// <summary>Raised by OnStarted, after the message was logged and the timer was started.</summary>
public event EventHandler Started;
private void OnStarted() {
  LogMessage("Started");
  timer.Start();
  EventHandler subscribers = Started;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}

/// <summary>Raised by OnStopped, after the timer was stopped and the message was logged.</summary>
public event EventHandler Stopped;
private void OnStopped() {
  timer.Stop();
  LogMessage("Stopped");
  EventHandler subscribers = Stopped;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}

/// <summary>Raised by OnPaused, after the timer was stopped and the message was logged.</summary>
public event EventHandler Paused;
private void OnPaused() {
  timer.Stop();
  LogMessage("Paused");
  EventHandler subscribers = Paused;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
776
/// <summary>
/// Event fired by <see cref="OnPrepared"/>.
/// </summary>
public event EventHandler Prepared;

/// <summary>
/// Logs "Prepared" and then invokes all subscribers of <see cref="Prepared"/>.
/// </summary>
protected virtual void OnPrepared() {
  LogMessage("Prepared");

  var subscribers = Prepared;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
783
/// <summary>
/// Event fired by <see cref="OnResourceIdsChanged"/>.
/// </summary>
public event EventHandler ResourceIdsChanged;

/// <summary>
/// Invokes all subscribers of <see cref="ResourceIdsChanged"/>; a no-op when nobody is subscribed.
/// </summary>
protected virtual void OnResourceIdsChanged() {
  var subscribers = ResourceIdsChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
789
/// <summary>
/// Event fired by <see cref="OnExperimentChanged"/>.
/// </summary>
public event EventHandler ExperimentChanged;

/// <summary>
/// Logs "Experiment changed" and then invokes all subscribers of <see cref="ExperimentChanged"/>.
/// </summary>
protected virtual void OnExperimentChanged() {
  LogMessage("Experiment changed");

  var subscribers = ExperimentChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
796
/// <summary>
/// Event fired by <see cref="OnServerUrlChanged"/>.
/// </summary>
public event EventHandler ServerUrlChanged;

/// <summary>
/// Invokes all subscribers of <see cref="ServerUrlChanged"/>; a no-op when nobody is subscribed.
/// </summary>
protected virtual void OnServerUrlChanged() {
  var subscribers = ServerUrlChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
802
/// <summary>
/// Event fired by <see cref="OnIsPollingResultsChanged"/>.
/// </summary>
public event EventHandler IsResultsPollingChanged;

/// <summary>
/// Brings the timer in line with <c>IsPollingResults</c> (started when true, stopped when false),
/// logs which of the two happened and then invokes all subscribers of <see cref="IsResultsPollingChanged"/>.
/// </summary>
private void OnIsPollingResultsChanged() {
  bool polling = this.IsPollingResults;
  LogMessage(polling ? "Results Polling Started" : "Results Polling Stopped");
  if (polling) {
    timer.Start();
  } else {
    timer.Stop();
  }

  var subscribers = IsResultsPollingChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
[4145]815
/// <summary>
/// Subscribes to the change notifications of the <c>jobItems</c> list and attaches the
/// per-item handlers to every job item it currently contains.
/// </summary>
private void RegisterJobItemListEvents() {
  // List-level notifications.
  jobItems.CollectionReset += jobItems_CollectionReset;
  jobItems.ItemsAdded += jobItems_ItemsAdded;
  jobItems.ItemsRemoved += jobItems_ItemsRemoved;
  jobItems.ItemsReplaced += jobItems_ItemsReplaced;

  // Item-level notifications for the items already in the list.
  foreach (JobItem existing in jobItems) {
    RegisterJobItemEvents(existing);
  }
}
825
/// <summary>
/// Unsubscribes from the change notifications of the <c>jobItems</c> list and detaches the
/// per-item handlers from every job item it currently contains.
/// </summary>
private void DeregisterJobItemListEvents() {
  // List-level notifications.
  jobItems.CollectionReset -= jobItems_CollectionReset;
  jobItems.ItemsAdded -= jobItems_ItemsAdded;
  jobItems.ItemsRemoved -= jobItems_ItemsRemoved;
  jobItems.ItemsReplaced -= jobItems_ItemsReplaced;

  // Item-level notifications for the items still in the list.
  foreach (JobItem existing in jobItems) {
    DeregisterJobItemEvents(existing);
  }
}
835
/// <summary>
/// Handler for the list's ItemsReplaced notification; forwards the event data to
/// <see cref="UpdateJobItemEvents"/>, which detaches handlers from <c>e.OldItems</c> and attaches them to <c>e.Items</c>.
/// </summary>
/// <param name="sender">Source of the notification (unused).</param>
/// <param name="e">Event data with the replaced and the replacing items.</param>
private void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
  this.UpdateJobItemEvents(e);
}
839
/// <summary>
/// Detaches the per-item handlers from every entry in <c>e.OldItems</c> and attaches them to
/// every entry in <c>e.Items</c>. Either sequence may be null, in which case it is skipped.
/// </summary>
/// <param name="e">Event data of a list change notification.</param>
private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
  var outgoing = e.OldItems;
  if (outgoing != null) {
    foreach (var entry in outgoing) {
      DeregisterJobItemEvents(entry.Value);
    }
  }

  var incoming = e.Items;
  if (incoming != null) {
    foreach (var entry in incoming) {
      RegisterJobItemEvents(entry.Value);
    }
  }
}
852
/// <summary>
/// Attaches this object's handlers to the SnapshotRequestedStateChanged and JobStateChanged
/// events of the given job item.
/// </summary>
/// <param name="jobItem">The job item to observe.</param>
private void RegisterJobItemEvents(JobItem jobItem) {
  jobItem.SnapshotRequestedStateChanged += JobItem_SnapshotRequestedStateChanged;
  jobItem.JobStateChanged += JobItem_JobStateChanged;
}
857
/// <summary>
/// Detaches this object's handlers from the SnapshotRequestedStateChanged and JobStateChanged
/// events of the given job item.
/// </summary>
/// <param name="jobItem">The job item to stop observing.</param>
private void DeregisterJobItemEvents(JobItem jobItem) {
  jobItem.SnapshotRequestedStateChanged -= JobItem_SnapshotRequestedStateChanged;
  jobItem.JobStateChanged -= JobItem_JobStateChanged;
}
862
/// <summary>
/// Handler for the list's ItemsRemoved notification; detaches the per-item handlers from
/// every job item that left the list.
/// </summary>
/// <param name="sender">Source of the notification (unused).</param>
/// <param name="e">Event data describing the removed items.</param>
void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
  // Do not route this through UpdateJobItemEvents: that helper *registers* handlers on e.Items,
  // but for a removal notification the removed items are expected in e.Items (OldItems is unused),
  // so removed jobs would be subscribed again instead of released.
  // Both sequences are deregistered so the handler is correct whichever one carries the removed items;
  // removing a handler that is not attached is a no-op.
  if (e.Items != null) {
    foreach (var item in e.Items) {
      DeregisterJobItemEvents(item.Value);
    }
  }
  if (e.OldItems != null) {
    foreach (var item in e.OldItems) {
      DeregisterJobItemEvents(item.Value);
    }
  }
}
866
/// <summary>
/// Handler for the list's ItemsAdded notification; forwards the event data to
/// <see cref="UpdateJobItemEvents"/>, which attaches the per-item handlers to <c>e.Items</c>.
/// </summary>
/// <param name="sender">Source of the notification (unused).</param>
/// <param name="e">Event data describing the added items.</param>
private void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
  this.UpdateJobItemEvents(e);
}
870
/// <summary>
/// Handler for the list's CollectionReset notification; detaches the per-item handlers from the
/// previous content (<c>e.OldItems</c>) and attaches them to the new content (<c>e.Items</c>).
/// </summary>
/// <param name="sender">Source of the notification (unused).</param>
/// <param name="e">Event data with the old and the new list content.</param>
void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
  // UpdateJobItemEvents null-checks both sequences, deregisters the old items and registers the
  // new ones. Only detaching the old items would leave the items present after the reset unobserved.
  UpdateJobItemEvents(e);
}
[4119]877    #endregion
[4173]878
879    #region Helper Functions
/// <summary>
/// Asks the <c>ServiceLocator</c> for a client facade for the configured Hive server address and
/// keeps retrying until one is obtained or the execution state is Stopped.
/// </summary>
/// <returns>
/// The created facade, or <c>null</c> when no facade was obtained and the execution state is Stopped.
/// </returns>
private IClientFacade CreateClientFacade() {
  IClientFacade facade = null;
  while (true) {
    try {
      facade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp);
    } catch (EndpointNotFoundException notFound) {
      // Endpoint unreachable: report it and wait one polling interval before the next attempt.
      var waitSeconds = resultPollingIntervalMs / 1000;
      LogMessage("Could not connect to Server: " + notFound.Message + ". Will try again in " + waitSeconds + " sec.");
      Thread.Sleep(resultPollingIntervalMs);
    }

    // Leave the loop on success, or give up once the experiment has been stopped.
    if (facade != null || this.ExecutionState == Core.ExecutionState.Stopped) {
      return facade;
    }
  }
}
894
/// <summary>
/// Asks the <c>ServiceLocator</c> for a streamed client facade for the configured Hive server address
/// and keeps retrying until one is obtained or the execution state is Stopped.
/// </summary>
/// <returns>
/// The created facade, or <c>null</c> when no facade was obtained and the execution state is Stopped.
/// </returns>
private IClientFacade CreateStreamedClientFacade() {
  IClientFacade facade = null;
  while (true) {
    try {
      facade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp);
    } catch (EndpointNotFoundException notFound) {
      // Endpoint unreachable: report it and wait one polling interval before the next attempt.
      var waitSeconds = resultPollingIntervalMs / 1000;
      LogMessage("Could not connect to Server: " + notFound.Message + ". Will try again in " + waitSeconds + " sec.");
      Thread.Sleep(resultPollingIntervalMs);
    }

    // Leave the loop on success, or give up once the experiment has been stopped.
    if (facade != null || this.ExecutionState == Core.ExecutionState.Stopped) {
      return facade;
    }
  }
}
908
/// <summary>
/// Adds every key/value pair of <paramref name="childs"/> to <paramref name="optimizers"/>.
/// </summary>
/// <param name="optimizers">Dictionary that receives the entries.</param>
/// <param name="childs">Dictionary whose entries are copied.</param>
private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
  foreach (var entry in childs) {
    optimizers.Add(entry.Key, entry.Value);
  }
}
914
915    #endregion
916
917    #region Logging
/// <summary>
/// Appends <paramref name="message"/> to the experiment log.
/// </summary>
/// <param name="message">Text to log.</param>
private void LogMessage(string message) {
  // The HeuristicLab log is not thread-safe, so each write happens while holding the locker object.
  lock (locker) { log.LogMessage(message); }
}
924
/// <summary>
/// Logs <paramref name="message"/> on the job item with the given id and then writes it,
/// suffixed with the job id, to the experiment log.
/// </summary>
/// <param name="jobId">Id used to look up the job item via <c>GetJobItemById</c>.</param>
/// <param name="message">Text to log.</param>
private void LogMessage(Guid jobId, string message) {
  var jobItem = GetJobItemById(jobId);
  jobItem.LogMessage(message);

  string experimentEntry = message + " (jobId: " + jobId + ")";
  LogMessage(experimentEntry);
}
929
930    #endregion
[4116]931  }
932}
Note: See TracBrowser for help on using the repository browser.