#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Linq;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using HeuristicLab.Optimization;
using System.Drawing;
using HeuristicLab.Collections;
using System.Collections.Generic;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using System.IO;
using HeuristicLab.Persistence.Default.Xml;
using HeuristicLab.PluginInfrastructure;
using System.Reflection;
using HeuristicLab.Hive.Contracts.Interfaces;
using HeuristicLab.Hive.Contracts;
using System.Threading;
using HeuristicLab.Tracing;
using HeuristicLab.Hive.JobBase;
using System.Diagnostics;
using System.Collections;

namespace HeuristicLab.Hive.Experiment {
  /// <summary>
  /// An experiment which contains multiple batch runs of algorithms.
  /// Each top-level optimizer of the wrapped experiment is serialized, sent to the
  /// Hive server as a job, and replaced by the finished optimizer once the server
  /// reports the job as finished.
  /// </summary>
  [Item(itemName, itemDescription)]
  [Creatable("Testing & Analysis")]
  [StorableClass]
  public class HiveExperiment : NamedItem, IExecutable {
    private const string itemName = "Hive Experiment";
    private const string itemDescription = "An experiment which contains multiple batch runs of algorithms which are executed in the Hive.";
    // Pause between two status requests of a polling thread.
    private const int resultPollingIntervalMs = 15000;

    // Guards the log, the optimizer list of the experiment and pendingOptimizers,
    // all of which are touched by the per-job polling threads.
    private readonly object locker = new object();
    private const int maxSnapshotRetries = 20;
    // Periodically accumulates ExecutionTime while the experiment is running.
    private System.Timers.Timer timer;
    private bool pausePending, stopPending;
    // UTC timestamp of the last ExecutionTime update.
    private DateTime lastUpdateTime;

    // Maps the id of every job that has not finished yet to the optimizer it was created from.
    // NOTE(review): the key type (Guid) was reconstructed; confirm against JobDto.Id.
    [Storable]
    private IDictionary<Guid, IOptimizer> pendingOptimizers = new Dictionary<Guid, IOptimizer>();

    [Storable]
    private JobItemList jobItems;
    /// <summary>
    /// The jobs that were sent to the server together with their latest snapshot.
    /// </summary>
    public JobItemList JobItems {
      get { return jobItems; }
    }

    [Storable]
    private string serverUrl;
    /// <summary>
    /// Address that is passed to the service locator to create the execution engine facade.
    /// Raises <see cref="ServerUrlChanged"/> when the value changes.
    /// </summary>
    public string ServerUrl {
      get { return serverUrl; }
      set {
        if (serverUrl != value) {
          serverUrl = value;
          OnServerUrlChanged();
        }
      }
    }

    [Storable]
    private string resourceIds;
    /// <summary>
    /// Semicolon separated list of resource groups (see <see cref="ResourceGroups"/>).
    /// Raises <see cref="ResourceIdsChanged"/> when the value changes.
    /// </summary>
    public string ResourceIds {
      get { return resourceIds; }
      set {
        if (resourceIds != value) {
          resourceIds = value;
          OnResourceIdsChanged();
        }
      }
    }

    [Storable]
    private HeuristicLab.Optimization.Experiment experiment;
    /// <summary>
    /// The experiment whose optimizers are executed in the Hive.
    /// Raises <see cref="ExperimentChanged"/> when the value changes.
    /// </summary>
    public HeuristicLab.Optimization.Experiment Experiment {
      get { return experiment; }
      set {
        if (experiment != value) {
          experiment = value;
          OnExperimentChanged();
        }
      }
    }

    [Storable]
    private ILog log;
    /// <summary>
    /// Log that receives the status messages of this experiment.
    /// </summary>
    public ILog Log {
      get { return log; }
    }

    [StorableConstructor]
    public HiveExperiment(bool deserializing)
      : base(deserializing) {
    }

    /// <summary>
    /// Creates an empty Hive experiment; server URL and resource ids are taken from the application settings.
    /// </summary>
    public HiveExperiment()
      : base(itemName, itemDescription) {
      this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl;
      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
      this.log = new Log();
      pausePending = stopPending = false;
      jobItems = new JobItemList();
      InitTimer();
    }

    /// <summary>
    /// Creates a deep clone including the experiment, the pending optimizers, the log and the job items.
    /// </summary>
    public override IDeepCloneable Clone(Cloner cloner) {
      LogMessage("I am beeing cloned");
      HiveExperiment clone = (HiveExperiment)base.Clone(cloner);
      clone.resourceIds = this.resourceIds;
      clone.serverUrl = this.serverUrl;
      clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment);
      clone.executionState = this.executionState;
      clone.executionTime = this.executionTime;

