#region License Information
/* HeuristicLab
* Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see .
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Common;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
namespace HeuristicLab.HiveEngine {
///
/// Represents an engine that executes operations which can be executed in parallel on the hive
///
[StorableClass]
[Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
public class HiveEngine : Engine, IHiveEngine {
private static object locker = new object();
private CancellationToken cancellationToken;
private bool firstRun = true;
AutoResetEvent handle = new AutoResetEvent(false);
[Storable]
public Guid ParentTaskId { get; set; }
[Storable]
public bool PauseWhileCalculatingChilds { get; set; }
[Storable]
private IOperator currentOperator;
[Storable]
private int nrOfSentRuns;
[Storable]
private string username;
public string Username {
get { return username; }
set { username = value; }
}
[Storable]
private string password;
public string Password {
get { return password; }
set { password = value; }
}
[Storable]
public string ResourceNames { get; set; }
[Storable]
private int priority;
public int Priority {
get { return priority; }
set { priority = value; }
}
[Storable]
private TimeSpan executionTimeOnHive;
public TimeSpan ExecutionTimeOnHive {
get { return executionTimeOnHive; }
set {
if (value != executionTimeOnHive) {
executionTimeOnHive = value;
OnExecutionTimeOnHiveChanged();
}
}
}
[Storable]
private bool isPrivileged;
public bool IsPrivileged {
get { return isPrivileged; }
set { isPrivileged = value; }
}
// Task can't be storable, so RefreshableHiveExperiment can't be stored. But as previous runs are only informative it does not matter (only execution time on hive will be wrong because of that -> Todo)
private ItemCollection jobs = new ItemCollection();
public ItemCollection Jobs {
get { return jobs; }
set { jobs = value; }
}
private List onlinePlugins;
public List OnlinePlugins {
get { return onlinePlugins; }
set { onlinePlugins = value; }
}
private List alreadyUploadedPlugins;
public List AlreadyUploadedPlugins {
get { return alreadyUploadedPlugins; }
set { alreadyUploadedPlugins = value; }
}
[Storable]
private OperationCollection continueCollection = null;
[Storable]
private Guid lastJobGuid = Guid.Empty;
public bool IsAllowedPrivileged { get; set; }
#region constructors and cloning
public HiveEngine() {
this.ResourceNames = "HEAL";
this.Priority = 0;
this.log = new ThreadSafeLog();
this.IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());
HiveServiceClient cl = ClientFactory.CreateClient();
username = cl.ClientCredentials.UserName.UserName;
password = cl.ClientCredentials.UserName.Password;
PauseWhileCalculatingChilds = true;
}
[StorableConstructor]
protected HiveEngine(bool deserializing) : base(deserializing) { }
[StorableHook(HookType.AfterDeserialization)]
private void AfterDeserialization() {
if (username != string.Empty && password != string.Empty) {
HiveServiceLocator.Instance.Username = username;
HiveServiceLocator.Instance.Password = password;
}
}
protected HiveEngine(HiveEngine original, Cloner cloner)
: base(original, cloner) {
this.ResourceNames = original.ResourceNames;
this.currentOperator = cloner.Clone(original.currentOperator);
this.priority = original.priority;
this.executionTimeOnHive = original.executionTimeOnHive;
this.IsPrivileged = original.IsPrivileged;
this.username = original.username;
this.password = original.password;
this.continueCollection = original.continueCollection;
this.lastJobGuid = original.lastJobGuid;
this.nrOfSentRuns = original.nrOfSentRuns;
this.ParentTaskId = original.ParentTaskId;
this.PauseWhileCalculatingChilds = original.PauseWhileCalculatingChilds;
// do not clone jobs - otherwise they would be sent with every task
}
public override IDeepCloneable Clone(Cloner cloner) {
return new HiveEngine(this, cloner);
}
#endregion
#region Events
protected override void OnPrepared() {
base.OnPrepared();
this.ExecutionTimeOnHive = TimeSpan.Zero;
continueCollection = null;
lastJobGuid = Guid.Empty;
}
public event EventHandler ExecutionTimeOnHiveChanged;
protected virtual void OnExecutionTimeOnHiveChanged() {
var handler = ExecutionTimeOnHiveChanged;
if (handler != null) handler(this, EventArgs.Empty);
}
#endregion
protected override void Run(CancellationToken cancellationToken) {
this.cancellationToken = cancellationToken;
Run(ExecutionStack);
}
private void Run(object state) {
Stack executionStack = (Stack)state;
IOperation next;
OperationCollection coll;
IAtomicOperation operation;
if (firstRun) {
TaskScheduler.UnobservedTaskException += new EventHandler(TaskScheduler_UnobservedTaskException);
this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.Hash != null).ToList();
this.AlreadyUploadedPlugins = new List();
firstRun = false;
}
while (executionStack.Count > 0) {
cancellationToken.ThrowIfCancellationRequested();
if (continueCollection != null && lastJobGuid != Guid.Empty && PauseWhileCalculatingChilds) {
EngineTask[] eTasks = RetrieveResultsFromHive(lastJobGuid);
RestoreStateFromExecutedHiveTasks(eTasks, continueCollection);
continueCollection = null;
lastJobGuid = Guid.Empty;
}
next = executionStack.Pop();
if (next is OperationCollection) {
coll = (OperationCollection)next;
if (coll.Parallel) {
try {
// clone the parent scope here and reuse it for each operation. otherwise for each task the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
parentScopeClone.SubScopes.Clear();
parentScopeClone.ClearParentScopes();
EngineTask[] tasks = new EngineTask[coll.Count];
for (int i = 0; i < coll.Count; i++) {
tasks[i] = new EngineTask(coll[i], new SequentialEngine.SequentialEngine());
}
var experiment = CreateJob();
var engineTasks = ExecuteOnHive(experiment, tasks, parentScopeClone, new CancellationToken());
if (PauseWhileCalculatingChilds) {
continueCollection = coll;
Pause();
} else {
RestoreStateFromExecutedHiveTasks(engineTasks, coll);
}
}
catch {
executionStack.Push(coll); throw;
}
} else {
for (int i = coll.Count - 1; i >= 0; i--)
if (coll[i] != null) executionStack.Push(coll[i]);
}
} else if (next is IAtomicOperation) {
operation = (IAtomicOperation)next;
try {
next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
}
catch (Exception ex) {
executionStack.Push(operation);
if (ex is OperationCanceledException) throw ex;
else throw new OperatorExecutionException(operation.Operator, ex);
}
if (next != null) executionStack.Push(next);
if (operation.Operator.Breakpoint) {
log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
Pause();
}
}
}
}
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
e.SetObserved(); // avoid crash of process
}
private IRandom FindRandomParameter(IExecutionContext ec) {
try {
if (ec == null)
return null;
foreach (var p in ec.Parameters) {
if (p.Name == "Random" && p is IValueParameter)
return ((IValueParameter)p).Value as IRandom;
}
return FindRandomParameter(ec.Parent);
}
catch { return null; }
}
private void RestoreStateFromExecutedHiveTasks(EngineTask[] tasks, OperationCollection coll) {
IScope[] scopes = new Scope[tasks.Length];
if (tasks.Count() != coll.Count) {
throw new ArgumentException("Retrieved tasks don't match operation collection");
}
int j = 0;
foreach (var hiveJob in tasks) {
var scope = ((IAtomicOperation)((EngineTask)hiveJob).InitialOperation).Scope;
scopes[j++] = scope;
}
for (int i = 0; i < coll.Count; i++) {
if (coll[i] is IAtomicOperation) {
ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
} else if (coll[i] is OperationCollection) {
// todo ??
}
}
}
private static void ExchangeScope(IScope source, IScope target) {
target.Variables.Clear();
target.Variables.AddRange(source.Variables);
target.SubScopes.Clear();
target.SubScopes.AddRange(source.SubScopes);
// TODO: validate if parent scopes match - otherwise source is invalid
}
///
/// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
///
///
private EngineTask[] ExecuteOnHive(RefreshableJob refreshableJob, EngineTask[] tasks, IScope parentScopeClone, CancellationToken cancellationToken) {
log.LogMessage(string.Format("Executing {0} operations on the hive.", tasks.Length));
IScope[] scopes = new Scope[tasks.Length];
object locker = new object();
var hiveExperiment = refreshableJob.Job;
try {
// create upload-tasks
for (int i = 0; i < tasks.Length; i++) {
var engineHiveTask = new EngineHiveTask(tasks[i], parentScopeClone);
engineHiveTask.Task.Priority = this.Priority;
if (ParentTaskId != Guid.Empty && PauseWhileCalculatingChilds) {
engineHiveTask.Task.ParentTaskId = ParentTaskId;
}
refreshableJob.HiveTasks.Add(engineHiveTask);
// shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
//IRandom random = FindRandomParameter(tasks[i].InitialOperation as IExecutionContext);
//if (random != null)
//random.Reset(random.Next());
}
HiveClient.StartJob((e) => { log.LogException(e); }, refreshableJob, cancellationToken);
while (refreshableJob.Job.Id == Guid.Empty) {
Thread.Sleep(500);
}
this.lastJobGuid = refreshableJob.Job.Id;
refreshableJob.Progress.Finished += new EventHandler(Progress_Finished);
handle.WaitOne();
refreshableJob.Progress.Finished -= new EventHandler(Progress_Finished);
if (!PauseWhileCalculatingChilds) {
while (!refreshableJob.AllJobsFinished()) {
Thread.Sleep(1000);
}
List allHiveTasks = refreshableJob.HiveTasks.ToList();
allHiveTasks.ForEach(x => this.ExecutionTimeOnHive += x.Task.ExecutionTime);
var failedJobs = allHiveTasks.Where(x => x.Task.State != TaskState.Finished);
if (failedJobs.Count() > 0) {
throw new HiveEngineException("Task (" + failedJobs.First().Task.Id + ") failed: " + failedJobs.First().Task.StateLog.Last().Exception);
}
List engineTasks = new List();
foreach (var hTask in allHiveTasks) {
EngineTask ehTask = (EngineTask)hTask.ItemTask;
engineTasks.Add(ehTask);
}
jobs.Clear();
DeleteHiveExperiment(refreshableJob.Id);
return engineTasks.ToArray();
} else {
return null;
}
}
catch (OperationCanceledException e) {
throw e;
}
catch (Exception e) {
log.LogException(e);
throw e;
}
}
void Progress_Finished(object sender, EventArgs e) {
handle.Set();
}
private EngineTask[] RetrieveResultsFromHive(Guid jobGuid) {
Job job = HiveServiceLocator.Instance.CallHiveService(s => s.GetJob(jobGuid));
var allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasks(job.Id));
var totalJobCount = allTasks.Count();
TaskDownloader downloader = new TaskDownloader(allTasks.Select(x => x.Id));
downloader.StartAsync();
while (!downloader.IsFinished) {
Thread.Sleep(500);
if (downloader.IsFaulted) {
throw downloader.Exception;
}
}
List allHiveTasks = downloader.Results.Values.ToList();
allHiveTasks.ForEach(x => this.ExecutionTimeOnHive += x.Task.ExecutionTime);
var failedJobs = allHiveTasks.Where(x => x.Task.State != TaskState.Finished);
if (failedJobs.Count() > 0) {
throw new HiveEngineException("Task (" + failedJobs.First().Task.Id + ") failed: " + failedJobs.First().Task.StateLog.Last().Exception);
}
List engineTasks = new List();
foreach (var hTask in allHiveTasks) {
EngineTask ehTask = (EngineTask)hTask.ItemTask;
engineTasks.Add(ehTask);
}
jobs.Clear();
DeleteHiveExperiment(jobGuid);
return engineTasks.ToArray();
}
private RefreshableJob CreateJob() {
lock (locker) {
var hiveExperiment = new Job();
hiveExperiment.Name = "HiveEngine Run " + nrOfSentRuns++;
hiveExperiment.DateCreated = DateTime.Now;
hiveExperiment.ResourceNames = this.ResourceNames;
hiveExperiment.IsPrivileged = this.IsPrivileged;
var refreshableHiveExperiment = new RefreshableJob(hiveExperiment);
//refreshableHiveExperiment.RefreshAutomatically = false;
refreshableHiveExperiment.IsDownloadable = false; // download happens automatically so disable button
jobs.Add(refreshableHiveExperiment);
return refreshableHiveExperiment;
}
}
private void DeleteHiveExperiment(Guid jobId) {
HiveClient.TryAndRepeat(() => {
HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(jobId));
}, 5, string.Format("Could not delete jobs"));
}
private List GetResourceIds() {
return HiveServiceLocator.Instance.CallHiveService(service => {
var resourceNames = ResourceNames.Split(';');
var resourceIds = new List();
foreach (var resourceName in resourceNames) {
Guid resourceId = service.GetResourceId(resourceName);
if (resourceId == Guid.Empty) {
throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
}
resourceIds.Add(resourceId);
}
return resourceIds;
});
}
}
}