Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4342

Last change on this file since 4342 was 4342, checked in by cneumuel, 14 years ago

added connection setup dialog

File size: 35.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43using System.ServiceModel;
44using HeuristicLab.Hive.Contracts.ResponseObjects;
45using HeuristicLab.Hive.Experiment.Properties;
46using System.ComponentModel;
47
48namespace HeuristicLab.Hive.Experiment {
49  /// <summary>
50  /// An experiment which contains multiple batch runs of algorithms.
51  /// </summary>
52  [Item(itemName, itemDescription)]
53  [Creatable("Testing & Analysis")]
54  [StorableClass]
55  public class HiveExperiment : NamedItem, IExecutable {
56    private const string itemName = "Hive Experiment";
57    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
58    private const int resultPollingIntervalMs = 5000;
59    private const int snapshotPollingIntervalMs = 1000;
60    private const int maxSnapshotRetries = 20;
61    private object locker = new object();
62
63    private System.Timers.Timer timer;
64    private bool pausePending, stopPending;
65    private bool sendingJobsFinished = false;
66
67    // ensure that only 2 threads can fetch jobresults simultaniously
68    private Semaphore fetchJobSemaphore = new Semaphore(2, 2);
69
70    private static object pendingOptimizerMappingsLocker = new object();
71
72    private bool stopResultsPollingPending = false;
73
74    private Thread resultPollingThread;
75
76    private bool isPollingResults;
77    public bool IsPollingResults {
78      get { return isPollingResults; }
79      private set {
80        if (isPollingResults != value) {
81          isPollingResults = value;
82          OnIsPollingResultsChanged();
83        }
84      }
85    }
86
87    public IEnumerable<string> ResourceGroups {
88      get {
89        if (!string.IsNullOrEmpty(resourceIds)) {
90          return resourceIds.Split(';');
91        } else {
92          return new List<string>();
93        }
94      }
95    }
96
97    #region Storable Properties
98    [Storable]
99    private DateTime lastUpdateTime;
100
101    /// <summary>
102    /// Mapping from JobId to an optimizer.
103    /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection
104    /// </summary>
105    [Storable]
106    private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
107
108    /// <summary>
109    /// Stores a mapping from the child-optimizer to the parent optimizer.
110    /// Needed to replace a finished optimizer in the optimizer-tree.
111    /// Only pending optmizers are stored.
112    /// </summary>
113    [Storable]
114    private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
115
116    [Storable]
117    private JobItemList jobItems;
118    public JobItemList JobItems {
119      get { return jobItems; }
120    }
121
122    [Storable]
123    private string resourceIds;
124    public string ResourceIds {
125      get { return resourceIds; }
126      set {
127        if (resourceIds != value) {
128          resourceIds = value;
129          OnResourceIdsChanged();
130        }
131      }
132    }
133
134    [Storable]
135    private HeuristicLab.Optimization.Experiment experiment;
136    public HeuristicLab.Optimization.Experiment Experiment {
137      get { return experiment; }
138      set {
139        if (experiment != value) {
140          experiment = value;
141          OnExperimentChanged();
142        }
143      }
144    }
145
146    [Storable]
147    private ILog log;
148    public ILog Log {
149      get { return log; }
150    }
151
152    [Storable]
153    private Core.ExecutionState executionState;
154    public ExecutionState ExecutionState {
155      get { return executionState; }
156      private set {
157        if (executionState != value) {
158          executionState = value;
159          OnExecutionStateChanged();
160        }
161      }
162    }
163
164    [Storable]
165    private TimeSpan executionTime;
166    public TimeSpan ExecutionTime {
167      get { return executionTime; }
168      private set {
169        if (executionTime != value) {
170          executionTime = value;
171          OnExecutionTimeChanged();
172        }
173      }
174    }
175    #endregion
176
177    [StorableConstructor]
178    public HiveExperiment(bool deserializing)
179      : base(deserializing) {
180    }
181
182    public HiveExperiment()
183      : base(itemName, itemDescription) {
184      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
185      this.log = new Log();
186      pausePending = stopPending = false;
187      jobItems = new JobItemList();
188      isPollingResults = false;
189      RegisterJobItemListEvents();
190      InitTimer();
191    }
192
193    public override IDeepCloneable Clone(Cloner cloner) {
194      LogMessage("I am beeing cloned");
195      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
196      clone.resourceIds = this.resourceIds;
197      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
198      clone.executionState = this.executionState;
199      clone.executionTime = this.executionTime;
200      clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
201
202      lock (pendingOptimizerMappingsLocker) {
203        foreach (var pair in this.pendingOptimizersByJobId)
204          clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
205
206        foreach (var pair in this.parentOptimizersByPendingOptimizer)
207          clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
208      }
209      clone.log = (ILog)cloner.Clone(log);
210      clone.stopPending = this.stopPending;
211      clone.pausePending = this.pausePending;
212      clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));
213      clone.lastUpdateTime = this.lastUpdateTime;
214      clone.isPollingResults = this.isPollingResults;
215      return clone;
216    }
217
218    [StorableHook(HookType.AfterDeserialization)]
219    private void AfterDeserialization() {
220      InitTimer();
221      this.IsPollingResults = false;
222      this.stopResultsPollingPending = false;
223      RegisterJobItemListEvents();
224      LogMessage("I was deserialized.");
225    }
226
227    #region Execution Time Timer
228    private void InitTimer() {
229      timer = new System.Timers.Timer(100);
230      timer.AutoReset = true;
231      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
232    }
233
234    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
235      DateTime now = DateTime.Now;
236      ExecutionTime += now - lastUpdateTime;
237      lastUpdateTime = now;
238    }
239    #endregion
240
241    #region IExecutable Methods
242    public void Pause() {
243      throw new NotSupportedException();
244    }
245
246    public void Prepare() {
247      if (experiment != null) {
248        StopResultPolling();
249        lock (pendingOptimizerMappingsLocker) {
250          pendingOptimizersByJobId.Clear();
251          parentOptimizersByPendingOptimizer.Clear();
252        }
253        lock (jobItems) {
254          jobItems.Clear();
255        }
256        experiment.Prepare();
257        this.ExecutionState = Core.ExecutionState.Prepared;
258        OnPrepared();
259      }
260    }
261
262    public void Start() {
263      sendingJobsFinished = false;
264      OnStarted();
265      ExecutionTime = new TimeSpan();
266      lastUpdateTime = DateTime.Now;
267      this.ExecutionState = Core.ExecutionState.Started;
268      StartResultPolling();
269
270      Thread t = new Thread(() => {
271        IClientFacade clientFacade = CreateStreamedClientFacade();
272
273        try {
274          pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
275
276          LogMessage("Extracting jobs from Experiment");
277          parentOptimizersByPendingOptimizer = GetOptimizers(true);
278          LogMessage("Extraction of jobs from Experiment finished");
279
280          IEnumerable<string> groups = ResourceGroups;
281          lock (pendingOptimizerMappingsLocker) {
282            foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
283              SerializedJob serializedJob = CreateSerializedJob(optimizer);
284              ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
285              pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
286
287              JobItem jobItem = new JobItem() {
288                JobDto = response.Obj,
289                LatestSnapshot = null,
290                Optimizer = optimizer
291              };
292              lock (jobItems) {
293                jobItems.Add(jobItem);
294              }
295              LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
296            }
297          }
298        }
299        catch (Exception e) {
300          LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
301          this.ExecutionState = Core.ExecutionState.Stopped;
302          OnStopped();
303        }
304        finally {
305          ServiceLocator.DisposeClientFacade(clientFacade);
306        }
307        sendingJobsFinished = true;
308      });
309      t.Start();
310    }
311
312    public void Stop() {
313      this.ExecutionState = Core.ExecutionState.Stopped;
314      foreach (JobItem jobItem in jobItems) {
315        AbortJob(jobItem.JobDto.Id);
316      }
317      OnStopped();
318    }
319    #endregion
320
    #region Optimizer Management