      // Polling threads may remove entries concurrently, so copy the entries under the lock
      // and clone the optimizers afterwards.
      KeyValuePair<Guid, IOptimizer>[] pendingSnapshot;
      lock (locker) {
        pendingSnapshot = this.pendingOptimizers.ToArray();
      }
      clone.pendingOptimizers = new Dictionary<Guid, IOptimizer>();
      foreach (var pair in pendingSnapshot)
        clone.pendingOptimizers[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);

      clone.log = (ILog)cloner.Clone(log);
      clone.stopPending = this.stopPending;
      clone.pausePending = this.pausePending;
      clone.jobItems = (JobItemList)cloner.Clone(jobItems);
      return clone;
    }

    [StorableHook(HookType.AfterDeserialization)]
    private void AfterDeserialization() {
      // The timer is not persisted and has to be recreated.
      InitTimer();
      LogMessage("I was deserialized.");
    }

    /// <summary>
    /// Creates the (not yet started) timer that updates <see cref="ExecutionTime"/> every 100 ms.
    /// </summary>
    private void InitTimer() {
      timer = new System.Timers.Timer(100);
      timer.AutoReset = true;
      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
    }

    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      // UTC is used so that daylight-saving or time-zone changes cannot distort the elapsed time.
      DateTime now = DateTime.UtcNow;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
    }

    /// <summary>
    /// The entries of <see cref="ResourceIds"/> split at ';', or an empty list if no resource ids are set.
    /// </summary>
    public IEnumerable<string> ResourceGroups {
      get {
        if (!string.IsNullOrEmpty(resourceIds)) {
          return resourceIds.Split(';');
        } else {
          return new List<string>();
        }
      }
    }

    #region IExecutable Members
    [Storable]
    private Core.ExecutionState executionState;
    public ExecutionState ExecutionState {
      get { return executionState; }
      private set {
        if (executionState != value) {
          executionState = value;
          OnExecutionStateChanged();
        }
      }
    }

    [Storable]
    private TimeSpan executionTime;
    public TimeSpan ExecutionTime {
      get { return executionTime; }
      private set {
        if (executionTime != value) {
          executionTime = value;
          OnExecutionTimeChanged();
        }
      }
    }

    /// <summary>
    /// Pausing is not supported.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Pause() {
      throw new NotSupportedException();
    }

    /// <summary>
    /// Prepares the wrapped experiment; does nothing if no experiment is set.
    /// </summary>
    public void Prepare() {
      if (experiment != null) {
        experiment.Prepare();
        this.ExecutionState = Core.ExecutionState.Prepared;
        OnPrepared();
      }
    }

    /// <summary>
    /// Sends every top-level optimizer of the experiment as a job to the server on a
    /// background thread and afterwards starts one result polling thread per job.
    /// </summary>
    public void Start() {
      // Set the baseline before OnStarted() starts the timer; otherwise an early tick
      // would add the time since the previous update (or since DateTime.MinValue).
      lastUpdateTime = DateTime.UtcNow;
      OnStarted();
      this.ExecutionState = Core.ExecutionState.Started;

      Thread t = new Thread(() => {
        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);

        lock (locker) {
          pendingOptimizers = new Dictionary<Guid, IOptimizer>();
        }
        IEnumerable<string> groups = ResourceGroups;

        foreach (IOptimizer optimizer in GetOptimizers(false)) {
          SerializedJob serializedJob = CreateSerializedJob(optimizer);
          ResponseObject<JobDto> response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups);
          lock (locker) {
            pendingOptimizers.Add(response.Obj.Id, optimizer);
          }

          JobItem jobItem = new JobItem() {
            JobDto = response.Obj,
            LatestSnapshot = new ResponseObject<SerializedJob>() {
              Obj = serializedJob,
              StatusMessage = "Initial Snapshot",
              Success = true
            }
          };
          jobItems.Add(jobItem);

          LogMessage("Sent job to server (jobId: " + response.Obj.Id + ")");
        }

        // start results polling after sending the jobs to the server (to avoid race conflicts at the optimizers-collection)
        foreach (JobItem jobItem in jobItems) {
          StartResultPollingThread(jobItem.JobDto);
        }
      });
      t.Start();
    }

    /// <summary>
    /// Returns all optimizers in the current Experiment
    /// </summary>
    /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
    /// <returns>the optimizers of the experiment</returns>
    /// <exception cref="NotImplementedException">Thrown if <paramref name="flatout"/> is true.</exception>
    private IEnumerable<IOptimizer> GetOptimizers(bool flatout) {
      if (!flatout) {
        return experiment.Optimizers;
      } else {
        throw new NotImplementedException();
      }
    }

    /// <summary>
    /// Replaces <paramref name="originalOptimizer"/> in the optimizer list of the experiment
    /// with <paramref name="newOptimizer"/> at the same index.
    /// </summary>
    private void ReplaceOptimizer(IOptimizer originalOptimizer, IOptimizer newOptimizer) {
      lock (locker) {
        int originalOptimizerIndex = experiment.Optimizers.IndexOf(originalOptimizer);
        experiment.Optimizers[originalOptimizerIndex] = newOptimizer;
      }
    }

    /// <summary>
    /// Not implemented yet; calling it has no effect.
    /// </summary>
    public void Stop() {
      // todo
    }
    #endregion

    /// <summary>
    /// Wraps the optimizer in an <see cref="OptimizerJob"/>, serializes it to XML and
    /// combines the bytes with a job description that lists the required plugins.
    /// </summary>
    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
      IJob job = new OptimizerJob() {
        Optimizer = optimizer
      };

      // serialize job
      byte[] jobByteArray;
      using (MemoryStream memStream = new MemoryStream()) {
        XmlGenerator.Serialize(job, memStream);
        jobByteArray = memStream.ToArray();
      }

      // find out which plugins are needed for the given object
      List<HivePluginInfoDto> pluginsNeeded = (
        from p in GetDeclaringPlugins(optimizer.GetType())
        select new HivePluginInfoDto() {
          Name = p.Name,
          Version = p.Version
        }).ToList();

      JobDto jobDto = new JobDto() {
        CoresNeeded = 1, // [chn] how to determine real cores needed?
        PluginsNeeded = pluginsNeeded,
        State = State.Offline,
        MemoryNeeded = 0,
        UserId = Guid.Empty // [chn] set real userid here!
      };

      SerializedJob serializedJob = new SerializedJob() {
        JobInfo = jobDto,
        SerializedJobData = jobByteArray
      };

      return serializedJob;
    }

    /// <summary>
    /// Starts a thread that polls the server for the given job every
    /// <see cref="resultPollingIntervalMs"/> milliseconds. Once the restored job reports
    /// the execution state Stopped, the original optimizer is replaced by the restored one;
    /// when no pending optimizer is left the experiment switches to Stopped.
    /// </summary>
    private void StartResultPollingThread(JobDto job) {
      Thread t = new Thread(() => {
        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
        IJob restoredObject = null;

        do {
          Thread.Sleep(resultPollingIntervalMs);
          // [chn] the server calls are deliberately made without holding the lock
          if (stopPending) return;

          ResponseObject<JobDto> response = executionEngineFacade.GetJobById(job.Id);
          LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")");

          if (response.Obj != null) {
            UpdateJobItem(response.Obj);
          }

          // loop while
          // 1. the user doesn't request an abort
          // 2. there is a problem with server communication (success==false)
          // 3. no result for the job is available yet (response.Obj==null)
          // 4. the result that we get from the server is a snapshot and not the final result
          if (response.Success && response.Obj != null && response.Obj.State == State.Finished) {
            ResponseObject<SerializedJob> jobResponse = executionEngineFacade.GetLastSerializedResult(job.Id, false, false);
            // Without a payload there is nothing to restore; keep polling (case 3 above).
            if (jobResponse.Obj != null) {
              using (MemoryStream resultStream = new MemoryStream(jobResponse.Obj.SerializedJobData)) {
                restoredObject = XmlParser.Deserialize<IJob>(resultStream);
              }
              UpdateSnapshot(jobResponse);
            }
          }
        } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);

        LogMessage("Job finished (jobId: " + job.Id + ")");

        // job retrieved... replace the existing optimizers with the finished one.
        // All polling threads share pendingOptimizers, so lookup, removal and the
        // "was this the last job" check have to happen atomically.
        bool allJobsFinished;
        lock (locker) {
          IOptimizer originalOptimizer = pendingOptimizers[job.Id];
          IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
          ReplaceOptimizer(originalOptimizer, restoredOptimizer);
          pendingOptimizers.Remove(job.Id);
          allJobsFinished = pendingOptimizers.Count == 0;
        }

        // Raise the events outside the lock so that handlers cannot block other polling threads.
        if (allJobsFinished) {
          // finished
          this.ExecutionState = Core.ExecutionState.Stopped;
          OnStopped();
        }
      });
      t.Start();
    }

    /// <summary>
    /// Stores the given job description in the job item that has the same job id.
    /// </summary>
    private void UpdateJobItem(JobDto jobDto) {
      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
      jobItem.JobDto = jobDto;
    }

    /// <summary>
    /// Stores the given response as latest snapshot of the job item that has the same job id.
    /// </summary>
    private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
      JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
      jobItem.LatestSnapshot = response;
    }

    private void LogMessage(string message) {
      // HeuristicLab.Log is not Thread-Safe, so lock every call
      lock (locker) {
        log.LogMessage(message);
      }
    }

    #region Required Plugin Search
    /// <summary>
    /// Returns a list of plugins in which the type itself and all members
    /// of the type are declared. Objectgraph is searched recursively.
    /// </summary>
    private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
      HashSet<Type> types = new HashSet<Type>();
      FindTypes(type, types, "HeuristicLab.");
      return GetDeclaringPlugins(types);
    }

    /// <summary>
    /// Returns the plugins (including dependencies) in which the given types are declared
    /// </summary>
    private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
      HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
      foreach (Type t in types) {
        FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
      }
      return plugins;
    }

    /// <summary>
    /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
    /// Also searches the dependencies recursively.
    /// </summary>
    private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
      // Defensive: a missing plugin has no dependencies to follow.
      if (plugin == null) return;
      // Add returns false for plugins that were already visited, which also ends the recursion.
      if (plugins.Add(plugin)) {
        foreach (IPluginDescription dependency in plugin.Dependencies) {
          FindDeclaringPlugins(dependency, plugins);
        }
      }
    }

    /// <summary>
    /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
    /// Be aware that search is not performed on attributes
    /// </summary>
    /// <param name="type">the type to be searched</param>
    /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
    /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
    private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
      if (type == null) return;
      // Type.Namespace is null for types that are not declared in a namespace.
      string typeNamespace = type.Namespace;
      if (typeNamespace == null || !typeNamespace.StartsWith(namespaceStart, StringComparison.Ordinal)) return;
      // Add returns false if the type was already visited.
      if (!types.Add(type)) return;

      // constructors
      foreach (ConstructorInfo info in type.GetConstructors()) {
        foreach (ParameterInfo paramInfo in info.GetParameters()) {
          FindTypes(paramInfo.ParameterType, types, namespaceStart);
        }
      }

      // interfaces
      foreach (Type t in type.GetInterfaces()) {
        FindTypes(t, types, namespaceStart);
      }

      // events
      foreach (EventInfo info in type.GetEvents()) {
        FindTypes(info.EventHandlerType, types, namespaceStart);
        FindTypes(info.DeclaringType, types, namespaceStart);
      }

      // properties
      foreach (PropertyInfo info in type.GetProperties()) {
        FindTypes(info.PropertyType, types, namespaceStart);
      }

      // fields
      foreach (FieldInfo info in type.GetFields()) {
        FindTypes(info.FieldType, types, namespaceStart);
      }

      // methods
      foreach (MethodInfo info in type.GetMethods()) {
        foreach (ParameterInfo paramInfo in info.GetParameters()) {
          FindTypes(paramInfo.ParameterType, types, namespaceStart);
        }
        FindTypes(info.ReturnType, types, namespaceStart);
      }
    }
    #endregion

    #region Eventhandler

    public event EventHandler ExecutionTimeChanged;
    private void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ExecutionStateChanged;
    private void OnExecutionStateChanged() {
      LogMessage("ExecutionState changed to " + executionState.ToString());
      EventHandler handler = ExecutionStateChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    // Declared for IExecutable; this class never raises it.
    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;

    public event EventHandler Started;
    private void OnStarted() {
      LogMessage("Started");
      timer.Start();
      EventHandler handler = Started;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Stopped;
    private void OnStopped() {
      timer.Stop();
      LogMessage("Stopped");
      EventHandler handler = Stopped;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Paused;
    private void OnPaused() {
      timer.Stop();
      LogMessage("Paused");
      EventHandler handler = Paused;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Prepared;
    protected virtual void OnPrepared() {
      LogMessage("Prepared");
      EventHandler handler = Prepared;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ResourceIdsChanged;
    protected virtual void OnResourceIdsChanged() {
      EventHandler handler = ResourceIdsChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ExperimentChanged;
    protected virtual void OnExperimentChanged() {
      LogMessage("Experiment changed");
      EventHandler handler = ExperimentChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ServerUrlChanged;
    protected virtual void OnServerUrlChanged() {
      EventHandler handler = ServerUrlChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    #endregion
  }
}