Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4333

Last change on this file since 4333 was 4333, checked in by cneumuel, 12 years ago

added authorizationManager which checks for permission to specific jobs (#1168)

File size: 35.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43using System.ServiceModel;
44using HeuristicLab.Hive.Contracts.ResponseObjects;
45using HeuristicLab.Hive.Experiment.Properties;
46
47namespace HeuristicLab.Hive.Experiment {
48  /// <summary>
49  /// An experiment which contains multiple batch runs of algorithms.
50  /// </summary>
51  [Item(itemName, itemDescription)]
52  [Creatable("Testing & Analysis")]
53  [StorableClass]
54  public class HiveExperiment : NamedItem, IExecutable {
// Display name and description handed to the [Item] attribute and the default constructor.
private const string itemName = "Hive Experiment";
private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";

// Pause (ms) after a result polling round, pause (ms) between two snapshot polls,
// and the maximum number of snapshot polls per snapshot request.
private const int resultPollingIntervalMs = 5000;
private const int snapshotPollingIntervalMs = 1000;
private const int maxSnapshotRetries = 20;

// Taken by ReplaceOptimizer while it modifies the optimizer tree.
private object locker = new object();

// Taken around accesses to the pending-optimizer mappings (static, i.e. shared by all instances).
private static object pendingOptimizerMappingsLocker = new object();

// Ensures that at most 2 threads fetch job results simultaneously.
private Semaphore fetchJobSemaphore = new Semaphore(2, 2);

// Periodically updates ExecutionTime while running.
private System.Timers.Timer timer;

private bool pausePending;
private bool stopPending;

// Set to true once the job-sending thread created in Start() is done.
private bool sendingJobsFinished = false;

// Tells the result polling loop to leave after its current round.
private bool stopResultsPollingPending = false;

private Thread resultPollingThread;
74
private bool isPollingResults;
/// <summary>
/// Whether results are currently being polled. Assigning a different value
/// calls OnIsPollingResultsChanged; assigning the current value does nothing.
/// </summary>
public bool IsPollingResults {
  get { return isPollingResults; }
  private set {
    if (isPollingResults == value) return;
    isPollingResults = value;
    OnIsPollingResultsChanged();
  }
}
85
/// <summary>
/// The resource ids split at ';'. Yields an empty list when no resource ids are set.
/// </summary>
public IEnumerable<string> ResourceGroups {
  get {
    if (string.IsNullOrEmpty(resourceIds)) {
      return new List<string>();
    }
    return resourceIds.Split(';');
  }
}
95
96    #region Storable Properties
/// <summary>
/// Point in time of the last ExecutionTime update.
/// </summary>
[Storable]
private DateTime lastUpdateTime;

/// <summary>
/// Maps a job id to its optimizer.
/// Holds all pending optimizers; an optimizer is removed from this collection once it is finished.
/// </summary>
[Storable]
private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();

/// <summary>
/// Maps a child optimizer to its parent optimizer.
/// Needed to replace a finished optimizer in the optimizer tree.
/// Only pending optimizers are stored.
/// </summary>
[Storable]
private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
114
[Storable]
private JobItemList jobItems;
/// <summary>
/// The job items of this experiment.
/// </summary>
public JobItemList JobItems {
  get { return this.jobItems; }
}
120
[Storable]
private string resourceIds;
/// <summary>
/// The raw resource id string (split at ';' by ResourceGroups).
/// Assigning a different value calls OnResourceIdsChanged.
/// </summary>
public string ResourceIds {
  get { return resourceIds; }
  set {
    if (resourceIds == value) return;
    resourceIds = value;
    OnResourceIdsChanged();
  }
}
132
[Storable]
private HeuristicLab.Optimization.Experiment experiment;
/// <summary>
/// The experiment whose optimizers are sent to the Hive.
/// Assigning a different instance calls OnExperimentChanged.
/// </summary>
public HeuristicLab.Optimization.Experiment Experiment {
  get { return experiment; }
  set {
    if (experiment == value) return;
    experiment = value;
    OnExperimentChanged();
  }
}
144
[Storable]
private ILog log;
/// <summary>
/// The log of this experiment.
/// </summary>
public ILog Log {
  get { return this.log; }
}
150
[Storable]
private Core.ExecutionState executionState;
/// <summary>
/// The current execution state. Assigning a different value calls OnExecutionStateChanged.
/// </summary>
public ExecutionState ExecutionState {
  get { return executionState; }
  private set {
    if (executionState == value) return;
    executionState = value;
    OnExecutionStateChanged();
  }
}
162
[Storable]
private TimeSpan executionTime;
/// <summary>
/// The accumulated execution time. Assigning a different value calls OnExecutionTimeChanged.
/// </summary>
public TimeSpan ExecutionTime {
  get { return executionTime; }
  private set {
    if (executionTime == value) return;
    executionTime = value;
    OnExecutionTimeChanged();
  }
}
174    #endregion
175
/// <summary>
/// Constructor used by the persistence framework; only forwards to the base class.
/// </summary>
[StorableConstructor]
public HiveExperiment(bool deserializing) : base(deserializing) { }
180
/// <summary>
/// Creates an empty Hive experiment: takes the resource ids from the settings,
/// creates a fresh log and job list, hooks the job list events and sets up the timer.
/// </summary>
public HiveExperiment() : base(itemName, itemDescription) {
  ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
  log = new Log();
  stopPending = false;
  pausePending = false;
  jobItems = new JobItemList();
  isPollingResults = false;
  RegisterJobItemListEvents();
  InitTimer();
}
191
/// <summary>
/// Creates a deep copy of this experiment. The experiment, the pending optimizers,
/// the log and the job items are cloned through <paramref name="cloner"/>;
/// the remaining state is copied by value.
/// </summary>
/// <param name="cloner">the cloner that performs and tracks the deep copies</param>
/// <returns>the cloned HiveExperiment</returns>
public override IDeepCloneable Clone(Cloner cloner) {
  LogMessage("I am beeing cloned");
  HiveExperiment copy = (HiveExperiment)base.Clone(cloner);

  copy.resourceIds = resourceIds;
  copy.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
  copy.executionState = executionState;
  copy.executionTime = executionTime;
  copy.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();

  // the mappings are read under the lock that also guards their modification
  lock (pendingOptimizerMappingsLocker) {
    foreach (KeyValuePair<Guid, IOptimizer> entry in pendingOptimizersByJobId) {
      copy.pendingOptimizersByJobId[entry.Key] = (IOptimizer)cloner.Clone(entry.Value);
    }

    foreach (KeyValuePair<IOptimizer, IOptimizer> entry in parentOptimizersByPendingOptimizer) {
      IOptimizer clonedChild = (IOptimizer)cloner.Clone(entry.Key);
      IOptimizer clonedParent = (IOptimizer)cloner.Clone(entry.Value);
      copy.parentOptimizersByPendingOptimizer[clonedChild] = clonedParent;
    }
  }

  copy.log = (ILog)cloner.Clone(log);
  copy.stopPending = stopPending;
  copy.pausePending = pausePending;
  JobItemList clonedJobItems = (JobItemList)cloner.Clone(jobItems);
  copy.jobItems.AddRange(clonedJobItems);
  copy.lastUpdateTime = lastUpdateTime;
  copy.isPollingResults = isPollingResults;
  return copy;
}
216
/// <summary>
/// Restores the non-persisted state after deserialization: recreates the timer,
/// resets the polling flags and re-attaches the job item event handlers.
/// </summary>
[StorableHook(HookType.AfterDeserialization)]
private void AfterDeserialization() {
  InitTimer();
  IsPollingResults = false;
  stopResultsPollingPending = false;
  RegisterJobItemListEvents();
  LogMessage("I was deserialized.");
}
225
226    #region Execution Time Timer
/// <summary>
/// Creates the auto-resetting timer (100 ms interval) that drives the execution time update.
/// The timer is not started here.
/// </summary>
private void InitTimer() {
  System.Timers.Timer executionTimer = new System.Timers.Timer(100);
  executionTimer.AutoReset = true;
  timer = executionTimer;
  timer.Elapsed += timer_Elapsed;
}
232
/// <summary>
/// Adds the time passed since the last update to ExecutionTime.
/// </summary>
private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
  DateTime current = DateTime.Now;
  TimeSpan elapsedSinceLastUpdate = current - lastUpdateTime;
  ExecutionTime = ExecutionTime + elapsedSinceLastUpdate;
  lastUpdateTime = current;
}
238    #endregion
239
240    #region IExecutable Methods
/// <summary>
/// Pausing is not supported by a HiveExperiment.
/// </summary>
/// <exception cref="NotSupportedException">always</exception>
public void Pause() {
  throw new NotSupportedException();
}
244
/// <summary>
/// Resets the experiment: stops result polling, drops all pending-optimizer mappings
/// and job items and prepares the wrapped experiment. Does nothing if no experiment is set.
/// </summary>
public void Prepare() {
  if (experiment == null) {
    return;
  }

  StopResultPolling();

  lock (pendingOptimizerMappingsLocker) {
    pendingOptimizersByJobId.Clear();
    parentOptimizersByPendingOptimizer.Clear();
  }

  lock (jobItems) {
    jobItems.Clear();
  }

  experiment.Prepare();
  ExecutionState = Core.ExecutionState.Prepared;
  OnPrepared();
}
260
/// <summary>
/// Starts the experiment: resets the execution time, switches to the Started state,
/// starts result polling and sends one job per extracted optimizer to the Hive on a
/// separate thread. If sending fails, the error is logged and the state switches to Stopped.
/// </summary>
public void Start() {
  sendingJobsFinished = false;
  OnStarted();
  ExecutionTime = new TimeSpan();
  lastUpdateTime = DateTime.Now;
  this.ExecutionState = Core.ExecutionState.Started;
  StartResultPolling();

  Thread t = new Thread(() => {
    IClientFacade clientFacade = null;
    try {
      // created inside the try block so that a connection failure is reported through
      // the catch block below instead of escaping as an unhandled thread exception
      clientFacade = CreateStreamedClientFacade();

      pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();

      LogMessage("Extracting jobs from Experiment");
      parentOptimizersByPendingOptimizer = GetOptimizers(true);
      LogMessage("Extraction of jobs from Experiment finished");

      IEnumerable<string> groups = ResourceGroups;
      lock (pendingOptimizerMappingsLocker) {
        foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
          SerializedJob serializedJob = CreateSerializedJob(optimizer);
          ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
          if (response == null || response.Obj == null) {
            // report the server status instead of failing with a NullReferenceException
            throw new InvalidOperationException("Adding job failed: " +
              (response == null ? "no response" : response.StatusMessage.ToString()));
          }
          pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);

          JobItem jobItem = new JobItem() {
            JobDto = response.Obj,
            LatestSnapshot = null,
            Optimizer = optimizer
          };
          lock (jobItems) {
            jobItems.Add(jobItem);
          }
          LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
        }
      }
    }
    catch (Exception e) {
      LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
      this.ExecutionState = Core.ExecutionState.Stopped;
      OnStopped();
    }
    finally {
      // signalled in finally so the polling loop can terminate even if the catch block throws
      sendingJobsFinished = true;
      // NOTE(review): clientFacade may be null here; FetchAndUpdateJob and RequestSnapshot
      // already pass a possibly-null facade to DisposeClientFacade - confirm it tolerates null
      ServiceLocator.DisposeClientFacade(clientFacade);
    }
  });
  t.Start();
}
311
/// <summary>
/// Switches to the Stopped state and aborts all jobs of this experiment.
/// OnStopped is called even if aborting a job throws; the exception still propagates.
/// </summary>
public void Stop() {
  this.ExecutionState = Core.ExecutionState.Stopped;

  // snapshot the ids under the lock that guards jobItems elsewhere, so a concurrently
  // running sender thread cannot modify the list while it is enumerated
  List<Guid> jobIds;
  lock (jobItems) {
    jobIds = jobItems.Select(item => item.JobDto.Id).ToList();
  }

  try {
    foreach (Guid jobId in jobIds) {
      AbortJob(jobId);
    }
  }
  finally {
    OnStopped();
  }
}
319    #endregion
320
321    #region Optimizier Management
/// <summary>
/// Returns all optimizers in the current Experiment
/// </summary>
/// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flattened</param>
/// <returns>a dictionary mapping each returned optimizer (key) to its parent optimizer (value)</returns>
private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
  if (flatout) {
    return FlatOptimizerTree(null, experiment, "");
  }

  var optimizers = new Dictionary<IOptimizer, IOptimizer>();
  foreach (IOptimizer opt in experiment.Optimizers) {
    // key = child, value = parent (same direction as FlatOptimizerTree); using the
    // experiment as key would throw on the second optimizer because of the duplicate key
    optimizers.Add(opt, experiment);
  }
  return optimizers;
}
338
/// <summary>
/// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
/// Experiments contribute their child optimizers, batch runs contribute one clone of
/// their algorithm per repetition; every other optimizer is returned itself, with its
/// name prefixed by the names of the enclosing experiments/batch runs.
///
/// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
/// interface IParallelizable {
///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
/// }
/// </summary>
/// <param name="parent">the parent of <paramref name="optimizer"/> (null for the root)</param>
/// <param name="optimizer">the optimizer (sub-tree) to flatten</param>
/// <param name="prepend">name prefix accumulated so far</param>
/// <returns>a dictionary mapping from each extracted (child) optimizer to its parent optimizer</returns>
private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
  IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();

  HeuristicLab.Optimization.Experiment subExperiment = optimizer as HeuristicLab.Optimization.Experiment;
  BatchRun batchRun = optimizer as BatchRun;

  if (subExperiment != null) {
    if (this.experiment != subExperiment) {
      prepend += subExperiment.Name + "/"; // don't prepend for top-level optimizers
    }
    foreach (IOptimizer opt in subExperiment.Optimizers) {
      AddRange(optimizers, FlatOptimizerTree(subExperiment, opt, prepend));
    }
  } else if (batchRun != null) {
    prepend += batchRun.Name + "/";
    for (int i = 0; i < batchRun.Repetitions; i++) {
      IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
      opt.Name += " [" + i + "]";
      AddRange(optimizers, FlatOptimizerTree(batchRun, opt, prepend));
    }
  } else {
    if (!(optimizer is EngineAlgorithm)) {
      // log the type of the optimizer itself (not of the result dictionary)
      Logger.Warn("Optimizer of type " + optimizer.GetType().ToString() + " unknown");
    }
    optimizer.Name = prepend + optimizer.Name;
    optimizers.Add(optimizer, parent);
    LogMessage("Optimizer extracted: " + optimizer.Name);
  }
  return optimizers;
}
379
/// <summary>
/// Puts a restored optimizer back into the optimizer tree. In an Experiment parent the
/// original optimizer is replaced in place; for a BatchRun parent a new Run is added.
/// </summary>
/// <param name="parentOptimizer">the parent, either an Experiment or a BatchRun</param>
/// <param name="originalOptimizer">the optimizer that was sent to the Hive</param>
/// <param name="newOptimizer">the optimizer restored from the job result</param>
/// <exception cref="NotSupportedException">
/// the parent is neither Experiment nor BatchRun, or the parent is a BatchRun and the
/// new optimizer is not an IAlgorithm
/// </exception>
private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
  lock (locker) {
    HeuristicLab.Optimization.Experiment parentExperiment = parentOptimizer as HeuristicLab.Optimization.Experiment;
    if (parentExperiment != null) {
      int position = parentExperiment.Optimizers.IndexOf(originalOptimizer);
      parentExperiment.Optimizers[position] = newOptimizer;
      return;
    }

    BatchRun parentBatchRun = parentOptimizer as BatchRun;
    if (parentBatchRun == null) {
      throw new NotSupportedException("Invalid parentOptimizer");
    }

    IAlgorithm algorithm = newOptimizer as IAlgorithm;
    if (algorithm == null) {
      throw new NotSupportedException("Only IAlgorithm types supported");
    }
    parentBatchRun.Runs.Add(new Run(newOptimizer.Name, algorithm));
  }
}
398
/// <summary>
/// Tells whether no pending optimizer is left.
/// </summary>
/// <returns>true if pendingOptimizersByJobId is empty</returns>
private bool NoMorePendingOptimizers() {
  bool nothingPending;
  lock (pendingOptimizerMappingsLocker) {
    nothingPending = pendingOptimizersByJobId.Count == 0;
  }
  return nothingPending;
}
404
/// <summary>
/// Removes optimizers from
///  - parentOptimizersByPendingOptimizer
///  - pendingOptimizersByJobId
/// Does nothing if no optimizer is registered for the job (e.g. the mappings have
/// already been removed or cleared).
/// </summary>
/// <param name="jobId">id of the job whose optimizer mappings are removed</param>
private void DisposeOptimizerMappings(Guid jobId) {
  LogMessage(jobId, "Disposing Optimizer Mappings");
  lock (pendingOptimizerMappingsLocker) {
    IOptimizer optimizer;
    // TryGetValue instead of the indexer: an unknown job id must not throw
    if (pendingOptimizersByJobId.TryGetValue(jobId, out optimizer)) {
      if (optimizer != null) {
        parentOptimizersByPendingOptimizer.Remove(optimizer);
      }
      pendingOptimizersByJobId.Remove(jobId);
    }
  }
}
418
419    #endregion
420
421    #region Job Management
/// <summary>
/// Assigns each job item the job result with the same job id, if the list contains one.
/// </summary>
/// <param name="jobResultList">the job results received from the server</param>
private void UpdateJobItems(JobResultList jobResultList) {
  // index the results by job id once instead of searching the list for every job item
  IDictionary<Guid, JobResult> resultsByJobId = jobResultList.ToDictionary(result => result.JobId);
  lock (jobItems) {
    foreach (JobItem item in JobItems) {
      JobResult matchingResult;
      if (resultsByJobId.TryGetValue(item.JobDto.Id, out matchingResult)) {
        item.JobResult = matchingResult;
      }
    }
  }
}
437
/// <summary>
/// Handles a state change of a job item on a new thread: a finished job is fetched and
/// its optimizer mappings are removed, a failed job only has its mappings removed.
/// When no pending optimizer is left, result polling is stopped and the experiment
/// switches to Stopped. Exceptions are logged, not propagated.
/// </summary>
private void JobItem_JobStateChanged(object sender, EventArgs e) {
  JobItem changedItem = (JobItem)sender;

  Thread worker = new Thread(delegate() {
    try {
      if (changedItem.State == JobState.Finished) {
        FetchAndUpdateJob(changedItem.JobDto.Id);
        DisposeOptimizerMappings(changedItem.JobDto.Id);
      } else if (changedItem.State == JobState.Failed) {
        DisposeOptimizerMappings(changedItem.JobDto.Id);
      }

      if (!NoMorePendingOptimizers()) {
        return;
      }
      StopResultPolling();
      ExecutionState = Core.ExecutionState.Stopped;
      OnStopped();
    }
    catch (Exception failure) {
      Logger.Error("JobItem_JobStateChanged failed badly: " + failure.Message);
      LogMessage("JobItem_JobStateChanged failed badly: " + failure.Message);
    }
  });
  worker.Start();
}
463
/// <summary>
/// Fetches the finished job from the server and puts the restored optimizer back into
/// the optimizer tree. A failed attempt (exception or semaphore timeout) is retried
/// until it succeeds; retries run in a loop so repeated failures cannot exhaust the stack.
/// If no pending optimizer is registered for the job, retrying cannot succeed and the
/// method gives up after logging.
/// </summary>
/// <param name="jobId">id of the finished job</param>
private void FetchAndUpdateJob(Guid jobId) {
  while (true) {
    LogMessage(jobId, "FetchAndUpdateJob started");

    if (!fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) {
      LogMessage(jobId, "FetchAndUpdateJob timed out. Will try again!");
      continue;
    }

    IClientFacade clientFacade = null;
    try {
      IOptimizer originalOptimizer;
      IOptimizer parentOptimizer;
      // the mappings are modified by other threads under this lock
      lock (pendingOptimizerMappingsLocker) {
        if (!pendingOptimizersByJobId.TryGetValue(jobId, out originalOptimizer)) {
          LogMessage(jobId, "FetchAndUpdateJob failed: no pending optimizer for this job");
          return;
        }
        parentOptimizer = parentOptimizersByPendingOptimizer[originalOptimizer];
      }

      clientFacade = CreateStreamedClientFacade();
      ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
      IJob restoredObject;
      using (MemoryStream stream = new MemoryStream(jobResponse.Obj.SerializedJobData)) {
        restoredObject = XmlParser.Deserialize<IJob>(stream);
      }
      IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
      ReplaceOptimizer(parentOptimizer, originalOptimizer, restoredOptimizer);
      LogMessage(jobId, "FetchAndUpdateJob ended");
      return;
    }
    catch (Exception e) {
      LogMessage(jobId, "FetchAndUpdateJob failed: " + e.Message + ". Will try again!");
    }
    finally {
      ServiceLocator.DisposeClientFacade(clientFacade);
      fetchJobSemaphore.Release();
    }

    // back off before the next attempt instead of hammering the server
    Thread.Sleep(resultPollingIntervalMs);
  }
}
500
/// <summary>
/// Stores <paramref name="jobDto"/> in the single job item that has the same job id.
/// </summary>
private void UpdateJobItem(JobDto jobDto) {
  JobItem target = jobItems.Single(candidate => candidate.JobDto.Id == jobDto.Id);
  target.JobDto = jobDto;
}
505
/// <summary>
/// Asks the server to abort the given job and logs the status of the response.
/// </summary>
/// <param name="jobId">id of the job to abort</param>
public void AbortJob(Guid jobId) {
  IClientFacade clientFacade = null;
  try {
    clientFacade = CreateClientFacade();
    Response response = clientFacade.AbortJob(jobId);
    LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
  }
  finally {
    // release the facade like every other facade user in this class does
    // NOTE(review): assumes DisposeClientFacade also accepts facades from CreateClientFacade - confirm
    ServiceLocator.DisposeClientFacade(clientFacade);
  }
}
511
/// <summary>
/// Wraps the optimizer into an OptimizerJob, serializes it and bundles the data with a
/// job description (1 core, no memory requirement, state Offline, needed plugins).
/// </summary>
/// <param name="optimizer">the optimizer to send to the Hive</param>
/// <returns>the serialized job including its job info</returns>
private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
  IJob job = new OptimizerJob() {
    Optimizer = optimizer
  };

  // serialize job; the using block releases the stream even if serialization throws
  byte[] jobByteArray;
  using (MemoryStream memStream = new MemoryStream()) {
    XmlGenerator.Serialize(job, memStream);
    jobByteArray = memStream.ToArray();
  }

  // find out which plugins are needed for the given object
  List<HivePluginInfoDto> pluginsNeeded = (
    from p in GetDeclaringPlugins(job.GetType())
    select new HivePluginInfoDto() {
      Name = p.Name,
      Version = p.Version
    }).ToList();

  JobDto jobDto = new JobDto() {
    CoresNeeded = 1, // [chn] how to determine real cores needed?
    PluginsNeeded = pluginsNeeded,
    State = JobState.Offline,
    MemoryNeeded = 0
  };

  return new SerializedJob() {
    JobInfo = jobDto,
    SerializedJobData = jobByteArray
  };
}
545
/// <summary>
/// Returns the single job item with the given job id.
/// </summary>
private JobItem GetJobItemById(Guid jobId) {
  JobItem match = jobItems.Single(candidate => candidate.JobDto.Id == jobId);
  return match;
}
549    #endregion
550
551    #region Result Polling
/// <summary>
/// Clears the stop request, marks polling as active and starts a newly created
/// result polling thread.
/// </summary>
public void StartResultPolling() {
  stopResultsPollingPending = false;
  IsPollingResults = true;
  resultPollingThread = CreateResultPollingThread();
  bool alreadyRunning = resultPollingThread.ThreadState == System.Threading.ThreadState.Running;
  if (!alreadyRunning) {
    resultPollingThread.Start();
  }
}
559
/// <summary>
/// Requests the result polling thread to stop. A thread that is currently waiting or
/// sleeping is interrupted; otherwise it leaves its loop after the current round.
/// The stop request stays set until StartResultPolling clears it - resetting it here
/// would hide the request from a thread that is not sleeping at this moment.
/// </summary>
public void StopResultPolling() {
  this.stopResultsPollingPending = true;
  Thread pollingThread = resultPollingThread;
  // ThreadState is a flags enum, so test the WaitSleepJoin bit instead of comparing for equality
  if (pollingThread != null &&
      (pollingThread.ThreadState & System.Threading.ThreadState.WaitSleepJoin) != 0) {
    pollingThread.Interrupt();
  }
}
567
/// <summary>
/// Creates (but does not start) the thread that polls the server for the results of all
/// jobs which are neither finished nor failed. The loop ends when a stop is requested
/// or when all jobs have been sent and none is left to query. IsPollingResults is reset
/// when the thread ends.
/// </summary>
/// <returns>the unstarted polling thread</returns>
private Thread CreateResultPollingThread() {
  // pause while jobs are still being sent but none can be queried yet
  const int idleWaitMs = 500;

  return new Thread(() => {
    try {
      do {
        // materialize once and under the job list lock: the list is modified by other
        // threads and the query result is used several times
        List<Guid> jobIdsToQuery;
        lock (jobItems) {
          jobIdsToQuery = (from job in JobItems
                           where job.State != JobState.Finished &&
                           job.State != JobState.Failed
                           select job.JobDto.Id).ToList();
        }

        if (jobIdsToQuery.Count > 0) {
          LogMessage("Polling results for " + jobIdsToQuery.Count + " jobs");
          // the facade is only created when it is needed, so none is leaked on idle rounds
          IClientFacade clientFacade = CreateStreamedClientFacade();
          try {
            ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);
            if (response.StatusMessage == ResponseStatus.Ok) {
              JobResultList jobItemList = response.Obj;
              UpdateJobItems(jobItemList);

              LogMessage("Polling successfully finished");
            } else {
              throw new Exception(response.StatusMessage.ToString());
            }
          }
          catch (Exception e) {
            LogMessage("Polling results failed: " + e.Message);
          }
          finally {
            ServiceLocator.DisposeClientFacade(clientFacade);
          }
          Thread.Sleep(resultPollingIntervalMs);
        } else if (sendingJobsFinished) {
          // all the jobs have been sent to hive, but none are left to query (all finished or failed)
          this.stopResultsPollingPending = true;
        } else {
          // jobs are still being sent; wait instead of spinning
          Thread.Sleep(idleWaitMs);
        }
      } while (!this.stopResultsPollingPending);
    }
    catch (ThreadInterruptedException) {
      // thread has been interrupted by StopResultPolling
    }
    catch (Exception e) {
      LogMessage("Result Polling Thread failed badly: " + e.Message);
      Logger.Error("Result Polling Thread failed badly: " + e.Message);
    }
    finally {
      this.IsPollingResults = false;
    }
  });
}
617
618    #endregion
619
620    #region Snapshots
621
/// <summary>
/// Stores the response as latest snapshot of the single job item it belongs to.
/// </summary>
private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
  JobItem target = jobItems.Single(candidate => candidate.JobDto.Id == response.Obj.JobInfo.Id);
  target.LatestSnapshot = response;
}
626
/// <summary>
/// Requests a snapshot of the given job on a new thread. If the job is not being
/// calculated, its last result is fetched directly; otherwise the snapshot is polled
/// until it is available or maxSnapshotRetries polls have been made. A successful
/// response is stored via UpdateSnapshot; failures are logged, not propagated.
/// </summary>
/// <param name="jobId">id of the job to take a snapshot of</param>
public void RequestSnapshot(Guid jobId) {
  Thread worker = new Thread(delegate() {
    IClientFacade facade = null;
    try {
      facade = CreateStreamedClientFacade();

      ResponseObject<SerializedJob> snapshot;
      Response requestAnswer = facade.RequestSnapshot(jobId);

      if (requestAnswer.StatusMessage != ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
        // server sent snapshot request to client
        // poll until snapshot is ready (or the retry limit is reached)
        int attempts = 0;
        bool pollAgain;
        do {
          Thread.Sleep(snapshotPollingIntervalMs);
          Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
          snapshot = facade.GetLastSerializedResult(jobId, false, true);
          Logger.Debug("HiveExperiment: Abort - Server: " + snapshot.StatusMessage);
          attempts++;
          pollAgain = attempts < maxSnapshotRetries &&
                      snapshot.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere;
        } while (pollAgain);
      } else {
        // job already finished
        Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
        snapshot = facade.GetLastSerializedResult(jobId, false, false);
        Logger.Debug("HiveExperiment: Abort - Server: " + snapshot.StatusMessage);
      }

      if (snapshot.StatusMessage != ResponseStatus.Ok) {
        LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + snapshot.StatusMessage);
      } else {
        LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
        UpdateSnapshot(snapshot);
      }
    }
    catch (Exception failure) {
      LogMessage("RequestSnapshot Thread failed badly: " + failure.Message);
      Logger.Error("RequestSnapshot Thread failed badly: " + failure.Message);
    }
    finally {
      ServiceLocator.DisposeClientFacade(facade);
    }
  });
  worker.Start();
}
676
/// <summary>
/// Requests a snapshot for the sending job item once its snapshot state is Requested.
/// </summary>
private void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
  JobItem source = (JobItem)sender;
  if (source.SnapshotRequestedState != SnapshotRequestedState.Requested) {
    return;
  }
  RequestSnapshot(source.JobDto.Id);
}
683
684    #endregion
685
686    #region Required Plugin Search
/// <summary>
/// Returns the plugins that declare the given type and the types reachable from its
/// members. Only types in namespaces starting with "HeuristicLab." are considered;
/// the type graph is searched recursively.
/// </summary>
private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
  HashSet<Type> reachableTypes = new HashSet<Type>();
  FindTypes(type, reachableTypes, "HeuristicLab.");
  IEnumerable<IPluginDescription> declaringPlugins = GetDeclaringPlugins(reachableTypes);
  return declaringPlugins;
}
696
/// <summary>
/// Collects, for every given type, its declaring plugin together with that plugin's
/// dependencies (recursively).
/// </summary>
private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
  HashSet<IPluginDescription> collected = new HashSet<IPluginDescription>();
  foreach (Type current in types) {
    IPluginDescription declaringPlugin = ApplicationManager.Manager.GetDeclaringPlugin(current);
    FindDeclaringPlugins(declaringPlugin, collected);
  }
  return collected;
}
707
/// <summary>
/// Adds the plugin and, recursively, all of its dependencies to the plugins hashset.
/// A plugin that is already contained is not visited again.
/// </summary>
private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
  // Add returns false if the plugin is already contained
  if (!plugins.Add(plugin)) {
    return;
  }
  foreach (IPluginDescription required in plugin.Dependencies) {
    FindDeclaringPlugins(required, plugins);
  }
}
720
/// <summary>
/// Recursively finds all types used in type which are in a namespace which starts with namespaceStart.
/// Constructor parameters, interfaces, events, properties, fields and method signatures
/// (public members only) are searched. Be aware that search is not performed on attributes.
/// A null type and types without a namespace are skipped.
/// </summary>
/// <param name="type">the type to be searched</param>
/// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
/// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
  if (type == null || types.Contains(type)) {
    return;
  }
  // Namespace is null for types declared outside of any namespace
  string typeNamespace = type.Namespace;
  if (typeNamespace == null || !typeNamespace.StartsWith(namespaceStart, StringComparison.Ordinal)) {
    return;
  }
  types.Add(type);

  // constructors
  foreach (ConstructorInfo info in type.GetConstructors()) {
    foreach (ParameterInfo paramInfo in info.GetParameters()) {
      FindTypes(paramInfo.ParameterType, types, namespaceStart);
    }
  }

  // interfaces
  foreach (Type t in type.GetInterfaces()) {
    FindTypes(t, types, namespaceStart);
  }

  // events
  foreach (EventInfo info in type.GetEvents()) {
    FindTypes(info.EventHandlerType, types, namespaceStart);
    FindTypes(info.DeclaringType, types, namespaceStart);
  }

  // properties
  foreach (PropertyInfo info in type.GetProperties()) {
    FindTypes(info.PropertyType, types, namespaceStart);
  }

  // fields
  foreach (FieldInfo info in type.GetFields()) {
    FindTypes(info.FieldType, types, namespaceStart);
  }

  // methods
  foreach (MethodInfo info in type.GetMethods()) {
    foreach (ParameterInfo paramInfo in info.GetParameters()) {
      FindTypes(paramInfo.ParameterType, types, namespaceStart);
    }
    FindTypes(info.ReturnType, types, namespaceStart);
  }
}
769    #endregion
770
771    #region Eventhandler
772
/// <summary>Raised after ExecutionTime changed.</summary>
public event EventHandler ExecutionTimeChanged;
private void OnExecutionTimeChanged() {
  EventHandler subscribers = ExecutionTimeChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
778
/// <summary>Raised after ExecutionState changed; the new state is logged first.</summary>
public event EventHandler ExecutionStateChanged;
private void OnExecutionStateChanged() {
  LogMessage("ExecutionState changed to " + executionState.ToString());
  EventHandler subscribers = ExecutionStateChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
785
// NOTE(review): no code in the visible part of this file raises this event - confirm whether it is raised elsewhere.
public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
787
/// <summary>Raised when the experiment was started; also starts the execution time timer.</summary>
public event EventHandler Started;
private void OnStarted() {
  LogMessage("Started");
  timer.Start();
  EventHandler subscribers = Started;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
795
/// <summary>Raised when the experiment was stopped; also stops the execution time timer.</summary>
public event EventHandler Stopped;
private void OnStopped() {
  timer.Stop();
  LogMessage("Stopped");
  EventHandler subscribers = Stopped;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
803
/// <summary>Raised when the experiment was paused; also stops the execution time timer.</summary>
public event EventHandler Paused;
private void OnPaused() {
  timer.Stop();
  LogMessage("Paused");
  EventHandler subscribers = Paused;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
811
/// <summary>Raised when the experiment was prepared.</summary>
public event EventHandler Prepared;
protected virtual void OnPrepared() {
  LogMessage("Prepared");
  EventHandler subscribers = Prepared;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
818
819    public event EventHandler ResourceIdsChanged;
820    protected virtual void OnResourceIdsChanged() {
821      EventHandler handler = ResourceIdsChanged;
822      if (handler != null) handler(this, EventArgs.Empty);
823    }
824
825    public event EventHandler ExperimentChanged;
826    protected virtual void OnExperimentChanged() {
827      LogMessage("Experiment changed");
828      EventHandler handler = ExperimentChanged;
829      if (handler != null) handler(this, EventArgs.Empty);
830    }
831
832    public event EventHandler ServerUrlChanged;
833    protected virtual void OnServerUrlChanged() {
834      EventHandler handler = ServerUrlChanged;
835      if (handler != null) handler(this, EventArgs.Empty);
836    }
837
838    public event EventHandler IsResultsPollingChanged;
839    private void OnIsPollingResultsChanged() {
840      if (this.IsPollingResults) {
841        LogMessage("Results Polling Started");
842        timer.Start();
843      } else {
844        LogMessage("Results Polling Stopped");
845        timer.Stop();
846      }
847      EventHandler handler = IsResultsPollingChanged;
848      if (handler != null) handler(this, EventArgs.Empty);
849    }
850
851    private void RegisterJobItemListEvents() {
852      jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
853      jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
854      jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
855      jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
856      foreach (JobItem jobItem in jobItems) {
857        RegisterJobItemEvents(jobItem);
858      }
859    }
860
861    private void DeregisterJobItemListEvents() {
862      jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
863      jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
864      jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
865      jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
866      foreach (JobItem jobItem in jobItems) {
867        DeregisterJobItemEvents(jobItem);
868      }
869    }
870
871    void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
872      UpdateJobItemEvents(e);
873    }
874
875    private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
876      if (e.OldItems != null) {
877        foreach (var item in e.OldItems) {
878          DeregisterJobItemEvents(item.Value);
879        }
880      }
881      if (e.Items != null) {
882        foreach (var item in e.Items) {
883          RegisterJobItemEvents(item.Value);
884        }
885      }
886    }
887
888    private void RegisterJobItemEvents(JobItem jobItem) {
889      jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
890      jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
891    }
892
893    private void DeregisterJobItemEvents(JobItem jobItem) {
894      jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
895      jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
896    }
897
898    void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
899      UpdateJobItemEvents(e);
900    }
901
902    void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
903      UpdateJobItemEvents(e);
904    }
905
906    void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
907      foreach (var item in e.OldItems) {
908        item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
909        item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
910      }
911    }
912    #endregion
913
914    #region Helper Functions
915    private IClientFacade CreateClientFacade() {
916      IClientFacade clientFacade = null;
917      do {
918        try {
919          //clientFacade = ServiceLocator.CreateClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientServiceName));
920          clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp);
921
922        }
923        catch (EndpointNotFoundException exception) {
924          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
925          Thread.Sleep(resultPollingIntervalMs);
926        }
927      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
928      return clientFacade;
929    }
930
931    private IClientFacade CreateStreamedClientFacade() {
932      IClientFacade clientFacade = null;
933      do {
934        try {
935          //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName));
936          clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp);
937        }
938        catch (EndpointNotFoundException exception) {
939          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
940          Thread.Sleep(resultPollingIntervalMs);
941        }
942      } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
943      return clientFacade;
944    }
945
946    private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
947      foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
948        optimizers.Add(kvp);
949      }
950    }
951
952    #endregion
953
954    #region Logging
955    private void LogMessage(string message) {
956      // HeuristicLab.Log is not Thread-Safe, so lock on every call
957      lock (locker) {
958        log.LogMessage(message);
959      }
960    }
961
962    private void LogMessage(Guid jobId, string message) {
963      GetJobItemById(jobId).LogMessage(message);
964      LogMessage(message + " (jobId: " + jobId + ")");
965    }
966
967    #endregion
968  }
969}
Note: See TracBrowser for help on using the repository browser.