Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs @ 4133

Last change on this file since 4133 was 4133, checked in by cneumuel, 14 years ago
  • Made HiveExperiment storable, so that a running HiveExperiment can be disconnected, stored and later resumed. (#1115)
  • Added Log to each JobItem (#1115)
File size: 20.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Linq;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using HeuristicLab.Optimization;
28using System.Drawing;
29using HeuristicLab.Collections;
30using System.Collections.Generic;
31using HeuristicLab.Hive.Contracts.BusinessObjects;
32using System.IO;
33using HeuristicLab.Persistence.Default.Xml;
34using HeuristicLab.PluginInfrastructure;
35using System.Reflection;
36using HeuristicLab.Hive.Contracts.Interfaces;
37using HeuristicLab.Hive.Contracts;
38using System.Threading;
39using HeuristicLab.Tracing;
40using HeuristicLab.Hive.JobBase;
41using System.Diagnostics;
42using System.Collections;
43
44namespace HeuristicLab.Hive.Experiment {
45  /// <summary>
46  /// An experiment which contains multiple batch runs of algorithms.
47  /// </summary>
48  [Item(itemName, itemDescription)]
49  [Creatable("Testing & Analysis")]
50  [StorableClass]
51  public class HiveExperiment : NamedItem, IExecutable {
52    private const string itemName = "Hive Experiment";
53    private const string itemDescription = "An experiment which contains multiple batch runs of algorithms which are executed in the Hive.";
54    private const int resultPollingIntervalMs = 15000;
55    private const int maxSnapshotRetries = 20;
56    private object locker = new object();
57
58    private System.Timers.Timer timer;
59    private bool pausePending, stopPending;
60
61    [Storable]
62    private DateTime lastUpdateTime;
63
64    private bool isPollingResults;
65    public bool IsPollingResults {
66      get { return isPollingResults; }
67      private set {
68        if (isPollingResults != value) {
69          isPollingResults = value;
70          OnIsPollingResultsChanged();
71        }
72      }
73    }
74
75    private bool stopResultsPollingPending = false;
76
77    private IDictionary<Guid, Thread> resultPollingThreads;
78   
79    [Storable]
80    private IDictionary<Guid, IOptimizer> pendingOptimizers = new Dictionary<Guid, IOptimizer>();
81
82    [Storable]
83    private JobItemList jobItems;
84    public JobItemList JobItems {
85      get { return jobItems; }
86    }
87
88
89    [Storable]
90    private string serverUrl;
91    public string ServerUrl {
92      get { return serverUrl; }
93      set {
94        if (serverUrl != value) {
95          serverUrl = value;
96          OnServerUrlChanged();
97        }
98      }
99    }
100
101    [Storable]
102    private string resourceIds;
103    public string ResourceIds {
104      get { return resourceIds; }
105      set {
106        if (resourceIds != value) {
107          resourceIds = value;
108          OnResourceIdsChanged();
109        }
110      }
111    }
112
113    [Storable]
114    private HeuristicLab.Optimization.Experiment experiment;
115    public HeuristicLab.Optimization.Experiment Experiment {
116      get { return experiment; }
117      set {
118        if (experiment != value) {
119          experiment = value;
120          OnExperimentChanged();
121        }
122      }
123    }
124
125    [Storable]
126    private ILog log;
127    public ILog Log {
128      get { return log; }
129    }
130
131    [StorableConstructor]
132    public HiveExperiment(bool deserializing)
133      : base(deserializing) {
134      this.resultPollingThreads = new Dictionary<Guid, Thread>();
135      jobItems = new JobItemList();
136    }
137
138    public HiveExperiment()
139      : base(itemName, itemDescription) {
140      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
141      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
142      this.log = new Log();
143      pausePending = stopPending = false;
144      jobItems = new JobItemList();
145      isPollingResults = false;
146      resultPollingThreads = new Dictionary<Guid, Thread>();
147      InitTimer();
148    }
149
150    public override IDeepCloneable Clone(Cloner cloner) {
151      LogMessage("I am beeing cloned");
152      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
153      clone.resourceIds = this.resourceIds;
154      clone.serverUrl = this.serverUrl;
155      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
156      clone.executionState = this.executionState;
157      clone.executionTime = this.executionTime;
158      clone.pendingOptimizers = new Dictionary<Guid, IOptimizer>();
159      foreach (var pair in this.pendingOptimizers)
160        clone.pendingOptimizers[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
161      clone.log = (ILog)cloner.Clone(log);
162      clone.stopPending = this.stopPending;
163      clone.pausePending = this.pausePending;
164      clone.jobItems = (JobItemList)cloner.Clone(jobItems);
165      clone.lastUpdateTime = this.lastUpdateTime;
166      clone.isPollingResults = this.isPollingResults;
167      return clone;
168    }
169
170    [StorableHook(HookType.AfterDeserialization)]
171    private void AfterDeserialization() {
172      InitTimer();
173      this.IsPollingResults = false;
174      this.stopResultsPollingPending = false;
175      LogMessage("I was deserialized.");
176    }
177
178    private void InitTimer() {
179      timer = new System.Timers.Timer(100);
180      timer.AutoReset = true;
181      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
182    }
183
184    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
185      DateTime now = DateTime.Now;
186      ExecutionTime += now - lastUpdateTime;
187      lastUpdateTime = now;
188    }
189
190    public IEnumerable<string> ResourceGroups {
191      get {
192        if (!string.IsNullOrEmpty(resourceIds)) {
193          return resourceIds.Split(';');
194        } else {
195          return new List<string>();
196        }
197      }
198    }
199    #region IExecutable Members
200
201    [Storable]
202    private Core.ExecutionState executionState;
203    public ExecutionState ExecutionState {
204      get { return executionState; }
205      private set {
206        if (executionState != value) {
207          executionState = value;
208          OnExecutionStateChanged();
209        }
210      }
211    }
212
213    [Storable]
214    private TimeSpan executionTime;
215    public TimeSpan ExecutionTime {
216      get { return executionTime; }
217      private set {
218        if (executionTime != value) {
219          executionTime = value;
220          OnExecutionTimeChanged();
221        }
222      }
223    }
224
225    public void Pause() {
226      throw new NotSupportedException();
227    }
228
229    public void Prepare() {
230      if (experiment != null) {
231        experiment.Prepare();
232        this.ExecutionState = Core.ExecutionState.Prepared;
233        OnPrepared();
234      }
235    }
236
237    public void Start() {
238      OnStarted();
239      lastUpdateTime = DateTime.Now;
240      this.ExecutionState = Core.ExecutionState.Started;
241      Thread t = new Thread(() => {
242        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
243
244        pendingOptimizers = new Dictionary<Guid, IOptimizer>();
245        IEnumerable<string> groups = ResourceGroups;
246
247        foreach (IOptimizer optimizer in GetOptimizers(false)) {
248          SerializedJob serializedJob = CreateSerializedJob(optimizer);
249          ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
250          pendingOptimizers.Add(response.Obj.Id, optimizer);
251
252          JobItem jobItem = new JobItem() {
253            JobDto = response.Obj,
254            LatestSnapshot = new ResponseObject<SerializedJob>() {
255              Obj = serializedJob,
256              StatusMessage = "Initial Snapshot",
257              Success = true
258            }
259          };
260          jobItems.Add(jobItem);
261          jobItem.LogMessage("Job sent to Hive");
262
263          LogMessage("Sent job to Hive (jobId: " + response.Obj.Id + ")");
264        }
265       
266        // start results polling after sending sending the jobs to the server (to avoid race conflicts at the optimizers-collection)
267        StartResultPolling();
268      });
269      t.Start();
270    }
271
272    private void CreateResultPollingThreads() {
273      foreach(JobItem jobItem in JobItems) {
274        resultPollingThreads.Add(jobItem.JobDto.Id, CreateResultPollingThread(jobItem.JobDto));
275      }
276    }
277
278    public void StartResultPolling() {
279      this.stopResultsPollingPending = false;
280      CreateResultPollingThreads();
281      foreach (Thread pollingThread in resultPollingThreads.Values) {
282        pollingThread.Start();
283      }
284      this.IsPollingResults = true;
285    }
286
287    public void StopResultPolling() {
288      this.stopResultsPollingPending = true;
289      foreach (Thread pollingThread in resultPollingThreads.Values) {
290        pollingThread.Interrupt();
291      }
292      this.stopResultsPollingPending = false;
293    }
294
295    private JobItem GetJobItemById(Guid jobId) {
296      return jobItems.Single(x => x.JobDto.Id == jobId);
297    }
298
299    /// <summary>
300    /// Returns all optimizers in the current Experiment
301    /// </summary>
302    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
303    /// <returns></returns>
304    private IEnumerable<IOptimizer> GetOptimizers(bool flatout) {
305      if (!flatout) {
306        return experiment.Optimizers;
307      } else {
308        throw new NotImplementedException();
309      }
310    }
311
312    private void ReplaceOptimizer(IOptimizer originalOptimizer, IOptimizer newOptimizer) {
313      lock (locker) {
314        int originalOptimizerIndex = experiment.Optimizers.IndexOf(originalOptimizer);
315        experiment.Optimizers[originalOptimizerIndex] = newOptimizer;
316      }
317    }
318
319    public void Stop() {
320      foreach(JobItem jobItem in jobItems) {
321        AbortJob(jobItem.JobDto.Id);
322      }
323    }
324
325    public void AbortJob(Guid jobId) {
326      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
327      executionEngineFacade.AbortJob(jobId);
328      resultPollingThreads[jobId].Interrupt();
329      GetJobItemById(jobId).LogMessage("Aborting Job");
330    }
331    #endregion
332
333    private IExecutionEngineFacade GetExecutionEngineFacade() {
334      return ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
335    }
336
337    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
338      IJob job = new OptimizerJob() {
339        Optimizer = optimizer
340      };
341
342      // serialize job
343      MemoryStream memStream = new MemoryStream();
344      XmlGenerator.Serialize(job, memStream);
345      byte[] jobByteArray = memStream.ToArray();
346      memStream.Dispose();
347
348      // find out which which plugins are needed for the given object
349      List<HivePluginInfoDto> pluginsNeeded = (
350        from p in GetDeclaringPlugins(optimizer.GetType())
351        select new HivePluginInfoDto() {
352          Name = p.Name,
353          Version = p.Version
354        }).ToList();
355
356      JobDto jobDto = new JobDto() {
357        CoresNeeded = 1, // [chn] how to determine real cores needed?
358        PluginsNeeded = pluginsNeeded,
359        State = State.Offline,
360        MemoryNeeded = 0,
361        UserId = Guid.Empty // [chn] set real userid here!
362      };
363
364      SerializedJob serializedJob = new SerializedJob() {
365        JobInfo = jobDto,
366        SerializedJobData = jobByteArray
367      };
368
369      return serializedJob;
370    }
371
372    private Thread CreateResultPollingThread(JobDto job) {
373      return new Thread(() => {
374        try {
375          GetJobItemById(job.Id).LogMessage("Starting job results polling");
376          IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
377          IJob restoredObject = null;
378
379          do {
380            Thread.Sleep(resultPollingIntervalMs);
381            if (stopPending || !this.IsPollingResults) {
382              return;
383            }
384
385            ResponseObject<JobDto> response = executionEngineFacade.GetJobById(job.Id);
386            LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")");
387            GetJobItemById(job.Id).LogMessage("Response: " + response.StatusMessage);
388
389            if (response.Obj != null) {
390              UpdateJobItem(response.Obj);
391            }
392
393            // loop while
394            // 1. the user doesn't request an abort
395            // 2. there is a problem with server communication (success==false)
396            // 3. no result for the job is available yet (response.Obj==null)
397            // 4. the result that we get from the server is a snapshot and not the final result
398            if (response.Success && response.Obj != null && response.Obj.State == State.Finished) {
399              ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(job.Id, false, false);
400              restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
401              UpdateSnapshot(jobResponse);
402            }
403          } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);
404
405          LogMessage("Job finished (jobId: " + job.Id + ")");
406          GetJobItemById(job.Id).LogMessage("Job finished");
407          // job retrieved... replace the existing optimizers with the finished one
408          IOptimizer originalOptimizer = pendingOptimizers[job.Id];
409          IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
410
411          ReplaceOptimizer(originalOptimizer, restoredOptimizer);
412          pendingOptimizers.Remove(job.Id);
413
414          if (pendingOptimizers.Count == 0) {
415            // finished
416            this.ExecutionState = Core.ExecutionState.Stopped;
417            OnStopped();
418          }
419        } catch (ThreadInterruptedException exception) {
420
421        } finally {
422          GetJobItemById(job.Id).LogMessage("ResultsPolling Thread stopped");
423          resultPollingThreads.Remove(job.Id);
424          if (resultPollingThreads.Count == 0) {
425            IsPollingResults = false;
426          }
427        }
428      });
429    }
430
431    private void UpdateJobItem(JobDto jobDto) {
432      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
433      jobItem.JobDto = jobDto;
434    }
435
436    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
437      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
438      jobItem.LatestSnapshot = response;
439    }
440
441    private void LogMessage(string message) {
442      // HeuristicLab.Log is not Thread-Safe, so lock every call
443      lock (locker) {
444        log.LogMessage(message);
445      }
446    }
447
448    #region Required Plugin Search
449    /// <summary>
450    /// Returns a list of plugins in which the type itself and all members
451    /// of the type are declared. Objectgraph is searched recursively.
452    /// </summary>
453    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
454      HashSet<Type> types = new HashSet<Type>();
455      FindTypes(type, types, "HeuristicLab.");
456      return GetDeclaringPlugins(types);
457    }
458
459    /// <summary>
460    /// Returns the plugins (including dependencies) in which the given types are declared
461    /// </summary>
462    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
463      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
464      foreach (Type t in types) {
465        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
466      }
467      return plugins;
468    }
469
470    /// <summary>
471    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
472    /// Also searches the dependencies recursively.
473    /// </summary>
474    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
475      if (!plugins.Contains(plugin)) {
476        plugins.Add(plugin);
477        foreach (IPluginDescription dependency in plugin.Dependencies) {
478          FindDeclaringPlugins(dependency, plugins);
479        }
480      }
481    }
482
483    /// <summary>
484    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
485    /// Be aware that search is not performed on attributes
486    /// </summary>
487    /// <param name="type">the type to be searched</param>
488    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
489    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
490    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
491      if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
492        types.Add(type);
493
494        // constructors
495        foreach (ConstructorInfo info in type.GetConstructors()) {
496          foreach (ParameterInfo paramInfo in info.GetParameters()) {
497            FindTypes(paramInfo.ParameterType, types, namespaceStart);
498          }
499        }
500
501        // interfaces
502        foreach (Type t in type.GetInterfaces()) {
503          FindTypes(t, types, namespaceStart);
504        }
505
506        // events
507        foreach (EventInfo info in type.GetEvents()) {
508          FindTypes(info.EventHandlerType, types, namespaceStart);
509          FindTypes(info.DeclaringType, types, namespaceStart);
510        }
511
512        // properties
513        foreach (PropertyInfo info in type.GetProperties()) {
514          FindTypes(info.PropertyType, types, namespaceStart);
515        }
516
517        // fields
518        foreach (FieldInfo info in type.GetFields()) {
519          FindTypes(info.FieldType, types, namespaceStart);
520        }
521
522        // methods
523        foreach (MethodInfo info in type.GetMethods()) {
524          foreach (ParameterInfo paramInfo in info.GetParameters()) {
525            FindTypes(paramInfo.ParameterType, types, namespaceStart);
526          }
527          FindTypes(info.ReturnType, types, namespaceStart);
528        }
529      }
530    }
531    #endregion
532
533    #region Eventhandler
534
535    public event EventHandler ExecutionTimeChanged;
536    private void OnExecutionTimeChanged() {
537      EventHandler handler = ExecutionTimeChanged;
538      if (handler != null) handler(this, EventArgs.Empty);
539    }
540
541    public event EventHandler ExecutionStateChanged;
542    private void OnExecutionStateChanged() {
543      LogMessage("ExecutionState changed to " + executionState.ToString());
544      EventHandler handler = ExecutionStateChanged;
545      if (handler != null) handler(this, EventArgs.Empty);
546    }
547
548    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
549
550    public event EventHandler Started;
551    private void OnStarted() {
552      LogMessage("Started");
553      timer.Start();
554      EventHandler handler = Started;
555      if (handler != null) handler(this, EventArgs.Empty);
556    }
557
558    public event EventHandler Stopped;
559    private void OnStopped() {
560      timer.Stop();
561      LogMessage("Stopped");
562      EventHandler handler = Stopped;
563      if (handler != null) handler(this, EventArgs.Empty);
564    }
565
566    public event EventHandler Paused;
567    private void OnPaused() {
568      timer.Stop();
569      LogMessage("Paused");
570      EventHandler handler = Paused;
571      if (handler != null) handler(this, EventArgs.Empty);
572    }
573
574    public event EventHandler Prepared;
575    protected virtual void OnPrepared() {
576      LogMessage("Prepared");
577      EventHandler handler = Prepared;
578      if (handler != null) handler(this, EventArgs.Empty);
579    }
580
581    public event EventHandler ResourceIdsChanged;
582    protected virtual void OnResourceIdsChanged() {
583      EventHandler handler = ResourceIdsChanged;
584      if (handler != null) handler(this, EventArgs.Empty);
585    }
586
587    public event EventHandler ExperimentChanged;
588    protected virtual void OnExperimentChanged() {
589      LogMessage("Experiment changed");
590      EventHandler handler = ExperimentChanged;
591      if (handler != null) handler(this, EventArgs.Empty);
592    }
593
594    public event EventHandler ServerUrlChanged;
595    protected virtual void OnServerUrlChanged() {
596      EventHandler handler = ServerUrlChanged;
597      if (handler != null) handler(this, EventArgs.Empty);
598    }
599
600    public event EventHandler IsResultsPollingChanged;
601    private void OnIsPollingResultsChanged() {
602      if (this.IsPollingResults) {
603        LogMessage("Results Polling Started");
604        timer.Start();
605      } else {
606        LogMessage("Results Polling Stopped");
607        timer.Stop();
608      }
609      EventHandler handler = IsResultsPollingChanged;
610      if (handler != null) handler(this, EventArgs.Empty);
611    }
612    #endregion
613  }
614}
Note: See TracBrowser for help on using the repository browser.