source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4170

Last change on this file since 4170 was 4170, checked in by cneumuel, 12 years ago

refactoring of Result-Polling of HiveExperiment, polling is now much faster and code is cleaner (1092#)

File size: 36.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43using System.ServiceModel;
44
45namespace HeuristicLab.Hive.Experiment {
46  /// <summary>
47  /// An experiment which contains multiple batch runs of algorithms.
48  /// </summary>
49  [Item(itemName, itemDescription)]
50  [Creatable("Testing & Analysis")]
51  [StorableClass]
52  public class HiveExperiment : NamedItem, IExecutable {
53    private const string itemName = "Hive Experiment";
54    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
55    private const int resultPollingIntervalMs = 5000;
56    private const int snapshotPollingIntervalMs = 1000;
57    private const int maxSnapshotRetries = 20;
58    private object locker = new object();
59
60    private System.Timers.Timer timer;
61    private bool pausePending, stopPending;
62    private bool sendingJobsFinished = false;
63
64    // ensure that only 2 threads can fetch jobresults simultaniously
65    private Semaphore fetchJobSemaphore = new Semaphore(2, 2);
66
67    [Storable]
68    private DateTime lastUpdateTime;
69
70    private bool isPollingResults;
71    public bool IsPollingResults {
72      get { return isPollingResults; }
73      private set {
74        if (isPollingResults != value) {
75          isPollingResults = value;
76          OnIsPollingResultsChanged();
77        }
78      }
79    }
80
81    private bool stopResultsPollingPending = false;
82
83    private Thread resultPollingThread;
84
85    /// <summary>
86    /// Mapping from JobId to an optimizer.
87    /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection
88    /// </summary>
89    [Storable]
90    private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
91
92    /// <summary>
93    /// Stores a mapping from the child-optimizer to the parent optimizer.
94    /// Needed to replace a finished optimizer in the optimizer-tree.
95    /// Only pending optmizers are stored.
96    /// </summary>
97    [Storable]
98    private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
99
100    [Storable]
101    private JobItemList jobItems;
102    public JobItemList JobItems {
103      get { return jobItems; }
104    }
105
106
107    [Storable]
108    private string serverUrl;
109    public string ServerUrl {
110      get { return serverUrl; }
111      set {
112        if (serverUrl != value) {
113          serverUrl = value;
114          OnServerUrlChanged();
115        }
116      }
117    }
118
119    [Storable]
120    private string resourceIds;
121    public string ResourceIds {
122      get { return resourceIds; }
123      set {
124        if (resourceIds != value) {
125          resourceIds = value;
126          OnResourceIdsChanged();
127        }
128      }
129    }
130
131    [Storable]
132    private HeuristicLab.Optimization.Experiment experiment;
133    public HeuristicLab.Optimization.Experiment Experiment {
134      get { return experiment; }
135      set {
136        if (experiment != value) {
137          experiment = value;
138          OnExperimentChanged();
139        }
140      }
141    }
142
143    [Storable]
144    private ILog log;
145    public ILog Log {
146      get { return log; }
147    }
148
149    [StorableConstructor]
150    public HiveExperiment(bool deserializing)
151      : base(deserializing) {
152    }
153
154    public HiveExperiment()
155      : base(itemName, itemDescription) {
156      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
157      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
158      this.log = new Log();
159      pausePending = stopPending = false;
160      jobItems = new JobItemList();
161      isPollingResults = false;
162      RegisterJobItemListEvents();
163      InitTimer();
164    }
165
166    public override IDeepCloneable Clone(Cloner cloner) {
167      LogMessage("I am beeing cloned");
168      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
169      clone.resourceIds = this.resourceIds;
170      clone.serverUrl = this.serverUrl;
171      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
172      clone.executionState = this.executionState;
173      clone.executionTime = this.executionTime;
174      clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
175
176      foreach (var pair in this.pendingOptimizersByJobId)
177        clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
178
179      foreach (var pair in this.parentOptimizersByPendingOptimizer)
180        clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
181
182      clone.log = (ILog)cloner.Clone(log);
183      clone.stopPending = this.stopPending;
184      clone.pausePending = this.pausePending;
185      clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));
186      clone.lastUpdateTime = this.lastUpdateTime;
187      clone.isPollingResults = this.isPollingResults;
188      return clone;
189    }
190
191    [StorableHook(HookType.AfterDeserialization)]
192    private void AfterDeserialization() {
193      InitTimer();
194      this.IsPollingResults = false;
195      this.stopResultsPollingPending = false;
196      RegisterJobItemListEvents();
197      LogMessage("I was deserialized.");
198    }
199
200    private void InitTimer() {
201      timer = new System.Timers.Timer(100);
202      timer.AutoReset = true;
203      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
204    }
205
206    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
207      DateTime now = DateTime.Now;
208      ExecutionTime += now - lastUpdateTime;
209      lastUpdateTime = now;
210    }
211
212    public IEnumerable<string> ResourceGroups {
213      get {
214        if (!string.IsNullOrEmpty(resourceIds)) {
215          return resourceIds.Split(';');
216        } else {
217          return new List<string>();
218        }
219      }
220    }
221    #region IExecutable Members
222
223    [Storable]
224    private Core.ExecutionState executionState;
225    public ExecutionState ExecutionState {
226      get { return executionState; }
227      private set {
228        if (executionState != value) {
229          executionState = value;
230          OnExecutionStateChanged();
231        }
232      }
233    }
234
235    [Storable]
236    private TimeSpan executionTime;
237    public TimeSpan ExecutionTime {
238      get { return executionTime; }
239      private set {
240        if (executionTime != value) {
241          executionTime = value;
242          OnExecutionTimeChanged();
243        }
244      }
245    }
246
247    public void Pause() {
248      throw new NotSupportedException();
249    }
250
251    public void Prepare() {
252      if (experiment != null) {
253        StopResultPolling();
254        pendingOptimizersByJobId.Clear();
255        parentOptimizersByPendingOptimizer.Clear();
256        jobItems.Clear();
257        experiment.Prepare();
258        this.ExecutionState = Core.ExecutionState.Prepared;
259        OnPrepared();
260      }
261    }
262
263    public void Start() {
264      OnStarted();
265      ExecutionTime = new TimeSpan();
266      lastUpdateTime = DateTime.Now;
267      this.ExecutionState = Core.ExecutionState.Started;
268      StartResultPolling();
269
270      Thread t = new Thread(() => {
271        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
272       
273        try {
274          pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
275
276          LogMessage("Extracting jobs from Experiment");
277          parentOptimizersByPendingOptimizer = GetOptimizers(true);
278          LogMessage("Extraction of jobs from Experiment finished");
279         
280          IEnumerable<string> groups = ResourceGroups;
281
282          sendingJobsFinished = false;
283          foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
284            SerializedJob serializedJob = CreateSerializedJob(optimizer);
285            ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
286            pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
287
288            JobItem jobItem = new JobItem() {
289              JobDto = response.Obj,
290              LatestSnapshot = null,
291              Optimizer = optimizer
292            };
293            lock (jobItems) {
294              jobItems.Add(jobItem);
295            }
296            LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
297          }
298          sendingJobsFinished = true;
299        } catch (Exception e) {
300          LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
301          this.ExecutionState = Core.ExecutionState.Stopped;
302          OnStopped();
303        }
304
305      });
306      t.Start();
307    }
308
309    public void Stop() {
310      this.ExecutionState = Core.ExecutionState.Stopped;
311      foreach (JobItem jobItem in jobItems) {
312        AbortJob(jobItem.JobDto.Id);
313      }
314      OnStopped();
315    }
316
317    public void StartResultPolling() {
318      this.stopResultsPollingPending = false;
319      this.IsPollingResults = true;
320      resultPollingThread = CreateResultPollingThread();
321      if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running)
322        resultPollingThread.Start();
323    }
324
325    public void StopResultPolling() {
326      this.stopResultsPollingPending = true;
327      resultPollingThread.Interrupt();
328      this.stopResultsPollingPending = false;
329    }
330
331    private JobItem GetJobItemById(Guid jobId) {
332      return jobItems.Single(x => x.JobDto.Id == jobId);
333    }
334
335    /// <summary>
336    /// Returns all optimizers in the current Experiment
337    /// </summary>
338    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
339    /// <returns></returns>
340    private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
341      if (!flatout) {
342        var optimizers = new Dictionary<IOptimizer, IOptimizer>();
343        foreach (IOptimizer opt in experiment.Optimizers) {
344          optimizers.Add(experiment, opt);
345        }
346        return optimizers;
347      } else {
348        return FlatOptimizerTree(null, experiment, "");
349      }
350    }
351
352    /// <summary>
353    /// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
354    ///
355    /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
356    /// interface IParallelizable {
357    ///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
358    /// }
359    /// </summary>
360    /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns>
361    private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
362      IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
363      if (optimizer is HeuristicLab.Optimization.Experiment) {
364        HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment;
365        if (this.experiment != experiment) {
366          prepend += experiment.Name + "/"; // don't prepend for top-level optimizers
367        }
368        foreach (IOptimizer opt in experiment.Optimizers) {
369          AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend));
370        }
371      } else if (optimizer is BatchRun) {
372        BatchRun batchRun = optimizer as BatchRun;
373        prepend += batchRun.Name + "/";
374        for (int i = 0; i < batchRun.Repetitions; i++) {
375          IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
376          opt.Name += " [" + i + "]";
377          IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend);
378          AddRange(optimizers, batchOptimizers);
379        }
380      } else if (optimizer is EngineAlgorithm) {
381        optimizer.Name = prepend + optimizer.Name;
382        optimizers.Add(optimizer, parent);
383      } else {
384        Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown");
385        optimizer.Name = prepend + optimizer.Name;
386        optimizers.Add(optimizer, parent);
387      }
388      return optimizers;
389    }
390
391    private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
392      foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
393        optimizers.Add(kvp);
394      }
395    }
396
397    private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
398      lock (locker) {
399        if (parentOptimizer is HeuristicLab.Optimization.Experiment) {
400          HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer;
401          int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer);
402          exp.Optimizers[originalOptimizerIndex] = newOptimizer;
403        } else if (parentOptimizer is BatchRun) {
404          BatchRun batchRun = (BatchRun)parentOptimizer;
405          if (newOptimizer is IAlgorithm) {
406            batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer));
407          } else {
408            throw new NotSupportedException("Only IAlgorithm types supported");
409          }
410        } else {
411          throw new NotSupportedException("Invalid parentOptimizer");
412        }
413      }
414    }
415
416    public void AbortJob(Guid jobId) {
417      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
418      Response response = executionEngineFacade.AbortJob(jobId);
419      LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
420    }
421
422    #endregion
423
424    private IExecutionEngineFacade GetExecutionEngineFacade() {
425      IExecutionEngineFacade executionEngineFacade = null;
426      do {
427        try {
428          executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
429        } catch (EndpointNotFoundException exception) {
430          LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
431          Thread.Sleep(resultPollingIntervalMs);
432        }
433      } while (executionEngineFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
434      return executionEngineFacade;
435    }
436
437    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
438      IJob job = new OptimizerJob() {
439        Optimizer = optimizer
440      };
441
442      // serialize job
443      MemoryStream memStream = new MemoryStream();
444      XmlGenerator.Serialize(job, memStream);
445      byte[] jobByteArray = memStream.ToArray();
446      memStream.Dispose();
447
448      // find out which which plugins are needed for the given object
449      List<HivePluginInfoDto> pluginsNeeded = (
450        from p in GetDeclaringPlugins(job.GetType())
451        select new HivePluginInfoDto() {
452          Name = p.Name,
453          Version = p.Version
454        }).ToList();
455
456      JobDto jobDto = new JobDto() {
457        CoresNeeded = 1, // [chn] how to determine real cores needed?
458        PluginsNeeded = pluginsNeeded,
459        State = State.Offline,
460        MemoryNeeded = 0,
461        UserId = Guid.Empty // [chn] set real userid here!
462      };
463
464      SerializedJob serializedJob = new SerializedJob() {
465        JobInfo = jobDto,
466        SerializedJobData = jobByteArray
467      };
468
469      return serializedJob;
470    }
471
472    /// <summary>
473    /// Requests a resultList for all jobItems not finished or aborted
474    /// </summary>
475    /// <returns></returns>
476    private Thread CreateResultPollingThread() {
477      return new Thread(() => {
478        try {
479          do {
480            IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
481            IEnumerable<Guid> jobIdsToQuery = from job in JobItems
482                                              where job.State != State.Finished &&
483                                              job.State != State.Failed
484                                              select job.JobDto.Id;
485            if (jobIdsToQuery.Count() > 0) {
486              LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs");
487              try {
488                ResponseObject<JobResultList> response = executionEngineFacade.GetAllJobResults(jobIdsToQuery);
489                if (response.Success) {
490                  JobResultList jobItemList = response.Obj;
491                  UpdateJobItems(jobItemList);
492
493                  LogMessage("Polling successfully finished");
494                }
495              } catch (Exception e) {
496                LogMessage("Polling results failed: " + e.Message);
497              }
498              Thread.Sleep(resultPollingIntervalMs);
499            } else {
500              if (sendingJobsFinished) {
501                // all the jobs have been sent to hive, but non are to query any more (all finished or failed)
502                this.stopResultsPollingPending = true;
503              }
504            }
505          } while (!this.stopResultsPollingPending);
506        } catch (ThreadInterruptedException exception) {
507          // thread has been interuppted
508        } finally {
509          this.IsPollingResults = false;
510        }
511      });
512    }
513
514    /// <summary>
515    /// Updates all JobItems with the results
516    /// </summary>
517    /// <param name="jobResultList"></param>
518    private void UpdateJobItems(JobResultList jobResultList) {
519      // use a Dict to avoid quadratic runtime complexity
520      IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId);
521      lock (jobItems) {
522        foreach (JobItem jobItem in JobItems) {
523          if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) {
524            jobItem.JobResult = jobResultDict[jobItem.JobDto.Id];
525          }
526        }
527      }
528    }
529
530    void JobItem_JobStateChanged(object sender, EventArgs e) {
531      JobItem jobItem = (JobItem)sender;
532      Thread t = new Thread(() => {
533        if (jobItem.State == State.Finished) {
534          FetchAndUpdateJob(jobItem.JobDto.Id);
535          DisposeOptimizerMappings(jobItem.JobDto.Id);
536        } else if (jobItem.State == State.Failed) {
537          DisposeOptimizerMappings(jobItem.JobDto.Id);
538        }
539
540        if (NoMorePendingOptimizers()) {
541          StopResultPolling();
542          this.ExecutionState = Core.ExecutionState.Stopped;
543          OnStopped();
544        }
545      });
546      t.Start();
547    }
548
549    private bool NoMorePendingOptimizers() {
550      return pendingOptimizersByJobId.Count == 0;
551    }
552
553    /// <summary>
554    /// Removes optimizers from
555    ///  - parentOptimizersByPendingOptimizer
556    ///  - pendingOptimizersByJobId
557    /// </summary>
558    /// <param name="jobId"></param>
559    private void DisposeOptimizerMappings(Guid jobId) {
560      parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]);
561      pendingOptimizersByJobId.Remove(jobId);
562    }
563
564    /// <summary>
565    /// Fetches the finished job from the server and updates the jobItem
566    /// </summary>
567    private void FetchAndUpdateJob(Guid jobId) {
568      LogMessage(jobId, "FetchAndUpdateJob started");
569      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
570      IOptimizer originalOptimizer = pendingOptimizersByJobId[jobId];
571
572      fetchJobSemaphore.WaitOne();
573      ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
574      UpdateSnapshot(jobResponse);
575
576      IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
577      IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
578
579      ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
580      fetchJobSemaphore.Release();
581      LogMessage(jobId, "FetchAndUpdateJob ended");
582    }
583
584    //// ensure that not all resultPollingThreads can access the server simultaniously
585    //private Semaphore resultPollingSemaphore = new Semaphore(5, 5);
586
587    //private Thread CreateResultPollingThread(JobDto job) {
588    //  return new Thread(() => {
589    //    try {
590    //      GetJobItemById(job.Id).LogMessage("Starting job results polling");
591    //      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
592    //      IJob restoredObject = null;
593    //      IOptimizer originalOptimizer = pendingOptimizersByJobId[job.Id];
594
595    //      do {
596    //        // loop while
597    //        // 1. the user doesn't request an abort
598    //        // 2. there is a problem with server communication (success==false)
599    //        // 3. no result for the job is available yet (response.Obj==null)
600    //        // 4. the result that we get from the server is a snapshot and not the final result
601    //        try {
602    //          resultPollingSemaphore.WaitOne();
603    //          if (stopPending || !this.IsPollingResults) {
604    //            return;
605    //          }
606
607    //          ResponseObject<JobDto> response = null;
608    //          try {
609    //            response = executionEngineFacade.GetJobById(job.Id);
610    //            LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")");
611    //            GetJobItemById(job.Id).LogMessage("Response: " + response.StatusMessage);
612
613    //            UpdateJobItem(response.Obj);
614
615    //            if (response.Obj.State == State.Abort) {
616    //              pendingOptimizersByJobId.Remove(job.Id);
617    //              parentOptimizersByPendingOptimizer.Remove(originalOptimizer);
618    //              GetJobItemById(job.Id).LogMessage("Job successfully aborted");
619    //              return;
620    //            }
621
622    //            if (response.Obj.State == State.Failed) {
623    //              pendingOptimizersByJobId.Remove(job.Id);
624    //              parentOptimizersByPendingOptimizer.Remove(originalOptimizer);
625    //              GetJobItemById(job.Id).LogMessage("Job failed with exception: " + response.Obj.Exception);
626    //              return;
627    //            }
628    //          } catch (Exception e) {
629    //            LogMessage("Error: Result polling failed for job (" + job.Id + "): " + e.Message);
630    //            GetJobItemById(job.Id).LogMessage("Error: Result polling failed: " + e.Message);
631    //          }
632
633    //          if (response != null && response.Success && response.Obj != null && response.Obj.State == State.Finished) {
634    //            try {
635    //              ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(job.Id, false, false);
636    //              restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
637    //              UpdateSnapshot(jobResponse);
638    //            } catch (Exception e) {
639    //              LogMessage("Error: Retrieving final result failed for job (" + job.Id + "): " + e.Message);
640    //              GetJobItemById(job.Id).LogMessage("Error: Retrieving final result failed: " + e.Message);
641    //            }
642    //          } else {
643    //            Thread.Sleep(resultPollingIntervalMs);
644    //          }
645    //        } finally {
646    //          // use finally to always release the semaphore lock
647    //          resultPollingSemaphore.Release();
648    //        }
649    //      } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);
650
651    //      LogMessage("Job finished (jobId: " + job.Id + ")");
652    //      GetJobItemById(job.Id).LogMessage("Job finished");
653    //      // job retrieved... replace the existing optimizers with the finished one
654
655    //      IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
656
657    //      ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
658    //      pendingOptimizersByJobId.Remove(job.Id);
659    //      parentOptimizersByPendingOptimizer.Remove(originalOptimizer);
660
661    //    } catch (ThreadInterruptedException exception) {
662
663    //    } finally {
664    //      GetJobItemById(job.Id).LogMessage("ResultsPolling Thread stopped");
665    //      //lock (resultPollingThreads) {
666    //      //  resultPollingThreads.Remove(job.Id);
667    //      //  if (resultPollingThreads.Count == 0) {
668    //      //    IsPollingResults = false;
669    //      //  }
670    //      //}
671
672    //      // check if finished
673    //      if (pendingOptimizersByJobId.Count == 0) {
674    //        this.ExecutionState = Core.ExecutionState.Stopped;
675    //        OnStopped();
676    //      }
677    //    }
678    //  });
679    //}
680
681    private void UpdateJobItem(JobDto jobDto) {
682      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
683      jobItem.JobDto = jobDto;
684    }
685
686    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
687      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
688      jobItem.LatestSnapshot = response;
689    }
690
691    private void LogMessage(string message) {
692      // HeuristicLab.Log is not Thread-Safe, so lock on every call
693      lock (locker) {
694        log.LogMessage(message);
695      }
696    }
697
698    private void LogMessage(Guid jobId, string message) {
699      GetJobItemById(jobId).LogMessage(message);
700      LogMessage(message + " (jobId: " + jobId + ")");
701    }
702
703    void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
704      JobItem jobItem = (JobItem)sender;
705      if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) {
706        RequestSnapshot(jobItem.JobDto.Id);
707      }
708    }
709
710    public void RequestSnapshot(Guid jobId) {
711      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
712      ResponseObject<SerializedJob> response;
713      int retryCount = 0;
714
715      Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
716      if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
717        // job already finished
718        Logger.Debug("HiveEngine: Abort - GetLastResult(false)");
719        response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
720        Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
721      } else {
722        // server sent snapshot request to client
723        // poll until snapshot is ready
724        do {
725          Thread.Sleep(snapshotPollingIntervalMs);
726          Logger.Debug("HiveEngine: Abort - GetLastResult(true)");
727          response = executionEngineFacade.GetLastSerializedResult(jobId, false, true);
728          Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
729          retryCount++;
730          // loop while
731          // 1. problem with communication with server
732          // 2. job result not yet ready
733        } while (
734          (retryCount < maxSnapshotRetries) && (
735          !response.Success ||
736          response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
737          );
738      }
739      if (response.Success) {
740        LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
741        GetJobItemById(jobId).LatestSnapshot = response;
742      } else {
743        LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
744      }
745    }
746
747    #region Required Plugin Search
748    /// <summary>
749    /// Returns a list of plugins in which the type itself and all members
750    /// of the type are declared. Objectgraph is searched recursively.
751    /// </summary>
752    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
753      HashSet<Type> types = new HashSet<Type>();
754      FindTypes(type, types, "HeuristicLab.");
755      return GetDeclaringPlugins(types);
756    }
757
758    /// <summary>
759    /// Returns the plugins (including dependencies) in which the given types are declared
760    /// </summary>
761    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
762      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
763      foreach (Type t in types) {
764        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
765      }
766      return plugins;
767    }
768
769    /// <summary>
770    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
771    /// Also searches the dependencies recursively.
772    /// </summary>
773    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
774      if (!plugins.Contains(plugin)) {
775        plugins.Add(plugin);
776        foreach (IPluginDescription dependency in plugin.Dependencies) {
777          FindDeclaringPlugins(dependency, plugins);
778        }
779      }
780    }
781
782    /// <summary>
783    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
784    /// Be aware that search is not performed on attributes
785    /// </summary>
786    /// <param name="type">the type to be searched</param>
787    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
788    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
789    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
790      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
791        types.Add(type);
792
793        // constructors
794        foreach (ConstructorInfo info in type.GetConstructors()) {
795          foreach (ParameterInfo paramInfo in info.GetParameters()) {
796            FindTypes(paramInfo.ParameterType, types, namespaceStart);
797          }
798        }
799
800        // interfaces
801        foreach (Type t in type.GetInterfaces()) {
802          FindTypes(t, types, namespaceStart);
803        }
804
805        // events
806        foreach (EventInfo info in type.GetEvents()) {
807          FindTypes(info.EventHandlerType, types, namespaceStart);
808          FindTypes(info.DeclaringType, types, namespaceStart);
809        }
810
811        // properties
812        foreach (PropertyInfo info in type.GetProperties()) {
813          FindTypes(info.PropertyType, types, namespaceStart);
814        }
815
816        // fields
817        foreach (FieldInfo info in type.GetFields()) {
818          FindTypes(info.FieldType, types, namespaceStart);
819        }
820
821        // methods
822        foreach (MethodInfo info in type.GetMethods()) {
823          foreach (ParameterInfo paramInfo in info.GetParameters()) {
824            FindTypes(paramInfo.ParameterType, types, namespaceStart);
825          }
826          FindTypes(info.ReturnType, types, namespaceStart);
827        }
828      }
829    }
830    #endregion
831
832    #region Eventhandler
833
834    public event EventHandler ExecutionTimeChanged;
835    private void OnExecutionTimeChanged() {
836      EventHandler handler = ExecutionTimeChanged;
837      if (handler != null) handler(this, EventArgs.Empty);
838    }
839
840    public event EventHandler ExecutionStateChanged;
841    private void OnExecutionStateChanged() {
842      LogMessage("ExecutionState changed to " + executionState.ToString());
843      EventHandler handler = ExecutionStateChanged;
844      if (handler != null) handler(this, EventArgs.Empty);
845    }
846
847    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
848
849    public event EventHandler Started;
850    private void OnStarted() {
851      LogMessage("Started");
852      timer.Start();
853      EventHandler handler = Started;
854      if (handler != null) handler(this, EventArgs.Empty);
855    }
856
857    public event EventHandler Stopped;
858    private void OnStopped() {
859      timer.Stop();
860      LogMessage("Stopped");
861      EventHandler handler = Stopped;
862      if (handler != null) handler(this, EventArgs.Empty);
863    }
864
865    public event EventHandler Paused;
866    private void OnPaused() {
867      timer.Stop();
868      LogMessage("Paused");
869      EventHandler handler = Paused;
870      if (handler != null) handler(this, EventArgs.Empty);
871    }
872
873    public event EventHandler Prepared;
874    protected virtual void OnPrepared() {
875      LogMessage("Prepared");
876      EventHandler handler = Prepared;
877      if (handler != null) handler(this, EventArgs.Empty);
878    }
879
880    public event EventHandler ResourceIdsChanged;
881    protected virtual void OnResourceIdsChanged() {
882      EventHandler handler = ResourceIdsChanged;
883      if (handler != null) handler(this, EventArgs.Empty);
884    }
885
886    public event EventHandler ExperimentChanged;
887    protected virtual void OnExperimentChanged() {
888      LogMessage("Experiment changed");
889      EventHandler handler = ExperimentChanged;
890      if (handler != null) handler(this, EventArgs.Empty);
891    }
892
893    public event EventHandler ServerUrlChanged;
894    protected virtual void OnServerUrlChanged() {
895      EventHandler handler = ServerUrlChanged;
896      if (handler != null) handler(this, EventArgs.Empty);
897    }
898
899    public event EventHandler IsResultsPollingChanged;
900    private void OnIsPollingResultsChanged() {
901      if (this.IsPollingResults) {
902        LogMessage("Results Polling Started");
903        timer.Start();
904      } else {
905        LogMessage("Results Polling Stopped");
906        timer.Stop();
907      }
908      EventHandler handler = IsResultsPollingChanged;
909      if (handler != null) handler(this, EventArgs.Empty);
910    }
911
912    private void RegisterJobItemListEvents() {
913      jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
914      jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
915      jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
916      jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
917    }
918
919    private void DeregisterJobItemListEvents() {
920      jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
921      jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
922      jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
923      jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
924    }
925
926    void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
927      UpdateJobItemEvents(e);
928    }
929
930    private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
931      if (e.OldItems != null) {
932        foreach (var item in e.OldItems) {
933          item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
934          item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
935        }
936      }
937      if (e.Items != null) {
938        foreach (var item in e.Items) {
939          item.Value.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
940          item.Value.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
941        }
942      }
943    }
944
945    void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
946      UpdateJobItemEvents(e);
947    }
948
949    void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
950      UpdateJobItemEvents(e);
951    }
952
953    void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
954      foreach (var item in e.OldItems) {
955        item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
956        item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
957      }
958    }
959    #endregion
960  }
961}
Note: See TracBrowser for help on using the repository browser.