322    /// <summary>
323    /// Returns all optimizers in the current Experiment
324    /// </summary>
325    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
326    /// <returns></returns>
327    private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
328      if (!flatout) {
329        var optimizers = new Dictionary<IOptimizer, IOptimizer>();
330        foreach (IOptimizer opt in experiment.Optimizers) {
331          optimizers.Add(experiment, opt);
332        }
333        return optimizers;
334      } else {
335        return FlatOptimizerTree(null, experiment, "");
336      }
337    }
338
339    /// <summary>
340    /// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
341    ///
342    /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
343    /// interface IParallelizable {
344    ///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
345    /// }
346    /// </summary>
347    /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns>
348    private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
349      IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
350      if (optimizer is HeuristicLab.Optimization.Experiment) {
351        HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment;
352        if (this.experiment != experiment) {
353          prepend += experiment.Name + "/"; // don't prepend for top-level optimizers
354        }
355        foreach (IOptimizer opt in experiment.Optimizers) {
356          AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend));
357        }
358      } else if (optimizer is BatchRun) {
359        BatchRun batchRun = optimizer as BatchRun;
360        prepend += batchRun.Name + "/";
361        for (int i = 0; i < batchRun.Repetitions; i++) {
362          IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
363          opt.Name += " [" + i + "]";
364          IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend);
365          AddRange(optimizers, batchOptimizers);
366        }
367      } else if (optimizer is EngineAlgorithm) {
368        optimizer.Name = prepend + optimizer.Name;
369        optimizers.Add(optimizer, parent);
370        LogMessage("Optimizer extracted: " + optimizer.Name);
371      } else {
372        Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown");
373        optimizer.Name = prepend + optimizer.Name;
374        optimizers.Add(optimizer, parent);
375        LogMessage("Optimizer extracted: " + optimizer.Name);
376      }
377      return optimizers;
378    }
379
380    private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
381      lock (locker) {
382        if (parentOptimizer is HeuristicLab.Optimization.Experiment) {
383          HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer;
384          int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer);
385          exp.Optimizers[originalOptimizerIndex] = newOptimizer;
386        } else if (parentOptimizer is BatchRun) {
387          BatchRun batchRun = (BatchRun)parentOptimizer;
388          if (newOptimizer is IAlgorithm) {
389            batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer));
390          } else {
391            throw new NotSupportedException("Only IAlgorithm types supported");
392          }
393        } else {
394          throw new NotSupportedException("Invalid parentOptimizer");
395        }
396      }
397    }
398
399    private bool NoMorePendingOptimizers() {
400      lock (pendingOptimizerMappingsLocker) {
401        return pendingOptimizersByJobId.Count == 0;
402      }
403    }
404
405    /// <summary>
406    /// Removes optimizers from
407    ///  - parentOptimizersByPendingOptimizer
408    ///  - pendingOptimizersByJobId
409    /// </summary>
410    /// <param name="jobId"></param>
411    private void DisposeOptimizerMappings(Guid jobId) {
412      LogMessage(jobId, "Disposing Optimizer Mappings");
413      lock (pendingOptimizerMappingsLocker) {
414        parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]);
415        pendingOptimizersByJobId.Remove(jobId);
416      }
417    }
418
419    #endregion
420
421    #region Job Management
422    /// <summary>
423    /// Updates all JobItems with the results
424    /// </summary>
425    /// <param name="jobResultList"></param>
426    private void UpdateJobItems(JobResultList jobResultList) {
427      // use a Dict to avoid quadratic runtime complexity
428      IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId);
429      lock (jobItems) {
430        foreach (JobItem jobItem in JobItems) {
431          if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) {
432            jobItem.JobResult = jobResultDict[jobItem.JobDto.Id];
433          }
434        }
435      }
436    }
437
438    private void JobItem_JobStateChanged(object sender, EventArgs e) {
439      JobItem jobItem = (JobItem)sender;
440
441      Thread t = new Thread(() => {
442        try {
443          if (jobItem.State == JobState.Finished) {
444            FetchAndUpdateJob(jobItem.JobDto.Id);
445            DisposeOptimizerMappings(jobItem.JobDto.Id);
446          } else if (jobItem.State == JobState.Failed) {
447            DisposeOptimizerMappings(jobItem.JobDto.Id);
448          }
449
450          if (NoMorePendingOptimizers()) {
451            StopResultPolling();
452            this.ExecutionState = Core.ExecutionState.Stopped;
453            OnStopped();
454          }
455        }
456        catch (Exception ex) {
457          Logger.Error("JobItem_JobStateChanged failed badly: " + ex.Message);
458          LogMessage("JobItem_JobStateChanged failed badly: " + ex.Message);
459        }
460      });
461      t.Start();
462    }
463
464    /// <summary>
465    /// Fetches the finished job from the server and updates the jobItem
466    /// </summary>
467    private void FetchAndUpdateJob(Guid jobId) {
468      bool tryagain = false;
469      LogMessage(jobId, "FetchAndUpdateJob started");
470      if (fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) {
471        IClientFacade clientFacade = null;
472        try {
473          clientFacade = CreateStreamedClientFacade();
474          IOptimizer originalOptimizer;
475          originalOptimizer = pendingOptimizersByJobId[jobId];
476
477          ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
478          IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
479          IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
480          ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
481          LogMessage(jobId, "FetchAndUpdateJob ended");
482        }
483        catch (Exception e) {
484          LogMessage(jobId, "FetchAndUpdateJob failed: " + e.Message + ". Will try again!");
485          tryagain = true;
486        }
487        finally {
488          ServiceLocator.DisposeClientFacade(clientFacade);
489          fetchJobSemaphore.Release();
490        }
491      } else {
492        LogMessage(jobId, "FetchAndUpdateJob timed out. Will try again!");
493        tryagain = true;
494      }
495
496      if (tryagain) {
497        FetchAndUpdateJob(jobId);
498      }
499    }
500
501    private void UpdateJobItem(JobDto jobDto) {
502      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
503      jobItem.JobDto = jobDto;
504    }
505
506    public void AbortJob(Guid jobId) {
507      IClientFacade clientFacade = CreateClientFacade();
508      Response response = clientFacade.AbortJob(jobId);
509      LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
510    }
511
512    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
513      IJob job = new OptimizerJob() {
514        Optimizer = optimizer
515      };
516
517      // serialize job
518      MemoryStream memStream = new MemoryStream();
519      XmlGenerator.Serialize(job, memStream);
520      byte[] jobByteArray = memStream.ToArray();
521      memStream.Dispose();
522
523      // find out which which plugins are needed for the given object
524      List<HivePluginInfoDto> pluginsNeeded = (
525        from p in GetDeclaringPlugins(job.GetType())
526        select new HivePluginInfoDto() {
527          Name = p.Name,
528          Version = p.Version
529        }).ToList();
530
531      JobDto jobDto = new JobDto() {
532        CoresNeeded = 1, // [chn] how to determine real cores needed?
533        PluginsNeeded = pluginsNeeded,
534        State = JobState.Offline,
535        MemoryNeeded = 0
536      };
537
538      SerializedJob serializedJob = new SerializedJob() {
539        JobInfo = jobDto,
540        SerializedJobData = jobByteArray
541      };
542
543      return serializedJob;
544    }
545
546    private JobItem GetJobItemById(Guid jobId) {
547      return jobItems.Single(x => x.JobDto.Id == jobId);
548    }
549    #endregion
550
551    #region Result Polling
552    public void StartResultPolling() {
553      this.stopResultsPollingPending = false;
554      this.IsPollingResults = true;
555      resultPollingThread = CreateResultPollingThread();
556      if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running)
557        resultPollingThread.Start();
558    }
559
560    public void StopResultPolling() {
561      this.stopResultsPollingPending = true;
562      if (resultPollingThread != null && resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) {
563        resultPollingThread.Interrupt();
564      }
565      this.stopResultsPollingPending = false;
566    }
567
568    private Thread CreateResultPollingThread() {
569      return new Thread(() => {
570        try {
571          do {
572            IClientFacade clientFacade = CreateStreamedClientFacade();
573            IEnumerable<Guid> jobIdsToQuery = from job in JobItems
574                                              where job.State != JobState.Finished &&
575                                              job.State != JobState.Failed
576                                              select job.JobDto.Id;
577            if (jobIdsToQuery.Count() > 0) {
578              LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs");
579              try {
580                ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);
581                if (response.StatusMessage == ResponseStatus.Ok) {
582                  JobResultList jobItemList = response.Obj;
583                  UpdateJobItems(jobItemList);
584
585                  LogMessage("Polling successfully finished");
586                } else {
587                  throw new Exception(response.StatusMessage.ToString());
588                }
589              }
590              catch (Exception e) {
591                LogMessage("Polling results failed: " + e.Message);
592              }
593              finally {
594                ServiceLocator.DisposeClientFacade(clientFacade);
595              }
596              Thread.Sleep(resultPollingIntervalMs);
597            } else {
598              if (sendingJobsFinished) {
599                // all the jobs have been sent to hive, but non are to query any more (all finished or failed)
600                this.stopResultsPollingPending = true;
601              }
602            }
603          } while (!this.stopResultsPollingPending);
604        }
605        catch (ThreadInterruptedException exception) {
606          // thread has been interuppted
607        }
608        catch (Exception e) {
609          LogMessage("Result Polling Thread failed badly: " + e.Message);
610          Logger.Error("Result Polling Thread failed badly: " + e.Message);
611        }
612        finally {
613          this.IsPollingResults = false;
614        }
615      });
616    }
617
618    #endregion
619
620    #region Snapshots
621
622    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
623      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
624      jobItem.LatestSnapshot = response;
625    }
626
627    public void RequestSnapshot(Guid jobId) {
628      Thread t = new Thread(() => {
629        IClientFacade clientFacade = null;
630        try {
631          clientFacade = CreateStreamedClientFacade();
632
633          ResponseObject<SerializedJob> response;
634          int retryCount = 0;
635
636          Response snapShotResponse = clientFacade.RequestSnapshot(jobId);
637          if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
638            // job already finished
639            Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
640            response = clientFacade.GetLastSerializedResult(jobId, false, false);
641            Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
642          } else {
643            // server sent snapshot request to client
644            // poll until snapshot is ready
645            do {
646              Thread.Sleep(snapshotPollingIntervalMs);
647              Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
648              response = clientFacade.GetLastSerializedResult(jobId, false, true);
649              Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
650              retryCount++;
651              // loop while
652              // 1. problem with communication with server
653              // 2. job result not yet ready
654            } while (
655              (retryCount < maxSnapshotRetries) && (
656              response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere)
657              );
658          }
659          if (response.StatusMessage == ResponseStatus.Ok) {
660            LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
661            UpdateSnapshot(response);
662          } else {
663            LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
664          }
665        }
666        catch (Exception e) {
667          LogMessage("RequestSnapshot Thread failed badly: " + e.Message);
668          Logger.Error("RequestSnapshot Thread failed badly: " + e.Message);
669        }
670        finally {
671          ServiceLocator.DisposeClientFacade(clientFacade);
672        }
673      });
674      t.Start();
675    }
676
677    void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
678      JobItem jobItem = (JobItem)sender;
679      if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) {
680        RequestSnapshot(jobItem.JobDto.Id);
681      }
682    }
683
684    #endregion
685
686    #region Required Plugin Search
687    /// <summary>
688    /// Returns a list of plugins in which the type itself and all members
689    /// of the type are declared. Objectgraph is searched recursively.
690    /// </summary>
691    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
692      HashSet<Type> types = new HashSet<Type>();
693      FindTypes(type, types, "HeuristicLab.");
694      return GetDeclaringPlugins(types);
695    }
696
697    /// <summary>
698    /// Returns the plugins (including dependencies) in which the given types are declared
699    /// </summary>
700    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
701      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
702      foreach (Type t in types) {
703        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
704      }
705      return plugins;
706    }
707
708    /// <summary>
709    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
710    /// Also searches the dependencies recursively.
711    /// </summary>
712    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
713      if (!plugins.Contains(plugin)) {
714        plugins.Add(plugin);
715        foreach (IPluginDescription dependency in plugin.Dependencies) {
716          FindDeclaringPlugins(dependency, plugins);
717        }
718      }
719    }
720
721    /// <summary>
722    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
723    /// Be aware that search is not performed on attributes
724    /// </summary>
725    /// <param name="type">the type to be searched</param>
726    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
727    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
728    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
729      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
730        types.Add(type);
731
732        // constructors
733        foreach (ConstructorInfo info in type.GetConstructors()) {
734          foreach (ParameterInfo paramInfo in info.GetParameters()) {
735            FindTypes(paramInfo.ParameterType, types, namespaceStart);
736          }
737        }
738
739        // interfaces
740        foreach (Type t in type.GetInterfaces()) {
741          FindTypes(t, types, namespaceStart);
742        }
743
744        // events
745        foreach (EventInfo info in type.GetEvents()) {
746          FindTypes(info.EventHandlerType, types, namespaceStart);
747          FindTypes(info.DeclaringType, types, namespaceStart);
748        }
749
750        // properties
751        foreach (PropertyInfo info in type.GetProperties()) {
752          FindTypes(info.PropertyType, types, namespaceStart);
753        }
754
755        // fields
756        foreach (FieldInfo info in type.GetFields()) {
757          FindTypes(info.FieldType, types, namespaceStart);
758        }
759
760        // methods
761        foreach (MethodInfo info in type.GetMethods()) {
762          foreach (ParameterInfo paramInfo in info.GetParameters()) {
763            FindTypes(paramInfo.ParameterType, types, namespaceStart);
764          }
765          FindTypes(info.ReturnType, types, namespaceStart);
766        }
767      }
768    }
769    #endregion
770
771    #region Eventhandler
772
773    public event EventHandler ExecutionTimeChanged;
774    private void OnExecutionTimeChanged() {
775      EventHandler handler = ExecutionTimeChanged;
776      if (handler != null) handler(this, EventArgs.Empty);
777    }
778
779    public event EventHandler ExecutionStateChanged;
780    private void OnExecutionStateChanged() {
781      LogMessage("ExecutionState changed to " + executionState.ToString());
782      EventHandler handler = ExecutionStateChanged;
783      if (handler != null) handler(this, EventArgs.Empty);
784    }
785
786    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
787
788    public event EventHandler Started;
789    private void OnStarted() {
790      LogMessage("Started");
791      timer.Start();
792      EventHandler handler = Started;
793      if (handler != null) handler(this, EventArgs.Empty);
794    }
795
796    public event EventHandler Stopped;
797    private void OnStopped() {
798      timer.Stop();
799      LogMessage("Stopped");
800      EventHandler handler = Stopped;
801      if (handler != null) handler(this, EventArgs.Empty);
802    }
803
804    public event EventHandler Paused;
805    private void OnPaused() {
806      timer.Stop();
807      LogMessage("Paused");
808      EventHandler handler = Paused;
809      if (handler != null) handler(this, EventArgs.Empty);
810    }
811
812    public event EventHandler Prepared;
813    protected virtual void OnPrepared() {
814      LogMessage("Prepared");
815      EventHandler handler = Prepared;
816      if (handler != null) handler(this, EventArgs.Empty);
817    }
818
819    public event EventHandler ResourceIdsChanged;
820    protected virtual void OnResourceIdsChanged() {
821      EventHandler handler = ResourceIdsChanged;
822      if (handler != null) handler(this, EventArgs.Empty);
823    }
824
825    public event EventHandler ExperimentChanged;
826    protected virtual void OnExperimentChanged() {
827      LogMessage("Experiment changed");
828      EventHandler handler = ExperimentChanged;
829      if (handler != null) handler(this, EventArgs.Empty);
830    }
831
832    public event EventHandler IsResultsPollingChanged;
833    private void OnIsPollingResultsChanged() {
834      if (this.IsPollingResults) {
835        LogMessage("Results Polling Started");
836        timer.Start();
837      } else {
838        LogMessage("Results Polling Stopped");
839        timer.Stop();
840      }
841      EventHandler handler = IsResultsPollingChanged;
842      if (handler != null) handler(this, EventArgs.Empty);
843    }
844
845    private void RegisterJobItemListEvents() {
846      jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
847      jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
848      jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
849      jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
850      foreach (JobItem jobItem in jobItems) {
851        RegisterJobItemEvents(jobItem);
852      }
853    }
854
855    private void DeregisterJobItemListEvents() {
856      jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
857      jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
858      jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
859      jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
860      foreach (JobItem jobItem in jobItems) {
861        DeregisterJobItemEvents(jobItem);
862      }
863    }
864
865    void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
866      UpdateJobItemEvents(e);
867    }
868
869    private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
870      if (e.OldItems != null) {
871        foreach (var item in e.OldItems) {
872          DeregisterJobItemEvents(item.Value);
873        }
874      }
875      if (e.Items != null) {
876        foreach (var item in e.Items) {
877          RegisterJobItemEvents(item.Value);
878        }
879      }
880    }
881
882    private void RegisterJobItemEvents(JobItem jobItem) {
883      jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
884      jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
885    }
886
887    private void DeregisterJobItemEvents(JobItem jobItem) {
888      jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
889      jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
890    }
891
892    void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
893      UpdateJobItemEvents(e);
894    }
895
896    void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
897      UpdateJobItemEvents(e);
898    }
899
900    void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
901      foreach (var item in e.OldItems) {
902        item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
903        item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
904      }
905    }
906    #endregion
907
908    #region Helper Functions
909    private IClientFacade CreateClientFacade() {
910      IClientFacade clientFacade = null;
911      do {
912        try {
913          //clientFacade = ServiceLocator.CreateClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientServiceName));
914          clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp);
915
916        }
917        catch (EndpointNotFoundException exception) {
918          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
919          Thread.Sleep(resultPollingIntervalMs);
920        }
921      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
922      return clientFacade;
923    }
924
925    private IClientFacade CreateStreamedClientFacade() {
926      IClientFacade clientFacade = null;
927      do {
928        try {
929          //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName));
930          clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp);
931        }
932        catch (EndpointNotFoundException exception) {
933          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
934          Thread.Sleep(resultPollingIntervalMs);
935        }
936      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
937      return clientFacade;
938    }
939
940    private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
941      foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
942        optimizers.Add(kvp);
943      }
944    }
945
946    #endregion
947
948    #region Logging
    /// <summary>
    /// Writes <paramref name="message"/> to <c>log</c> while holding the <c>locker</c> lock.
    /// </summary>
    /// <param name="message">The text passed on to <c>log.LogMessage</c>.</param>
    private void LogMessage(string message) {
      // HeuristicLab.Log is not Thread-Safe, so lock on every call
      lock (locker) {
        log.LogMessage(message);
      }
    }
955
956    private void LogMessage(Guid jobId, string message) {
957      GetJobItemById(jobId).LogMessage(message);
958      LogMessage(message + " (jobId: " + jobId + ")");
959    }
960
961    #endregion
962  }
963}
Note: See TracBrowser for help on using the repository browser.