#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Hive.Contracts.Interfaces;
using HeuristicLab.Hive.Contracts.ResponseObjects;
using HeuristicLab.Hive.Experiment.Jobs;
using HeuristicLab.Hive.Tracing;

namespace HeuristicLab.Hive.Experiment {
  /// <summary>
  /// An experiment which contains multiple batch runs of algorithms.
  /// Uploads its jobs to the Hive, polls their results and rebuilds the
  /// job tree when an existing Hive experiment is loaded from the server.
  /// </summary>
  [Item(itemName, itemDescription)]
  public class HiveExperiment : NamedItem, IExecutable, IProgressReporter {
    // HeuristicLab.Log is not thread-safe; every LogMessage call locks on this.
    private object locker = new object();
    private const string itemName = "Hive Experiment";
    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
    // Drives ExecutionTime updates while the experiment is running.
    private System.Timers.Timer timer;
    private JobResultPoller jobResultPoller;
    // Id of the root job on the Hive server; null until first upload / load.
    private Guid? rootJobId;
    private DateTime lastUpdateTime;

    #region Properties
    private Guid hiveExperimentId;
    public Guid HiveExperimentId {
      get { return hiveExperimentId; }
      set { hiveExperimentId = value; }
    }

    private HiveJob hiveJob;
    public HiveJob HiveJob {
      get { return hiveJob; }
      set {
        if (hiveJob != value) {
          // FIX: deregister only when the job actually changes; previously the
          // events of the current job were unsubscribed even when the same
          // instance was re-assigned, leaving it without a JobStateChanged handler.
          DeregisterHiveJobEvents();
          hiveJob = value;
          RegisterHiveJobEvents();
          OnHiveJobChanged();
        }
      }
    }

    private ILog log;
    public ILog Log {
      get { return log; }
    }

    // Semicolon-separated list of resource (group) ids the jobs may run on.
    private string resourceIds;
    public string ResourceIds {
      get { return resourceIds; }
      set {
        if (resourceIds != value) {
          resourceIds = value;
          OnResourceIdsChanged();
        }
      }
    }

    private bool isPollingResults;
    public bool IsPollingResults {
      get { return isPollingResults; }
      private set {
        if (isPollingResults != value) {
          isPollingResults = value;
          OnIsPollingResultsChanged();
        }
      }
    }

    private bool isProgressing;
    public bool IsProgressing {
      get { return isProgressing; }
      set {
        if (isProgressing != value) {
          isProgressing = value;
          OnIsProgressingChanged();
        }
      }
    }

    private IProgress progress;
    public IProgress Progress {
      get { return progress; }
    }
    #endregion

    public HiveExperiment()
      : base(itemName, itemDescription) {
      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
      this.log = new Log();
      InitTimer();
    }

    public HiveExperiment(HiveExperimentDto hiveExperimentDto)
      : this() {
      UpdateFromDto(hiveExperimentDto);
    }

    protected HiveExperiment(HiveExperiment original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceIds = original.resourceIds;
      this.ExecutionState = original.executionState;
      this.ExecutionTime = original.executionTime;
      // FIX: clone the original's log; previously cloner.Clone(log) cloned this
      // instance's own (still null) log field, so clones ended up without a log.
      this.log = cloner.Clone(original.log);
      this.lastUpdateTime = original.lastUpdateTime;
      this.rootJobId = original.rootJobId;
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveExperiment(this, cloner);
    }

    /// <summary>
    /// Copies id, name, description, resource ids and root job id from the given DTO.
    /// </summary>
    public void UpdateFromDto(HiveExperimentDto hiveExperimentDto) {
      this.HiveExperimentId = hiveExperimentDto.Id;
      this.Name = hiveExperimentDto.Name;
      this.Description = hiveExperimentDto.Description;
      this.ResourceIds = hiveExperimentDto.ResourceIds;
      this.rootJobId = hiveExperimentDto.RootJobId;
    }

    /// <summary>
    /// Creates a DTO representation of this experiment for transfer to the server.
    /// </summary>
    public HiveExperimentDto ToHiveExperimentDto() {
      return new HiveExperimentDto() {
        Id = this.HiveExperimentId,
        Name = this.Name,
        Description = this.Description,
        ResourceIds = this.ResourceIds,
        RootJobId = this.rootJobId
      };
    }

    /// <summary>
    /// Wraps the given experiment into a new root HiveJob and prepares this runner.
    /// </summary>
    public void SetExperiment(Optimization.Experiment experiment) {
      this.HiveJob = new HiveJob(experiment);
      Prepare();
    }

