#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Clients.Hive.Jobs;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Optimization;
using HeuristicLab.Services.Hive.Common;
using HeuristicLab.Services.Hive.Common.DataTransfer;
using HeuristicLab.Services.Hive.Common.ServiceContracts;
using HeuristicLab.Tracing;

namespace HeuristicLab.Clients.Hive {
  using System.Configuration;
  using System.IO;
  using HeuristicLab.PluginInfrastructure;
  using DT = HeuristicLab.Services.Hive.Common.DataTransfer;

  /// <summary>
  /// An experiment which contains multiple batch runs of algorithms.
  /// </summary>
  [Item(itemName, itemDescription)]
  public class HiveExperimentClient : NamedItem, IExecutable, IProgressReporter {
    private object locker = new object();
    private const string itemName = "Hive Experiment";
    private const string itemDescription = "A runner for a single experiment whose algorithms are executed in the Hive.";
    private System.Timers.Timer timer;
    private DateTime lastUpdateTime;
    private Guid rootJobId;
    private JobResultPoller jobResultPoller;

    private Guid hiveExperimentId;
    public Guid HiveExperimentId {
      get { return hiveExperimentId; }
      set { hiveExperimentId = value; }
    }

    private HiveJob hiveJob;
    public HiveJob HiveJob {
      get { return hiveJob; }
      set {
        DeregisterHiveJobEvents();
        if (hiveJob != value) {
          hiveJob = value;
          RegisterHiveJobEvents();
          OnHiveJobChanged();
        }
      }
    }

    private ILog log;
    public ILog Log {
      get { return log; }
    }

    private string resourceNames;
    public string ResourceNames {
      get { return resourceNames; }
      set {
        if (resourceNames != value) {
          resourceNames = value;
          OnResourceNamesChanged();
        }
      }
    }

    private bool isPollingResults;
    public bool IsPollingResults {
      get { return isPollingResults; }
      private set {
        if (isPollingResults != value) {
          isPollingResults = value;
          OnIsPollingResultsChanged();
        }
      }
    }

    private bool isProgressing;
    public bool IsProgressing {
      get { return isProgressing; }
      set {
        if (isProgressing != value) {
          isProgressing = value;
          OnIsProgressingChanged();
        }
      }
    }

    private IProgress progress;
    public IProgress Progress {
      get { return progress; }
    }

    private IEnumerable<Plugin> onlinePlugins;
    public IEnumerable<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    private bool useLocalPlugins;
    public bool UseLocalPlugins {
      get { return useLocalPlugins; }
      set { useLocalPlugins = value; }
    }

    public HiveExperimentClient() : base(itemName, itemDescription) {
      this.ResourceNames = "HEAL";
      this.log = new Log();
      InitTimer();
    }

    public HiveExperimentClient(DT.HiveExperiment hiveExperimentDto) : this() {
      UpdateFromDto(hiveExperimentDto);
    }
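    // Typical usage (illustrative sketch only, not part of the original file; the surrounding
    // application normally drives this class through its views, and 'experiment' below is assumed
    // to be an already configured HeuristicLab.Optimization.Experiment):
    //
    //   var client = new HiveExperimentClient();   // ResourceNames defaults to "HEAL"
    //   client.SetExperiment(experiment);          // wraps the experiment in a HiveJob tree and prepares the client
    //   client.Start();                            // uploads plugins and jobs, then starts result polling
    //   // ... results are merged back into the experiment as jobs finish ...
    //   client.Stop();                             // asks the Hive service to stop all jobs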
    protected HiveExperimentClient(HiveExperimentClient original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceNames = original.resourceNames;
      this.ExecutionState = original.executionState;
      this.ExecutionTime = original.executionTime;
      this.log = cloner.Clone(original.log);
      this.lastUpdateTime = original.lastUpdateTime;
      this.rootJobId = original.rootJobId;
    }
    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveExperimentClient(this, cloner);
    }

    public void UpdateFromDto(DT.HiveExperiment hiveExperimentDto) {
      this.HiveExperimentId = hiveExperimentDto.Id;
      this.Name = hiveExperimentDto.Name;
      this.Description = hiveExperimentDto.Description;
      this.ResourceNames = hiveExperimentDto.ResourceNames;
      this.rootJobId = hiveExperimentDto.RootJobId;
    }

    public DT.HiveExperiment ToHiveExperimentDto() {
      return new DT.HiveExperiment() {
        Id = this.HiveExperimentId,
        Name = this.Name,
        Description = this.Description,
        ResourceNames = this.ResourceNames,
        RootJobId = this.rootJobId
      };
    }

    public void SetExperiment(Experiment experiment) {
      this.HiveJob = new HiveJob(experiment);
      Prepare();
    }

    private void RegisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged += new EventHandler(HiveJob_JobStateChanged);
      }
    }
    private void DeregisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged -= new EventHandler(HiveJob_JobStateChanged);
      }
    }

    /// <summary>
    /// Returns the experiment from the root HiveJob.
    /// </summary>
    public Experiment GetExperiment() {
      if (this.HiveJob != null) {
        return HiveJob.OptimizerJob.OptimizerAsExperiment;
      }
      return null;
    }

    #region IExecutable Members
    private ExecutionState executionState;
    public ExecutionState ExecutionState {
      get { return executionState; }
      private set {
        if (executionState != value) {
          executionState = value;
          OnExecutionStateChanged();
        }
      }
    }

    private TimeSpan executionTime;
    public TimeSpan ExecutionTime {
      get { return executionTime; }
      private set {
        if (executionTime != value) {
          executionTime = value;
          OnExecutionTimeChanged();
        }
      }
    }

    public void Pause() {
      throw new NotSupportedException();
    }

    public void Prepare() {
      this.timer.Stop();
      this.ExecutionState = Core.ExecutionState.Prepared;
      this.ExecutionTime = TimeSpan.Zero;
    }

    public void Start() {
      OnStarted();
      ExecutionTime = TimeSpan.Zero;
      lastUpdateTime = DateTime.Now;
      this.ExecutionState = Core.ExecutionState.Started;
      Thread t = new Thread(RunUploadExperiment);
      t.Name = "RunUploadExperimentThread";
      t.Start();
    }

    private void RunUploadExperiment() {
      try {
        this.progress = new Progress("Connecting to server...");
        IsProgressing = true;
        ServiceLocator.Instance.CallHiveService(service => {
          IEnumerable<string> resourceNames = ToResourceNameList(this.ResourceNames);
          var resourceIds = new List<Guid>();
          foreach (var resourceName in resourceNames) {
            Guid resourceId = service.GetResourceId(resourceName);
            if (resourceId == Guid.Empty) {
              throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
            }
            resourceIds.Add(resourceId);
          }

          this.HiveJob.SetIndexInParentOptimizerList(null);

          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
          int jobCount = 0;

          this.progress.Status = "Uploading plugins...";
          this.OnlinePlugins = service.GetPlugins();
          this.AlreadyUploadedPlugins = new List<Plugin>();
          Plugin configFilePlugin = UploadConfigurationFile(service);
          this.alreadyUploadedPlugins.Add(configFilePlugin);

          this.progress.Status = "Uploading jobs...";
          UploadJobWithChildren(service, this.HiveJob, null, resourceIds, ref jobCount, totalJobCount, configFilePlugin.Id);
          this.rootJobId = this.HiveJob.Job.Id;
          LogMessage("Finished sending jobs to hive");

          // insert or update HiveExperiment
          this.progress.Status = "Uploading HiveExperiment...";
          DT.HiveExperiment he = service.GetHiveExperiment(service.AddHiveExperiment(this.ToHiveExperimentDto()));
          this.UpdateFromDto(he);

          StartResultPolling();
        });
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        this.Prepare();
      }
      finally {
        IsProgressing = false;
      }
    }
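    // Note on the upload pipeline above: RunUploadExperiment resolves the configured resource
    // (group) names to ids, uploads the local configuration file as a plugin, recursively uploads
    // the HiveJob tree (parents before children, so that parent job ids are known when child jobs
    // are added), registers or updates the HiveExperiment entity on the server, and finally starts
    // result polling.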
    /// <summary>
    /// Uploads the local configuration file as a plugin.
    /// </summary>
    private static Plugin UploadConfigurationFile(IHiveService service) {
      string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "HeuristicLab 3.3.exe");
      string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
      Plugin configPlugin = new Plugin() { Name = "Configuration", IsLocal = true, Version = new Version() };
      PluginData configFile = new PluginData() { FileName = configFileName, Data = File.ReadAllBytes(configFileName) };
      configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
      return configPlugin;
    }

    /// <summary>
    /// Uploads the given job and all its child jobs while setting the proper parentJobId values for the children.
    /// </summary>
    /// <param name="parentHiveJob">shall be null if it is the root job</param>
    private void UploadJobWithChildren(IHiveService service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, ref int jobCount, int totalJobCount, Guid configPluginId) {
      jobCount++;
      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
      JobData jobData;
      List<IPluginDescription> plugins;

      if (hiveJob.OptimizerJob.ComputeInParallel &&
          (hiveJob.OptimizerJob.Optimizer is Optimization.Experiment || hiveJob.OptimizerJob.Optimizer is Optimization.BatchRun)) {
        hiveJob.Job.IsParentJob = true;
        hiveJob.Job.FinishWhenChildJobsFinished = true;
        hiveJob.OptimizerJob.CollectChildJobs = false; // don't collect child-jobs on slaves
        jobData = hiveJob.GetAsJobData(true, out plugins);
      } else {
        hiveJob.Job.IsParentJob = false;
        hiveJob.Job.FinishWhenChildJobsFinished = false;
        jobData = hiveJob.GetAsJobData(false, out plugins);
      }

      hiveJob.Job.PluginsNeededIds = GetPluginDependencies(service, onlinePlugins, alreadyUploadedPlugins, plugins, useLocalPlugins);
      hiveJob.Job.PluginsNeededIds.Add(configPluginId);

      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, jobData.Data.Count() / 1024);
      this.progress.ProgressValue = (double)jobCount / totalJobCount;

      hiveJob.Job.SetState(JobState.Transferring);
      if (parentHiveJob != null) {
        hiveJob.Job.Id = service.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData);
      } else {
        hiveJob.Job.Id = service.AddJob(hiveJob.Job, jobData, groups);
      }
      LogMessage(hiveJob.Job.Id, "Job sent to Hive");

      foreach (HiveJob child in hiveJob.ChildHiveJobs) {
        UploadJobWithChildren(service, child, hiveJob, groups, ref jobCount, totalJobCount, configPluginId);
      }
    }

    /// <summary>
    /// Converts a string which can contain resource names separated by ';' to an enumerable.
    /// </summary>
    private IEnumerable<string> ToResourceNameList(string resourceGroups) {
      if (!string.IsNullOrEmpty(resourceGroups)) {
        return resourceGroups.Split(';');
      } else {
        return new List<string>();
      }
    }
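    // Example (illustrative only; "HEAL" is the default resource group name, "MyGroup" is made up):
    //   hiveExperiment.ResourceNames = "HEAL;MyGroup";
    // is split by ToResourceNameList into the two resource group names "HEAL" and "MyGroup",
    // which are then resolved to resource ids during upload.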
    public void Stop() {
      ServiceLocator.Instance.CallHiveService(service => {
        foreach (HiveJob hj in HiveJob.GetAllHiveJobs()) {
          service.StopJob(hj.Job.Id);
        }
      });
    }
    #endregion

    #region HiveJob Events
    void HiveJob_JobStateChanged(object sender, EventArgs e) {
      if (HiveJob != null) {
        rootJobId = HiveJob.Job.Id;
      }
    }
    #endregion

    #region Eventhandler
    public event EventHandler ExecutionTimeChanged;
    private void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ExecutionStateChanged;
    private void OnExecutionStateChanged() {
      LogMessage("ExecutionState changed to " + executionState.ToString());
      EventHandler handler = ExecutionStateChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Started;
    private void OnStarted() {
      LogMessage("Started");
      timer.Start();
      EventHandler handler = Started;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Stopped;
    private void OnStopped() {
      LogMessage("Stopped");
      timer.Stop();
      EventHandler handler = Stopped;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Paused;
    private void OnPaused() {
      LogMessage("Paused");
      EventHandler handler = Paused;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Prepared;
    protected virtual void OnPrepared() {
      LogMessage("Prepared");
      EventHandler handler = Prepared;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ResourceNamesChanged;
    protected virtual void OnResourceNamesChanged() {
      EventHandler handler = ResourceNamesChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsResultsPollingChanged;
    private void OnIsPollingResultsChanged() {
      if (this.IsPollingResults) {
        LogMessage("Results Polling Started");
      } else {
        LogMessage("Results Polling Stopped");
      }
      EventHandler handler = IsResultsPollingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    private void OnExceptionOccured(Exception e) {
      var handler = ExceptionOccurred;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    public event EventHandler HiveJobChanged;
    private void OnHiveJobChanged() {
      if (jobResultPoller != null && jobResultPoller.IsPolling) {
        jobResultPoller.Stop();
        DeregisterResultPollingEvents();
      }
      if (HiveJob != null) {
        jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.ResultPollingInterval);
        RegisterResultPollingEvents();
      }
      EventHandler handler = HiveJobChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsProgressingChanged;
    private void OnIsProgressingChanged() {
      var handler = IsProgressingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    #region JobResultPoller Events
    public void StartResultPolling() {
      if (!jobResultPoller.IsPolling) {
        jobResultPoller.Start();
      } else {
        throw new JobResultPollingException("Result polling already running");
      }
    }

    public void StopResultPolling() {
      if (jobResultPoller.IsPolling) {
        jobResultPoller.Stop();
      } else {
        throw new JobResultPollingException("Result polling not running");
      }
    }

    private void RegisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<List<LightweightJob>>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted += new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished += new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
    }
    private void DeregisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<List<LightweightJob>>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted -= new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished -= new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
    }
    private void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
      this.IsPollingResults = jobResultPoller.IsPolling;
    }
    private void jobResultPoller_PollingFinished(object sender, EventArgs e) {
      LogMessage("Polling results finished");
    }
    private void jobResultPoller_PollingStarted(object sender, EventArgs e) {
      LogMessage("Polling results started");
    }

    private void jobResultPoller_JobResultReceived(object sender, EventArgs<List<LightweightJob>> e) {
      foreach (LightweightJob lightweightJob in e.Value) {
        HiveJob hj = hiveJob.GetHiveJobByJobId(lightweightJob.Id);
        if (hj != null) {
          hj.UpdateFromLightweightJob(lightweightJob);

          if ((hj.Job.State == JobState.Aborted ||
               hj.Job.State == JobState.Failed ||
               hj.Job.State == JobState.Finished) &&
              !hj.IsFinishedOptimizerDownloaded) {
            LogMessage(hj.Job.Id, "Downloading optimizer for job");
            OptimizerJob optimizerJob = LoadOptimizerJob(hj.Job.Id);
            if (optimizerJob == null) {
              // something bad happened to this job; set to finished to allow the rest being downloaded
              hj.IsFinishedOptimizerDownloaded = true;
            } else {
              if (lightweightJob.ParentJobId.HasValue) {
                HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(lightweightJob.ParentJobId.Value);
                parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.Job.Id);
              } else {
                this.HiveJob.IsFinishedOptimizerDownloaded = true;
              }
            }
          }
        }
      }
      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
      if (AllJobsFinished()) {
        this.ExecutionState = Core.ExecutionState.Stopped;
        StopResultPolling();
        OnStopped();
      }
    }

    private bool AllJobsFinished() {
      return HiveJob.GetAllHiveJobs().All(hj => hj.IsFinishedOptimizerDownloaded);
    }

    private void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
      OnExceptionOccured(e.Value);
    }
    #endregion

    #region Execution Time Timer
    private void InitTimer() {
      timer = new System.Timers.Timer(100);
      timer.AutoReset = true;
      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
    }

    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      DateTime now = DateTime.Now;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
    }
    #endregion

    #region Logging
    private void LogMessage(string message) {
      // HeuristicLab.Log is not thread-safe, so lock on every call
      lock (locker) {
        log.LogMessage(message);
        Logger.Debug(message);
      }
    }

    private void LogMessage(Guid jobId, string message) {
      //GetJobItemById(jobId).LogMessage(message);
      LogMessage(message + " (jobId: " + jobId + ")");
    }
    #endregion
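    // Note on the job-loading code in the following region: LoadHiveJob first fetches the lightweight
    // job list for the stored root job id, then lets a HiveJobDownloader fetch and deserialize the job
    // data asynchronously while the waiting loop reports progress, and finally rebuilds the parent/child
    // structure via BuildHiveJobTree before result polling is started again.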
    #region Job Loading
    /// <summary>
    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem.
    /// </summary>
    public void LoadHiveJob() {
      progress = new Progress();
      try {
        IsProgressing = true;
        int totalJobCount = 0;
        IEnumerable<LightweightJob> allJobs;

        progress.Status = "Connecting to Server...";
        // fetch all Job objects to create the full tree of HiveJob objects
        progress.Status = "Downloading list of jobs...";
        allJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightChildJobs(rootJobId, true, true));
        totalJobCount = allJobs.Count();

        HiveJobDownloader downloader = new HiveJobDownloader(allJobs.Select(x => x.Id));
        downloader.StartAsync();

        while (!downloader.IsFinished) {
          progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
          progress.Status = string.Format("Downloading/deserializing jobs... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
          Thread.Sleep(500);
        }
        IDictionary<Guid, HiveJob> allHiveJobs = downloader.Results;

        this.HiveJob = allHiveJobs[this.rootJobId];

        //// download them first
        //IDictionary<Guid, Job> allJobs = new Dictionary<Guid, Job>();
        //IDictionary<Guid, JobData> allJobDatas = new Dictionary<Guid, JobData>();
        //foreach (LightweightJob lightweightJob in allResults) {
        //  jobCount++;
        //  progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
        //  allJobs.Add(lightweightJob.Id, service.GetJob(lightweightJob.Id));
        //  allJobDatas.Add(lightweightJob.Id, service.GetJobData(lightweightJob.Id));
        //  progress.ProgressValue = (double)jobCount / totalJobCount;
        //}
        //jobCount = 1;
        //progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, allJobDatas[this.rootJobId].Data.Count() / 1024);
        //this.HiveJob = new HiveJob(allJobs[this.rootJobId], allJobDatas[this.rootJobId], false);
        //allJobDatas.Remove(this.rootJobId); // reduce memory footprint
        //allJobs.Remove(this.rootJobId);
        //progress.ProgressValue = (double)jobCount / totalJobCount;

        if (this.HiveJob.Job.DateFinished.HasValue) {
          this.ExecutionTime = this.HiveJob.Job.DateFinished.Value - this.HiveJob.Job.DateCreated;
          this.lastUpdateTime = this.HiveJob.Job.DateFinished.Value;
          this.ExecutionState = Core.ExecutionState.Stopped;
          OnStopped();
        } else {
          this.ExecutionTime = DateTime.Now - this.HiveJob.Job.DateCreated;
          this.lastUpdateTime = DateTime.Now;
          this.ExecutionState = Core.ExecutionState.Started;
          OnStarted();
        }

        // build child-job tree
        //LoadChildResults(service, this.HiveJob, allResults, allJobs, allJobDatas, progress, totalJobCount, ref jobCount);
        BuildHiveJobTree(this.HiveJob, allJobs, allHiveJobs);

        StartResultPolling();
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        IsProgressing = false;
      }
    }

    private void BuildHiveJobTree(HiveJob parentHiveJob, IEnumerable<LightweightJob> allJobs, IDictionary<Guid, HiveJob> allHiveJobs) {
      IEnumerable<LightweightJob> childJobs = from job in allJobs
                                              where job.ParentJobId.HasValue && job.ParentJobId.Value == parentHiveJob.Job.Id
                                              orderby job.DateCreated ascending
                                              select job;
      foreach (LightweightJob job in childJobs) {
        HiveJob childHiveJob = allHiveJobs[job.Id];
        parentHiveJob.AddChildHiveJob(childHiveJob);
        BuildHiveJobTree(childHiveJob, allJobs, allHiveJobs);
      }
    }

    private OptimizerJob LoadOptimizerJob(Guid jobId) {
      JobData jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
      try {
        return PersistenceUtil.Deserialize<OptimizerJob>(jobData.Data);
      }
      catch {
        return null;
      }
    }
    #endregion

    #region Plugin Management
    /// <summary>
    /// Checks if plugins are available on the Hive server; those that are not are uploaded. The ids of all needed plugins are returned.
    /// </summary>
    /// <param name="service">An active service-proxy</param>
    /// <param name="onlinePlugins">List of plugins which are available online</param>
    /// <param name="alreadyUploadedPlugins">List of plugins which have been uploaded from this HiveExperiment</param>
    /// <param name="neededPlugins">List of plugins which need to be uploaded</param>
    /// <param name="useLocalPlugins">If true, the plugins which are already online are ignored. All local plugins are uploaded, but only once.</param>
    private static List<Guid> GetPluginDependencies(IHiveService service, IEnumerable<Plugin> onlinePlugins, List<Plugin> alreadyUploadedPlugins, IEnumerable<IPluginDescription> neededPlugins, bool useLocalPlugins) {
      var pluginIds = new List<Guid>();
      foreach (var neededPlugin in neededPlugins) {
        Plugin foundPlugin = alreadyUploadedPlugins.SingleOrDefault(p => p.Name == neededPlugin.Name && p.Version == neededPlugin.Version);
        if (foundPlugin == null) {
          foundPlugin = onlinePlugins.SingleOrDefault(p => p.Name == neededPlugin.Name && p.Version == neededPlugin.Version);
          if (useLocalPlugins || foundPlugin == null) {
            Plugin p = CreatePlugin(neededPlugin, useLocalPlugins);
            List<PluginData> pd = CreatePluginDatas(neededPlugin);
            p.Id = service.AddPlugin(p, pd);
            alreadyUploadedPlugins.Add(p);
            pluginIds.Add(p.Id); // also return the id of the freshly uploaded plugin
          } else {
            pluginIds.Add(foundPlugin.Id);
          }
        } else {
          pluginIds.Add(foundPlugin.Id);
        }
      }
      return pluginIds;
    }

    private static Plugin CreatePlugin(IPluginDescription plugin, bool useLocalPlugins) {
      return new Plugin() { Name = plugin.Name, Version = plugin.Version, IsLocal = useLocalPlugins };
    }

    private static List<PluginData> CreatePluginDatas(IPluginDescription plugin) {
      List<PluginData> pluginDatas = new List<PluginData>();
      foreach (IPluginFile pf in plugin.Files) {
        PluginData pluginData = new PluginData();
        pluginData.Data = File.ReadAllBytes(pf.Name);
        pluginData.FileName = Path.GetFileName(pf.Name);
        pluginDatas.Add(pluginData);
      }
      return pluginDatas;
    }

    /// <summary>
    /// Gets the Ids of all plugins needed for executing the job.
    /// All loaded plugins are assumed to be necessary.
    /// If a plugin with the same name and version is already online, it is used. Otherwise the local plugin is uploaded.
    /// If useLocalPlugins is true, all local plugins are uploaded regardless of the existence of the same plugin online.
    /// </summary>
    //public static List<Guid> GetPluginsNeededIds(bool useLocalPlugins) {
    //  IEnumerable<IPluginDescription> localPlugins = ApplicationManager.Manager.Plugins;
    //  List<Guid> pluginsNeededIds = new List<Guid>();
    //  using (var service = ServiceLocator.Instance.GetService()) {
    //    IEnumerable<Plugin> onlinePlugins = service.Obj.GetPlugins();
    //    foreach (IPluginDescription localPlugin in localPlugins) {
    //      Plugin found = onlinePlugins.Where(onlinePlugin => onlinePlugin.Name == localPlugin.Name && onlinePlugin.Version == localPlugin.Version).SingleOrDefault();
    //      if (!useLocalPlugins && found != null) {
    //        // plugin is available online; reuse
    //        pluginsNeededIds.Add(found.Id);
    //      } else {
    //        // upload the plugin
    //        Plugin p = new Plugin() { Name = localPlugin.Name, Version = localPlugin.Version, IsLocal = useLocalPlugins };
    //        List<PluginData> pluginDatas = new List<PluginData>();
    //        foreach (IPluginFile pf in localPlugin.Files) {
    //          PluginData pluginData = new PluginData();
    //          pluginData.Data = File.ReadAllBytes(pf.Name);
    //          pluginDatas.Add(pluginData);
    //        }
    //        pluginsNeededIds.Add(service.Obj.AddPlugin(p, pluginDatas));
    //      }
    //    }
    //  }
    //  return pluginsNeededIds;
    //}
    #endregion
  }
}