#region License Information
/* HeuristicLab
* Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Hive.Contracts.Interfaces;
using HeuristicLab.Hive.Contracts.ResponseObjects;
using HeuristicLab.Hive.Experiment.Jobs;
using HeuristicLab.Hive.Tracing;
namespace HeuristicLab.Hive.Experiment {
  /// <summary>
  /// An experiment which contains multiple batch runs of algorithms.
  /// </summary>
  [Item(itemName, itemDescription)]
  public class HiveExperiment : NamedItem, IExecutable, IProgressReporter {
    // HeuristicLab.Log is not thread-safe; every log access is synchronized on this object.
    private object locker = new object();
    private const string itemName = "Hive Experiment";
    private const string itemDescription = "A runner for a single experiment, which's algorithms are executed in the Hive.";
    private System.Timers.Timer timer;       // periodically accumulates ExecutionTime while the experiment runs
    private JobResultPoller jobResultPoller; // polls job results from the hive server for the current HiveJob
    private Guid? rootJobId;                 // server-side id of the root job; null until the experiment was uploaded
    private DateTime lastUpdateTime;         // timestamp of the last ExecutionTime update

    #region Properties
    private Guid hiveExperimentId;
    // Server-side id of this experiment (assigned/updated via UpdateFromDto).
    public Guid HiveExperimentId {
      get { return hiveExperimentId; }
      set { hiveExperimentId = value; }
    }

    private HiveJob hiveJob;
    // Root of the HiveJob tree. Assigning a new value rewires job events and the result poller.
    public HiveJob HiveJob {
      get { return hiveJob; }
      set {
        if (hiveJob != value) {
          // Deregister only when the value actually changes. Previously this deregistered
          // unconditionally but re-registered only on change, so re-assigning the same
          // instance silently dropped the JobStateChanged subscription.
          DeregisterHiveJobEvents();
          hiveJob = value;
          RegisterHiveJobEvents();
          OnHiveJobChanged();
        }
      }
    }

    private ILog log;
    public ILog Log {
      get { return log; }
    }

    private string resourceIds;
    // Semicolon-separated list of resource-group ids the jobs may be computed on.
    public string ResourceIds {
      get { return resourceIds; }
      set {
        if (resourceIds != value) {
          resourceIds = value;
          OnResourceIdsChanged();
        }
      }
    }

    private bool isPollingResults;
    // True while the JobResultPoller is actively polling the server.
    public bool IsPollingResults {
      get { return isPollingResults; }
      private set {
        if (isPollingResults != value) {
          isPollingResults = value;
          OnIsPollingResultsChanged();
        }
      }
    }

    private bool isProgressing;
    // True while a long-running operation (upload/download) reports progress.
    public bool IsProgressing {
      get { return isProgressing; }
      set {
        if (isProgressing != value) {
          isProgressing = value;
          OnIsProgressingChanged();
        }
      }
    }

    private IProgress progress;
    public IProgress Progress {
      get { return progress; }
    }
    #endregion

    public HiveExperiment()
      : base(itemName, itemDescription) {
      this.ResourceIds = HeuristicLab.Hive.Experiment.Properties.Settings.Default.ResourceIds;
      this.log = new Log();
      InitTimer();
    }

    public HiveExperiment(HiveExperimentDto hiveExperimentDto)
      : this() {
      UpdateFromDto(hiveExperimentDto);
    }

    protected HiveExperiment(HiveExperiment original, Cloner cloner)
      : base(original, cloner) {
      this.ResourceIds = original.resourceIds;
      this.ExecutionState = original.executionState;
      this.ExecutionTime = original.executionTime;
      // BUGFIX: previously cloned this.log (still null at this point) instead of the
      // original's log, so clones always ended up with a null log.
      this.log = cloner.Clone(original.log);
      this.lastUpdateTime = original.lastUpdateTime;
      this.rootJobId = original.rootJobId;
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new HiveExperiment(this, cloner);
    }

    /// <summary>
    /// Copies all persisted properties from the given data transfer object.
    /// </summary>
    public void UpdateFromDto(HiveExperimentDto hiveExperimentDto) {
      this.HiveExperimentId = hiveExperimentDto.Id;
      this.Name = hiveExperimentDto.Name;
      this.Description = hiveExperimentDto.Description;
      this.ResourceIds = hiveExperimentDto.ResourceIds;
      this.rootJobId = hiveExperimentDto.RootJobId;
    }

    /// <summary>
    /// Creates a data transfer object representing the current state of this experiment.
    /// </summary>
    public HiveExperimentDto ToHiveExperimentDto() {
      return new HiveExperimentDto() {
        Id = this.HiveExperimentId,
        Name = this.Name,
        Description = this.Description,
        ResourceIds = this.ResourceIds,
        RootJobId = this.rootJobId
      };
    }

    /// <summary>
    /// Wraps the given experiment into a new root HiveJob and prepares for execution.
    /// </summary>
    public void SetExperiment(Optimization.Experiment experiment) {
      this.HiveJob = new HiveJob(experiment);
      Prepare();
    }

    private void RegisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged += new EventHandler(HiveJob_JobStateChanged);
      }
    }

    private void DeregisterHiveJobEvents() {
      if (HiveJob != null) {
        HiveJob.JobStateChanged -= new EventHandler(HiveJob_JobStateChanged);
      }
    }

    /// <summary>
    /// Returns the experiment from the root HiveJob
    /// </summary>
    public Optimization.Experiment GetExperiment() {
      if (this.HiveJob != null) {
        return HiveJob.Job.OptimizerAsExperiment;
      }
      return null;
    }

    #region IExecutable Members
    private Core.ExecutionState executionState;
    public ExecutionState ExecutionState {
      get { return executionState; }
      private set {
        if (executionState != value) {
          executionState = value;
          OnExecutionStateChanged();
        }
      }
    }

    private TimeSpan executionTime;
    public TimeSpan ExecutionTime {
      get { return executionTime; }
      private set {
        if (executionTime != value) {
          executionTime = value;
          OnExecutionTimeChanged();
        }
      }
    }

    public void Pause() {
      throw new NotSupportedException();
    }

    public void Prepare() {
      // do nothing
    }

    /// <summary>
    /// Starts execution by uploading the experiment to the hive on a background thread.
    /// </summary>
    public void Start() {
      OnStarted();
      ExecutionTime = new TimeSpan();
      lastUpdateTime = DateTime.Now;
      this.ExecutionState = Core.ExecutionState.Started;
      Thread t = new Thread(RunUploadExperiment);
      t.Name = "RunUploadExperimentThread";
      t.Start();
    }

    // Uploads the whole job tree and the experiment dto to the server, then starts result polling.
    // Runs on a background thread; all exceptions are funneled into OnExceptionOccured.
    private void RunUploadExperiment() {
      try {
        this.progress = new Progress("Connecting to server...");
        IsProgressing = true;
        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
          IEnumerable<string> groups = ToResourceIdList(this.ResourceIds);
          this.HiveJob.SetIndexInParentOptimizerList(null);
          int totalJobCount = this.HiveJob.GetAllHiveJobs().Count();
          int jobCount = 0;
          this.progress.Status = "Uploading jobs...";
          UploadJobWithChildren(service.Obj, this.HiveJob, null, groups, ref jobCount, totalJobCount);
          this.rootJobId = this.HiveJob.JobDto.Id;
          LogMessage("Finished sending jobs to hive");

          // insert or update HiveExperiment
          this.progress.Status = "Uploading HiveExperiment...";
          ResponseObject<HiveExperimentDto> resp = service.Obj.UpdateHiveExperiment(this.ToHiveExperimentDto());
          this.UpdateFromDto(resp.Obj);

          StartResultPolling();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        IsProgressing = false;
      }
    }

    /// <summary>
    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
    /// </summary>
    /// <param name="service">hive service facade used for the upload</param>
    /// <param name="hiveJob">root of the (sub)tree of jobs to upload</param>
    /// <param name="parentHiveJob">shall be null if it's the root job</param>
    /// <param name="groups">resource group ids the jobs may be computed on</param>
    /// <param name="jobCount">running counter across the whole tree (for progress reporting)</param>
    /// <param name="totalJobCount">total number of jobs in the tree</param>
    private void UploadJobWithChildren(IClientFacade service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<string> groups, ref int jobCount, int totalJobCount) {
      jobCount++;
      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
      SerializedJob serializedJob;
      if (hiveJob.Job.ComputeInParallel &&
        (hiveJob.Job.Optimizer is Optimization.Experiment || hiveJob.Job.Optimizer is Optimization.BatchRun)) {
        // parallelizable container job: it waits for its children, which are uploaded separately
        hiveJob.JobDto.State = JobState.WaitForChildJobs;
        hiveJob.Job.CollectChildJobs = false; // don't collect child-jobs on slaves
        serializedJob = hiveJob.GetAsSerializedJob(true);
      } else {
        serializedJob = hiveJob.GetAsSerializedJob(false);
      }

      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, serializedJob.SerializedJobData.Count() / 1024);
      this.progress.ProgressValue = (double)jobCount / totalJobCount;
      ResponseObject<JobDto> response;
      if (parentHiveJob != null) {
        response = service.AddChildJob(parentHiveJob.JobDto.Id, serializedJob);
      } else {
        response = service.AddJobWithGroupStrings(serializedJob, groups);
      }

      if (response.StatusMessage == ResponseStatus.Ok) {
        LogMessage(response.Obj.Id, "Job sent to Hive");
        hiveJob.JobDto = response.Obj;
        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
          UploadJobWithChildren(service, child, hiveJob, groups, ref jobCount, totalJobCount);
        }
      } else {
        throw new AddJobToHiveException(response.StatusMessage.ToString());
      }
    }

    /// <summary>
    /// Converts a string which can contain Ids separated by ';' to a enumerable
    /// </summary>
    private IEnumerable<string> ToResourceIdList(string resourceGroups) {
      if (!string.IsNullOrEmpty(resourceGroups)) {
        // BUGFIX: previously split the resourceIds field instead of the parameter,
        // which made the parameter (and its null/empty check) meaningless.
        return resourceGroups.Split(';');
      } else {
        return new List<string>();
      }
    }

    /// <summary>
    /// Aborts all jobs of this experiment on the hive server.
    /// </summary>
    public void Stop() {
      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        foreach (HiveJob hj in HiveJob.GetAllHiveJobs()) {
          service.Obj.AbortJob(hj.JobDto.Id);
        }
      }
    }
    #endregion

    public void StartResultPolling() {
      if (!jobResultPoller.IsPolling) {
        jobResultPoller.Start();
      } else {
        throw new JobResultPollingException("Result polling already running");
      }
    }

    public void StopResultPolling() {
      if (jobResultPoller.IsPolling) {
        jobResultPoller.Stop();
      } else {
        throw new JobResultPollingException("Result polling not running");
      }
    }

    #region HiveJob Events
    void HiveJob_JobStateChanged(object sender, EventArgs e) {
      if (HiveJob != null) {
        rootJobId = HiveJob.JobDto.Id;
      }
    }
    #endregion

    #region Eventhandler
    public event EventHandler ExecutionTimeChanged;
    private void OnExecutionTimeChanged() {
      EventHandler handler = ExecutionTimeChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ExecutionStateChanged;
    private void OnExecutionStateChanged() {
      LogMessage("ExecutionState changed to " + executionState.ToString());
      EventHandler handler = ExecutionStateChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Started;
    private void OnStarted() {
      LogMessage("Started");
      timer.Start();
      EventHandler handler = Started;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Stopped;
    private void OnStopped() {
      LogMessage("Stopped");
      timer.Stop();
      EventHandler handler = Stopped;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Paused;
    private void OnPaused() {
      LogMessage("Paused");
      EventHandler handler = Paused;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler Prepared;
    protected virtual void OnPrepared() {
      LogMessage("Prepared");
      EventHandler handler = Prepared;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler ResourceIdsChanged;
    protected virtual void OnResourceIdsChanged() {
      EventHandler handler = ResourceIdsChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsResultsPollingChanged;
    private void OnIsPollingResultsChanged() {
      if (this.IsPollingResults) {
        LogMessage("Results Polling Started");
      } else {
        LogMessage("Results Polling Stopped");
      }
      EventHandler handler = IsResultsPollingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    private void OnExceptionOccured(Exception e) {
      var handler = ExceptionOccurred;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    public event EventHandler HiveJobChanged;
    private void OnHiveJobChanged() {
      // rewire the result poller to the new root job
      if (jobResultPoller != null && jobResultPoller.IsPolling) {
        jobResultPoller.Stop();
        DeregisterResultPollingEvents();
      }
      if (HiveJob != null) {
        jobResultPoller = new JobResultPoller(HiveJob, ApplicationConstants.RESULT_POLLING_INTERVAL);
        RegisterResultPollingEvents();
      }
      EventHandler handler = HiveJobChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    public event EventHandler IsProgressingChanged;
    private void OnIsProgressingChanged() {
      var handler = IsProgressingChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    #region JobResultPoller Events
    private void RegisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted += new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished += new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
    }

    private void DeregisterResultPollingEvents() {
      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<JobResultList>>(jobResultPoller_JobResultReceived);
      jobResultPoller.PollingStarted -= new EventHandler(jobResultPoller_PollingStarted);
      jobResultPoller.PollingFinished -= new EventHandler(jobResultPoller_PollingFinished);
      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
    }

    void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
      this.IsPollingResults = jobResultPoller.IsPolling;
    }

    void jobResultPoller_PollingFinished(object sender, EventArgs e) {
      LogMessage("Polling results finished");
    }

    void jobResultPoller_PollingStarted(object sender, EventArgs e) {
      LogMessage("Polling results started");
    }

    // Applies received job results to the job tree and downloads finished optimizers.
    void jobResultPoller_JobResultReceived(object sender, EventArgs<JobResultList> e) {
      foreach (JobResult jobResult in e.Value) {
        HiveJob hj = hiveJob.GetHiveJobByJobId(jobResult.Id);
        if (hj != null) {
          hj.UpdateFromJobResult(jobResult);
          if ((hj.JobDto.State == JobState.Aborted ||
               hj.JobDto.State == JobState.Failed ||
               hj.JobDto.State == JobState.Finished) &&
              !hj.IsFinishedOptimizerDownloaded) {
            LogMessage(hj.JobDto.Id, "Downloading optimizer for job");
            OptimizerJob optimizerJob = LoadOptimizerJob(hj.JobDto.Id);
            if (jobResult.ParentJobId.HasValue) {
              HiveJob parentHiveJob = HiveJob.GetHiveJobByJobId(jobResult.ParentJobId.Value);
              parentHiveJob.UpdateChildOptimizer(optimizerJob, hj.JobDto.Id);
            } else {
              this.HiveJob.IsFinishedOptimizerDownloaded = true;
            }
          }
        }
      }
      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
      if (AllJobsFinished()) {
        this.ExecutionState = Core.ExecutionState.Stopped;
        StopResultPolling();
        OnStopped();
      }
    }

    // True when every job in the tree has its finished optimizer downloaded.
    private bool AllJobsFinished() {
      return HiveJob.GetAllHiveJobs().All(hj => hj.IsFinishedOptimizerDownloaded);
    }

    void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
      OnExceptionOccured(e.Value);
    }
    #endregion

    #region Execution Time Timer
    private void InitTimer() {
      timer = new System.Timers.Timer(100);
      timer.AutoReset = true;
      timer.Elapsed += new System.Timers.ElapsedEventHandler(timer_Elapsed);
    }

    private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
      DateTime now = DateTime.Now;
      ExecutionTime += now - lastUpdateTime;
      lastUpdateTime = now;
    }
    #endregion

    #region Logging
    private void LogMessage(string message) {
      // HeuristicLab.Log is not Thread-Safe, so lock on every call
      lock (locker) {
        log.LogMessage(message);
        Logger.Debug(message);
      }
    }

    private void LogMessage(Guid jobId, string message) {
      LogMessage(message + " (jobId: " + jobId + ")");
    }
    #endregion

    /// <summary>
    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem
    /// </summary>
    public void LoadHiveJob() {
      progress = new Progress();
      try {
        IsProgressing = true;
        int totalJobCount = 0;
        int jobCount = 0;
        progress.Status = "Connecting to Server...";
        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
          // fetch all JobDto objects to create the full tree of tree of HiveJob objects
          progress.Status = "Downloading list of jobs...";
          JobResultList allResults = service.Obj.GetChildJobResults(rootJobId.Value, true, true).Obj;
          totalJobCount = allResults.Count;

          // download them first
          IDictionary<Guid, SerializedJob> allSerializedJobs = new Dictionary<Guid, SerializedJob>();
          foreach (JobResult jobResult in allResults) {
            jobCount++;
            progress.Status = string.Format("Downloading {0} of {1} jobs...", jobCount, totalJobCount);
            allSerializedJobs.Add(jobResult.Id, service.Obj.GetLastSerializedResult(jobResult.Id).Obj);
            progress.ProgressValue = (double)jobCount / totalJobCount;
          }

          jobCount = 1;
          progress.Status = string.Format("Deserializing {0} of {1} jobs... ({2} kb)", jobCount, totalJobCount, allSerializedJobs[this.rootJobId.Value].SerializedJobData.Count() / 1024);
          this.HiveJob = new HiveJob(allSerializedJobs[this.rootJobId.Value], false);
          allSerializedJobs.Remove(this.rootJobId.Value); // reduce memory footprint
          progress.ProgressValue = (double)jobCount / totalJobCount;

          if (this.HiveJob.JobDto.DateFinished.HasValue) {
            this.ExecutionTime = this.HiveJob.JobDto.DateFinished.Value - this.HiveJob.JobDto.DateCreated.Value;
            this.lastUpdateTime = this.HiveJob.JobDto.DateFinished.Value;
            this.ExecutionState = Core.ExecutionState.Stopped;
            OnStopped();
          } else {
            this.ExecutionTime = DateTime.Now - this.HiveJob.JobDto.DateCreated.Value;
            this.lastUpdateTime = DateTime.Now;
            this.ExecutionState = Core.ExecutionState.Started;
            OnStarted();
          }

          // build child-job tree
          LoadChildResults(service.Obj, this.HiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
          StartResultPolling();
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        IsProgressing = false;
      }
    }

    // Recursively deserializes the children of parentHiveJob (ordered by creation date)
    // and attaches them to the tree, removing consumed entries to reduce memory footprint.
    private void LoadChildResults(IClientFacade service, HiveJob parentHiveJob, JobResultList allResults, IDictionary<Guid, SerializedJob> allSerializedJobs, IProgress progress, int totalJobCount, ref int jobCount) {
      IEnumerable<JobResult> childResults = from result in allResults
                                            where result.ParentJobId.HasValue && result.ParentJobId.Value == parentHiveJob.JobDto.Id
                                            orderby result.DateCreated ascending
                                            select result;
      foreach (JobResult jobResult in childResults) {
        jobCount++;
        progress.Status = string.Format("Deserializing {0} of {1} jobs ({2} kb)...", jobCount, totalJobCount, allSerializedJobs[jobResult.Id].SerializedJobData.Count() / 1024);
        OptimizerJob optimizerJob = SerializedJob.Deserialize(allSerializedJobs[jobResult.Id].SerializedJobData);
        progress.ProgressValue = (double)jobCount / totalJobCount;
        HiveJob childHiveJob = new HiveJob(optimizerJob, false);
        parentHiveJob.AddChildHiveJob(childHiveJob);
        childHiveJob.JobDto = allSerializedJobs[jobResult.Id].JobInfo;
        allSerializedJobs.Remove(jobResult.Id); // reduce memory footprint
        if (jobCount % 10 == 0) GC.Collect(); // this is needed or otherwise HL takes over the system when the number of jobs is high
        LoadChildResults(service, childHiveJob, allResults, allSerializedJobs, progress, totalJobCount, ref jobCount);
      }
    }

    // Downloads and deserializes the last result of the given job from the hive server.
    private OptimizerJob LoadOptimizerJob(Guid jobId) {
      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
        ResponseObject<SerializedJob> serializedJob = service.Obj.GetLastSerializedResult(jobId);
        return SerializedJob.Deserialize(serializedJob.Obj.SerializedJobData);
      }
    }
  }
}