#region License Information /* HeuristicLab * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Linq; using HeuristicLab.Common; using HeuristicLab.Core; using HeuristicLab.Persistence.Default.CompositeSerializers.Storable; using HeuristicLab.Optimization; using System.Drawing; using HeuristicLab.Collections; using System.Collections.Generic; using HeuristicLab.Hive.Contracts.BusinessObjects; using System.IO; using HeuristicLab.Persistence.Default.Xml; using HeuristicLab.PluginInfrastructure; using System.Reflection; using HeuristicLab.Hive.Contracts.Interfaces; using HeuristicLab.Hive.Contracts; using System.Threading; using HeuristicLab.Tracing; using HeuristicLab.Hive.JobBase; using System.Diagnostics; using System.Collections; namespace HeuristicLab.Hive.Experiment { /// /// An experiment which contains multiple batch runs of algorithms. /// [Item(itemName, itemDescription)] [Creatable("Testing & Analysis")] [StorableClass] public class HiveExperiment : NamedItem, IExecutable { private const string itemName = "Hive Experiment"; private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive."; private const int resultPollingIntervalMs = 15000; private const int snapshotPollingIntervalMs = 1000; private const int maxSnapshotRetries = 20; private object locker = new object(); private System.Timers.Timer timer; private bool pausePending, stopPending; [Storable] private DateTime lastUpdateTime; private bool isPollingResults; public bool IsPollingResults { get { return isPollingResults; } private set { if (isPollingResults != value) { isPollingResults = value; OnIsPollingResultsChanged(); } } } private bool stopResultsPollingPending = false; private IDictionary resultPollingThreads; /// /// Mapping from JobId to an optimizer. /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection /// [Storable] private IDictionary pendingOptimizersByJobId = new Dictionary(); /// /// Stores a mapping from the child-optimizer to the parent optimizer. /// Needed to replace a finished optimizer in the optimizer-tree. /// Only pending optmizers are stored. /// [Storable] private IDictionary parentOptimizersByPendingOptimizer = new Dictionary(); [Storable] private JobItemList jobItems; public JobItemList JobItems { get { return jobItems; } } [Storable] private string serverUrl; public string ServerUrl { get { return serverUrl; } set { if (serverUrl != value) { serverUrl = value; OnServerUrlChanged(); } } } [Storable] private string resourceIds; public string ResourceIds { get { return resourceIds; } set { if (resourceIds != value) { resourceIds = value; OnResourceIdsChanged(); } } } [Storable] private HeuristicLab.Optimization.Experiment experiment; public HeuristicLab.Optimization.Experiment Experiment { get { return experiment; } set { if (experiment != value) { experiment = value; OnExperimentChanged(); } } } [Storable] private ILog log; public ILog Log { get { return log; } } [StorableConstructor] public HiveExperiment(bool deserializing) : base(deserializing) { this.resultPollingThreads = new Dictionary(); jobItems = new JobItemList(); } public HiveExperiment() : base(itemName, itemDescription) { this.ServerUrl = HeuristicLab.Hive.Experiment.Properties.Settings.Default.HiveServerUrl; this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds; this.log = new Log(); pausePending = stopPending = false; jobItems = new JobItemList(); isPollingResults = false; resultPollingThreads = new Dictionary(); InitTimer(); } public override IDeepCloneable Clone(Cloner cloner) { LogMessage("I am beeing cloned"); HiveExperiment clone = (HiveExperiment)base.Clone(cloner); clone.resourceIds = this.resourceIds; clone.serverUrl = this.serverUrl; clone.experiment = (HeuristicLab.Optimization.Experiment)cloner.Clone(experiment); clone.executionState = this.executionState; clone.executionTime = this.executionTime; clone.pendingOptimizersByJobId = new Dictionary(); foreach (var pair in this.pendingOptimizersByJobId) clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value); foreach (var pair in this.parentOptimizersByPendingOptimizer) clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value); clone.log = (ILog)cloner.Clone(log); clone.stopPending = this.stopPending; clone.pausePending = this.pausePending; clone.jobItems = (JobItemList)cloner.Clone(jobItems); clone.lastUpdateTime = this.lastUpdateTime; clone.isPollingResults = this.isPollingResults; return clone; } [StorableHook(HookType.AfterDeserialization)] private void AfterDeserialization() { InitTimer(); this.IsPollingResults = false; this.stopResultsPollingPending = false; LogMessage("I was deserialized."); } private void InitTimer() { timer = new System.Timers.Timer(100); timer.AutoReset = true; timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed); } private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { DateTime now = DateTime.Now; ExecutionTime += now - lastUpdateTime; lastUpdateTime = now; } public IEnumerable ResourceGroups { get { if (!string.IsNullOrEmpty(resourceIds)) { return resourceIds.Split(';'); } else { return new List(); } } } #region IExecutable Members [Storable] private Core.ExecutionState executionState; public ExecutionState ExecutionState { get { return executionState; } private set { if (executionState != value) { executionState = value; OnExecutionStateChanged(); } } } [Storable] private TimeSpan executionTime; public TimeSpan ExecutionTime { get { return executionTime; } private set { if (executionTime != value) { executionTime = value; OnExecutionTimeChanged(); } } } public void Pause() { throw new NotSupportedException(); } public void Prepare() { if (experiment != null) { StopResultPolling(); pendingOptimizersByJobId.Clear(); parentOptimizersByPendingOptimizer.Clear(); jobItems.Clear(); experiment.Prepare(); this.ExecutionState = Core.ExecutionState.Prepared; OnPrepared(); } } public void Start() { OnStarted(); ExecutionTime = new TimeSpan(); lastUpdateTime = DateTime.Now; this.ExecutionState = Core.ExecutionState.Started; Thread t = new Thread(() => { IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade(); pendingOptimizersByJobId = new Dictionary(); parentOptimizersByPendingOptimizer = GetOptimizers(true); IEnumerable groups = ResourceGroups; foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) { SerializedJob serializedJob = CreateSerializedJob(optimizer); ResponseObject response = executionEngineFacade.AddJobWithGroupStrings(serializedJob, groups); pendingOptimizersByJobId.Add(response.Obj.Id, optimizer); JobItem jobItem = new JobItem() { JobDto = response.Obj, LatestSnapshot = new ResponseObject() { Obj = serializedJob, StatusMessage = "Initial Snapshot", Success = true } }; jobItems.Add(jobItem); jobItem.LogMessage("Job sent to Hive"); LogMessage("Sent job to Hive (jobId: " + response.Obj.Id + ")"); } // start results polling after sending sending the jobs to the server (to avoid race conflicts at the optimizers-collection) StartResultPolling(); }); t.Start(); } public void Stop() { this.ExecutionState = Core.ExecutionState.Stopped; foreach (JobItem jobItem in jobItems) { AbortJob(jobItem.JobDto.Id); } OnStopped(); } private void CreateResultPollingThreads() { foreach (JobItem jobItem in JobItems) { if (!resultPollingThreads.ContainsKey(jobItem.JobDto.Id) && jobItem.JobDto.State != State.Finished) { resultPollingThreads.Add(jobItem.JobDto.Id, CreateResultPollingThread(jobItem.JobDto)); } } } public void StartResultPolling() { this.stopResultsPollingPending = false; this.IsPollingResults = true; lock (resultPollingThreads) { CreateResultPollingThreads(); foreach (Thread pollingThread in resultPollingThreads.Values) { if (pollingThread.ThreadState != System.Threading.ThreadState.Running) { pollingThread.Start(); } } } } public void StopResultPolling() { this.stopResultsPollingPending = true; foreach (Thread pollingThread in resultPollingThreads.Values) { pollingThread.Interrupt(); } this.stopResultsPollingPending = false; } private JobItem GetJobItemById(Guid jobId) { return jobItems.Single(x => x.JobDto.Id == jobId); } /// /// Returns all optimizers in the current Experiment /// /// if false only top level optimizers are returned, if true the optimizer-tree is flatted /// private IDictionary GetOptimizers(bool flatout) { if (!flatout) { var optimizers = new Dictionary(); foreach (IOptimizer opt in experiment.Optimizers) { optimizers.Add(experiment, opt); } return optimizers; } else { return FlatOptimizerTree(null, experiment); } } /// /// Recursively iterates all IOptimizers in the optimizer-tree and returns them. /// /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like: /// interface IParallelizable { /// IEnumerable<IOptimizer> GetOptimizers(); /// } /// /// a dictionary mapping from the parent optimizer to the child optimizer private IDictionary FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer) { IDictionary optimizers = new Dictionary(); if (optimizer is HeuristicLab.Optimization.Experiment) { HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment; foreach (IOptimizer opt in experiment.Optimizers) { AddRange(optimizers, FlatOptimizerTree(experiment, opt)); } } else if (optimizer is BatchRun) { BatchRun batchRun = optimizer as BatchRun; for (int i = 0; i < batchRun.Repetitions; i++) { IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone(); AddRange(optimizers, FlatOptimizerTree(batchRun, opt)); } } else if (optimizer is EngineAlgorithm) { optimizers.Add(optimizer, parent); } else { Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown"); optimizers.Add(optimizer, parent); } return optimizers; } private void AddRange(IDictionary optimizers, IDictionary childs) { foreach (KeyValuePair kvp in childs) { optimizers.Add(kvp); } } private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) { lock (locker) { if (parentOptimizer is HeuristicLab.Optimization.Experiment) { HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer; int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer); exp.Optimizers[originalOptimizerIndex] = newOptimizer; } else if (parentOptimizer is BatchRun) { BatchRun batchRun = (BatchRun)parentOptimizer; if (newOptimizer is IAlgorithm) { batchRun.Runs.Add(new Run((IAlgorithm)newOptimizer)); } else { throw new NotSupportedException("Only IAlgorithm types supported"); } } else { throw new NotSupportedException("Invalid parentOptimizer"); } } } public void AbortJob(Guid jobId) { IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade(); Response response = executionEngineFacade.AbortJob(jobId); GetJobItemById(jobId).LogMessage("Aborting Job: " + response.StatusMessage); } #endregion private IExecutionEngineFacade GetExecutionEngineFacade() { return ServiceLocator.CreateExecutionEngineFacade(ServerUrl); } private SerializedJob CreateSerializedJob(IOptimizer optimizer) { IJob job = new OptimizerJob() { Optimizer = optimizer }; // serialize job MemoryStream memStream = new MemoryStream(); XmlGenerator.Serialize(job, memStream); byte[] jobByteArray = memStream.ToArray(); memStream.Dispose(); // find out which which plugins are needed for the given object List pluginsNeeded = ( from p in GetDeclaringPlugins(optimizer.GetType()) select new HivePluginInfoDto() { Name = p.Name, Version = p.Version }).ToList(); JobDto jobDto = new JobDto() { CoresNeeded = 1, // [chn] how to determine real cores needed? PluginsNeeded = pluginsNeeded, State = State.Offline, MemoryNeeded = 0, UserId = Guid.Empty // [chn] set real userid here! }; SerializedJob serializedJob = new SerializedJob() { JobInfo = jobDto, SerializedJobData = jobByteArray }; return serializedJob; } private Thread CreateResultPollingThread(JobDto job) { return new Thread(() => { try { GetJobItemById(job.Id).LogMessage("Starting job results polling"); IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade(); IJob restoredObject = null; do { if (stopPending || !this.IsPollingResults) { return; } ResponseObject response = executionEngineFacade.GetJobById(job.Id); LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")"); GetJobItemById(job.Id).LogMessage("Response: " + response.StatusMessage); UpdateJobItem(response.Obj); if (response.Obj.State == State.Abort) { // job is aborted, don't poll for results anymore GetJobItemById(job.Id).LogMessage("Job successfully aborted"); return; } // loop while // 1. the user doesn't request an abort // 2. there is a problem with server communication (success==false) // 3. no result for the job is available yet (response.Obj==null) // 4. the result that we get from the server is a snapshot and not the final result if (response.Success && response.Obj != null && response.Obj.State == State.Finished) { ResponseObject jobResponse = executionEngineFacade.GetLastSerializedResult(job.Id, false, false); restoredObject = XmlParser.Deserialize(new MemoryStream(jobResponse.Obj.SerializedJobData)); UpdateSnapshot(jobResponse); } Thread.Sleep(resultPollingIntervalMs); } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped); LogMessage("Job finished (jobId: " + job.Id + ")"); GetJobItemById(job.Id).LogMessage("Job finished"); // job retrieved... replace the existing optimizers with the finished one IOptimizer originalOptimizer = pendingOptimizersByJobId[job.Id]; IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer; ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer); pendingOptimizersByJobId.Remove(job.Id); parentOptimizersByPendingOptimizer.Remove(originalOptimizer); } catch (ThreadInterruptedException exception) { } finally { GetJobItemById(job.Id).LogMessage("ResultsPolling Thread stopped"); lock (resultPollingThreads) { resultPollingThreads.Remove(job.Id); if (resultPollingThreads.Count == 0) { IsPollingResults = false; } } // check if finished if (pendingOptimizersByJobId.Count == 0) { this.ExecutionState = Core.ExecutionState.Stopped; OnStopped(); } } }); } private void UpdateJobItem(JobDto jobDto) { JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id); jobItem.JobDto = jobDto; } private void UpdateSnapshot(ResponseObject response) { JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id); jobItem.LatestSnapshot = response; } private void LogMessage(string message) { // HeuristicLab.Log is not Thread-Safe, so lock on every call lock (locker) { log.LogMessage(message); } } public void RequestSnapshot(Guid jobId) { IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade(); ResponseObject response; int retryCount = 0; Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId); if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) { // job already finished Logger.Debug("HiveEngine: Abort - GetLastResult(false)"); response = executionEngineFacade.GetLastSerializedResult(jobId, false, false); Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success); } else { // server sent snapshot request to client // poll until snapshot is ready do { Thread.Sleep(snapshotPollingIntervalMs); Logger.Debug("HiveEngine: Abort - GetLastResult(true)"); response = executionEngineFacade.GetLastSerializedResult(jobId, false, true); Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success); retryCount++; // loop while // 1. problem with communication with server // 2. job result not yet ready } while ( (retryCount < maxSnapshotRetries) && ( !response.Success || response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) ); } SerializedJob jobResult = response.Obj; if (jobResult != null) { Logger.Debug("HiveEngine: Results-polling - Got result!"); //job = XmlParser.Deserialize(new MemoryStream(jobResult.SerializedJobData)); throw new NotImplementedException("[chn] how to create a view in 3.3 and why should i do this here? shouldnt the caller of this method receive a result and decide what to do?"); //ControlManager.Manager.ShowControl(job.Engine.CreateView()); } } #region Required Plugin Search /// /// Returns a list of plugins in which the type itself and all members /// of the type are declared. Objectgraph is searched recursively. /// private IEnumerable GetDeclaringPlugins(Type type) { HashSet types = new HashSet(); FindTypes(type, types, "HeuristicLab."); return GetDeclaringPlugins(types); } /// /// Returns the plugins (including dependencies) in which the given types are declared /// private IEnumerable GetDeclaringPlugins(IEnumerable types) { HashSet plugins = new HashSet(); foreach (Type t in types) { FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins); } return plugins; } /// /// Finds the dependencies of the given plugin and adds it to the plugins hashset. /// Also searches the dependencies recursively. /// private void FindDeclaringPlugins(IPluginDescription plugin, HashSet plugins) { if (!plugins.Contains(plugin)) { plugins.Add(plugin); foreach (IPluginDescription dependency in plugin.Dependencies) { FindDeclaringPlugins(dependency, plugins); } } } /// /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart /// Be aware that search is not performed on attributes /// /// the type to be searched /// found types will be stored there, needed in order to avoid duplicates /// only types from namespaces which start with this will be searched and added private void FindTypes(Type type, HashSet types, string namespaceStart) { if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) { types.Add(type); // constructors foreach (ConstructorInfo info in type.GetConstructors()) { foreach (ParameterInfo paramInfo in info.GetParameters()) { FindTypes(paramInfo.ParameterType, types, namespaceStart); } } // interfaces foreach (Type t in type.GetInterfaces()) { FindTypes(t, types, namespaceStart); } // events foreach (EventInfo info in type.GetEvents()) { FindTypes(info.EventHandlerType, types, namespaceStart); FindTypes(info.DeclaringType, types, namespaceStart); } // properties foreach (PropertyInfo info in type.GetProperties()) { FindTypes(info.PropertyType, types, namespaceStart); } // fields foreach (FieldInfo info in type.GetFields()) { FindTypes(info.FieldType, types, namespaceStart); } // methods foreach (MethodInfo info in type.GetMethods()) { foreach (ParameterInfo paramInfo in info.GetParameters()) { FindTypes(paramInfo.ParameterType, types, namespaceStart); } FindTypes(info.ReturnType, types, namespaceStart); } } } #endregion #region Eventhandler public event EventHandler ExecutionTimeChanged; private void OnExecutionTimeChanged() { EventHandler handler = ExecutionTimeChanged; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler ExecutionStateChanged; private void OnExecutionStateChanged() { LogMessage("ExecutionState changed to " + executionState.ToString()); EventHandler handler = ExecutionStateChanged; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler> ExceptionOccurred; public event EventHandler Started; private void OnStarted() { LogMessage("Started"); timer.Start(); EventHandler handler = Started; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler Stopped; private void OnStopped() { timer.Stop(); LogMessage("Stopped"); EventHandler handler = Stopped; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler Paused; private void OnPaused() { timer.Stop(); LogMessage("Paused"); EventHandler handler = Paused; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler Prepared; protected virtual void OnPrepared() { LogMessage("Prepared"); EventHandler handler = Prepared; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler ResourceIdsChanged; protected virtual void OnResourceIdsChanged() { EventHandler handler = ResourceIdsChanged; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler ExperimentChanged; protected virtual void OnExperimentChanged() { LogMessage("Experiment changed"); EventHandler handler = ExperimentChanged; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler ServerUrlChanged; protected virtual void OnServerUrlChanged() { EventHandler handler = ServerUrlChanged; if (handler != null) handler(this, EventArgs.Empty); } public event EventHandler IsResultsPollingChanged; private void OnIsPollingResultsChanged() { if (this.IsPollingResults) { LogMessage("Results Polling Started"); timer.Start(); } else { LogMessage("Results Polling Stopped"); timer.Stop(); } EventHandler handler = IsResultsPollingChanged; if (handler != null) handler(this, EventArgs.Empty); } #endregion } }