    private void RegisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged += new EventHandler(HiveJob_JobStateChanged);
      }
    }

    private void DeregisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged -= new EventHandler(HiveJob_JobStateChanged);
      }
    }

    /// <summary>
    /// Returns the experiment from the root HiveJob (null if no job is set).
    /// </summary>
    public Optimization.Experiment GetExperiment() {
      if (this.HiveJob != null) {
        return HiveJob.Job.OptimizerAsExperiment;
      }
      return null;
    }

    #region IExecutable Members
    private Core.ExecutionState executionState;
    public ExecutionState ExecutionState {
      get { return executionState; }
      private set {
        if (executionState != value) {
          executionState = value;
          OnExecutionStateChanged();
        }
      }
    }

    private TimeSpan executionTime;
    public TimeSpan ExecutionTime {
      get { return executionTime; }
      private set {
        if (executionTime != value) {
          executionTime = value;
          OnExecutionTimeChanged();
        }
      }
    }

    /// <summary>Pausing a Hive experiment is not supported.</summary>
    public void Pause() {
      throw new NotSupportedException();
    }

    public void Prepare() {
      // do nothing
    }

    /// <summary>
    /// Starts the experiment: resets the execution time and uploads all jobs
    /// to the Hive on a background thread.
    /// </summary>
    public void Start() {
      OnStarted(); // starts the execution-time timer
      ExecutionTime = new TimeSpan();
      lastUpdateTime = DateTime.Now;
      this.ExecutionState = Core.ExecutionState.Started;
      Thread t = new Thread(RunUploadExperiment);
      t.Name = "RunUploadExperimentThread";
      t.Start();
    }

    // Background worker for Start(): uploads the whole job tree, persists the
    // HiveExperiment on the server and begins result polling.
    private void RunUploadExperiment() {
      try {
        this.progress = new Progress("Connecting to server...");
        IsProgressing = true;
        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
          IEnumerable<string> groups = ToResourceIdList(this.ResourceIds);
          this.HiveJob.SetIndexInParentOptimizerList(null);
          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
          int jobCount = 0;
          this.progress.Status = "Uploading jobs...";
          UploadJobWithChildren(service.Obj, this.HiveJob, null, groups, ref jobCount, totalJobCount);
          this.rootJobId = this.HiveJob.JobDto.Id;
          LogMessage("Finished sending jobs to hive");

          // insert or update HiveExperiment
          this.progress.Status = "Uploading HiveExperiment...";
          ResponseObject<HiveExperimentDto> resp = service.Obj.UpdateHiveExperiment(this.ToHiveExperimentDto());
          this.UpdateFromDto(resp.Obj);

          StartResultPolling();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        IsProgressing = false;
      }
    }

    /// <summary>
    /// Uploads the given job and all its child-jobs while setting the proper
    /// parentJobId values for the childs.
    /// </summary>
    /// <param name="service">client facade used for the upload</param>
    /// <param name="hiveJob">job to upload (including its subtree)</param>
    /// <param name="parentHiveJob">shall be null if its the root job</param>
    /// <param name="groups">resource groups the job may run on</param>
    /// <param name="jobCount">running counter across the recursion, for progress reporting</param>
    /// <param name="totalJobCount">total number of jobs in the tree</param>
    private void UploadJobWithChildren(IClientFacade service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<string> groups, ref int jobCount, int totalJobCount) {
      jobCount++;
      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
      SerializedJob serializedJob;
      if (hiveJob.Job.ComputeInParallel &&
        (hiveJob.Job.Optimizer is Optimization.Experiment || hiveJob.Job.Optimizer is Optimization.BatchRun)) {
        hiveJob.JobDto.State = JobState.WaitForChildJobs;
        hiveJob.Job.CollectChildJobs = false; // don't collect child-jobs on slaves
        serializedJob = hiveJob.GetAsSerializedJob(true);
      } else {
        serializedJob = hiveJob.GetAsSerializedJob(false);
      }
      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, serializedJob.SerializedJobData.Count() / 1024);
      this.progress.ProgressValue = (double)jobCount / totalJobCount;

      ResponseObject<JobDto> response;
      if (parentHiveJob != null) {
        response = service.AddChildJob(parentHiveJob.JobDto.Id, serializedJob);
      } else {
        response = service.AddJobWithGroupStrings(serializedJob, groups);
      }

      if (response.StatusMessage == ResponseStatus.Ok) {
        LogMessage(response.Obj.Id, "Job sent to Hive");
        hiveJob.JobDto = response.Obj;
        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
          UploadJobWithChildren(service, child, hiveJob, groups, ref jobCount, totalJobCount);
        }
      } else {
        throw new AddJobToHiveException(response.StatusMessage.ToString());
      }
    }

    /// <summary>
    /// Converts a string which can contain Ids separated by ';' to an enumerable.
    /// </summary>
    private IEnumerable<string> ToResourceIdList(string resourceGroups) {
      if (!string.IsNullOrEmpty(resourceGroups)) {
        // FIX: split the parameter; previously the field 'resourceIds' was split,
        // silently ignoring the argument passed by the caller.
        return resourceGroups.Split(';');
      } else {
        return new List<string>();
      }
    }

    /// <summary>
    /// Aborts every job of this experiment on the Hive server.
    /// </summary>
    public void Stop() {
      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        foreach (HiveJob hj in HiveJob.GetAllHiveJobs()) {
          service.Obj.AbortJob(hj.JobDto.Id);
        }
      }
    }
    #endregion

    /// <exception cref="JobResultPollingException">if polling is already running</exception>
    public void StartResultPolling() {
      if (!jobResultPoller.IsPolling) {
        jobResultPoller.Start();
      } else {
        throw new JobResultPollingException("Result polling already running");
      }
    }

    /// <exception cref="JobResultPollingException">if polling is not running</exception>
    public void StopResultPolling() {
      if (jobResultPoller.IsPolling) {
        jobResultPoller.Stop();
      } else {
        throw new JobResultPollingException("Result polling not running");
      }
    }

    #region HiveJob Events
    void HiveJob_JobStateChanged(object sender, EventArgs e) {
      if (HiveJob != null) {
        rootJobId = HiveJob.JobDto.Id;
      }
    }
    #endregion

    #region Eventhandler
    public event EventHandler ExecutionTimeChanged;
    private void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ExecutionStateChanged;
    private void OnExecutionStateChanged() {
      LogMessage("ExecutionState changed to " + executionState.ToString());
      EventHandler handler = ExecutionStateChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Started;
    private void OnStarted() {
      LogMessage("Started");
      timer.Start();
      EventHandler handler = Started;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Stopped;
    private void OnStopped() {
      LogMessage("Stopped");
      timer.Stop();
      EventHandler handler = Stopped;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Paused;
    private void OnPaused() {
      LogMessage("Paused");
      EventHandler handler = Paused;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Prepared;
    protected virtual void OnPrepared() {
      LogMessage("Prepared");
      EventHandler handler = Prepared;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ResourceIdsChanged;
    protected virtual void OnResourceIdsChanged() {
      EventHandler handler = ResourceIdsChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsResultsPollingChanged;
    private void OnIsPollingResultsChanged() {
      if (this.IsPollingResults) {
        LogMessage("Results Polling Started");
      } else {
        LogMessage("Results Polling Stopped");
      }
      EventHandler handler = IsResultsPollingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    private void OnExceptionOccured(Exception e) {
      var handler = ExceptionOccurred;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    public event EventHandler HiveJobChanged;
    private void OnHiveJobChanged() {
      // Re-wire the result poller to the new root job.
      if (jobResultPoller != null && jobResultPoller.IsPolling) {
        jobResultPoller.Stop();
        DeregisterResultPollingEvents();
      }
      if (HiveJob != null) {
        jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.RESULT_POLLING_INTERVAL);
        RegisterResultPollingEvents();
      }
      EventHandler handler = HiveJobChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsProgressingChanged;
    private void OnIsProgressingChanged() {
      var handler = IsProgressingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    #region JobResultPoller Events
    private void RegisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted += new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished += new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
    }

    private void DeregisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted -= new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished -= new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
    }

    void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
      this.IsPollingResults = jobResultPoller.IsPolling;
    }

    void jobResultPoller_PollingFinished(object sender, EventArgs e) {
      LogMessage("Polling results finished");
    }

    void jobResultPoller_PollingStarted(object sender, EventArgs e) {
      LogMessage("Polling results started");
    }

    // Applies received job results to the local job tree and downloads the
    // finished optimizers; stops the experiment once every job is finished.
    void jobResultPoller_JobResultReceived(object sender, EventArgs<JobResultList> e) {
      foreach (JobResult jobResult in e.Value) {
        HiveJob hj = hiveJob.GetHiveJobByJobId(jobResult.Id);
        if (hj != null) {
          hj.UpdateFromJobResult(jobResult);
          if ((hj.JobDto.State == JobState.Aborted ||
               hj.JobDto.State == JobState.Failed ||
               hj.JobDto.State == JobState.Finished) &&
              !hj.IsFinishedOptimizerDownloaded) {
            LogMessage(hj.JobDto.Id, "Downloading optimizer for job");
            OptimizerJob optimizerJob = LoadOptimizerJob(hj.JobDto.Id);
            if (jobResult.ParentJobId.HasValue) {
              HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(jobResult.ParentJobId.Value);
              parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.JobDto.Id);
            } else {
              this.HiveJob.IsFinishedOptimizerDownloaded = true;
            }
          }
        }
      }
      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
      if (AllJobsFinished()) {
        this.ExecutionState = Core.ExecutionState.Stopped;
        StopResultPolling();
        OnStopped();
      }
    }

    // True once every job's finished optimizer has been downloaded.
    private bool AllJobsFinished() {
      return HiveJob.GetAllHiveJobs().All(hj => hj.IsFinishedOptimizerDownloaded);
    }

    void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
      OnExceptionOccured(e.Value);
    }
    #endregion

    #region Execution Time Timer
    private void InitTimer() {
      timer = new System.Timers.Timer(100);
      timer.AutoReset = true;
      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
    }

    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      DateTime now = DateTime.Now;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
    }
    #endregion

    #region Logging
    private void LogMessage(string message) {
      // HeuristicLab.Log is not Thread-Safe, so lock on every call
      lock (locker) {
        log.LogMessage(message);
        Logger.Debug(message);
      }
    }

    private void LogMessage(Guid jobId, string message) {
      LogMessage(message + " (jobId: " + jobId + ")");
    }
    #endregion

    /// <summary>
    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem.
    /// </summary>
    public void LoadHiveJob() {
      progress = new Progress();
      try {
        IsProgressing = true;
        int totalJobCount = 0;
        int jobCount = 0;
        progress.Status = "Connecting to Server...";
        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
          // fetch all JobDto objects to create the full tree of HiveJob objects
          progress.Status = "Downloading list of jobs...";
          JobResultList allResults = service.Obj.GetChildJobResults(rootJobId.Value, true, true).Obj;
          totalJobCount = allResults.Count;

          // download them first
          IDictionary<Guid, SerializedJob> allSerializedJobs = new Dictionary<Guid, SerializedJob>();
          foreach (JobResult jobResult in allResults) {
            jobCount++;
            progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
            allSerializedJobs.Add(jobResult.Id, service.Obj.GetLastSerializedResult(jobResult.Id).Obj);
            progress.ProgressValue = (double)jobCount / totalJobCount;
          }

          jobCount = 1;
          progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, allSerializedJobs[this.rootJobId.Value].SerializedJobData.Count() / 1024);
          this.HiveJob = new HiveJob(allSerializedJobs[this.rootJobId.Value], false);
          allSerializedJobs.Remove(this.rootJobId.Value); // reduce memory footprint
          progress.ProgressValue = (double)jobCount / totalJobCount;

          if (this.HiveJob.JobDto.DateFinished.HasValue) {
            this.ExecutionTime = this.HiveJob.JobDto.DateFinished.Value - this.HiveJob.JobDto.DateCreated.Value;
            this.lastUpdateTime = this.HiveJob.JobDto.DateFinished.Value;
            this.ExecutionState = Core.ExecutionState.Stopped;
            OnStopped();
          } else {
            this.ExecutionTime = DateTime.Now - this.HiveJob.JobDto.DateCreated.Value;
            this.lastUpdateTime = DateTime.Now;
            this.ExecutionState = Core.ExecutionState.Started;
            OnStarted();
          }

          // build child-job tree
          LoadChildResults(service.Obj, this.HiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
          StartResultPolling();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        IsProgressing = false;
      }
    }

    // Recursively deserializes the children of parentHiveJob from the already
    // downloaded results and attaches them to the local job tree.
    private void LoadChildResults(IClientFacade service, HiveJob parentHiveJob, JobResultList allResults, IDictionary<Guid, SerializedJob> allSerializedJobs, IProgress progress, int totalJobCount, ref int jobCount) {
      IEnumerable<JobResult> childResults = from result in allResults
                                            where result.ParentJobId.HasValue && result.ParentJobId.Value == parentHiveJob.JobDto.Id
                                            orderby result.DateCreated ascending
                                            select result;
      foreach (JobResult jobResult in childResults) {
        jobCount++;
        progress.Status = string.Format("Deserializing {0} of {1} jobs ({2} kb)...", jobCount, totalJobCount, allSerializedJobs[jobResult.Id].SerializedJobData.Count() / 1024);
        OptimizerJob optimizerJob = SerializedJob.Deserialize(allSerializedJobs[jobResult.Id].SerializedJobData);
        progress.ProgressValue = (double)jobCount / totalJobCount;
        HiveJob childHiveJob = new HiveJob(optimizerJob, false);
        parentHiveJob.AddChildHiveJob(childHiveJob);
        childHiveJob.JobDto = allSerializedJobs[jobResult.Id].JobInfo;
        allSerializedJobs.Remove(jobResult.Id); // reduce memory footprint
        if (jobCount % 10 == 0) GC.Collect(); // this is needed or otherwise HL takes over the system when the number of jobs is high
        LoadChildResults(service, childHiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
      }
    }

    // Downloads and deserializes the last result of the given job from the Hive.
    private OptimizerJob LoadOptimizerJob(Guid jobId) {
      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        ResponseObject<SerializedJob> serializedJob = service.Obj.GetLastSerializedResult(jobId);
        return SerializedJob.Deserialize(serializedJob.Obj.SerializedJobData);
      }
    }
  }
}