#region License Information
/* HeuristicLab
* Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Text;
using HeuristicLab.Core;
using System.Threading;
using System.IO;
using System.Xml;
using System.IO.Compression;
using HeuristicLab.Common;
using HeuristicLab.Hive.JobBase;
using HeuristicLab.Hive.Contracts.Interfaces;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.PluginInfrastructure;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Tracing;
namespace HeuristicLab.Hive.Engine {
/// <summary>
/// Represents an engine that executes its operator graph in parallel on the Hive.
/// </summary>
public class HiveEngine : ItemBase, IEngine, IEditable {
// Pause between two polls for a requested snapshot, in milliseconds.
private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
// Pause between two polls for the final job result, in milliseconds.
private const int RESULT_POLLING_INTERVAL_MS = 10000;
// Upper bound on the number of polls performed while waiting for a snapshot.
private const int MAX_SNAPSHOT_RETRIES = 20;
// Id of the job on the server; set from the server response in Execute() and replaced by a new Guid in Reset().
private Guid jobId;
// Local job whose engine backs all IEngine properties of this class.
private Job job;
// Serializes server polling between the result-polling thread and RequestSnapshot().
// readonly: the lock object must never be swapped while another thread holds it.
private readonly object locker = new object();
// Set by Abort(), cleared by Reset(); read by the result-polling thread to stop polling.
private volatile bool abortRequested;
/// <summary>
/// Gets or sets the URL that is passed to the service locator to reach the Hive server.
/// </summary>
public string HiveServerUrl { get; set; }
/// <summary>
/// Creates an engine with a fresh local job, a cleared abort flag and the wrapped
/// engine's thread priority set to <see cref="ThreadPriority.Lowest"/>.
/// </summary>
public HiveEngine() {
  abortRequested = false;
  job = new Job();
  // Same effect as assigning the Priority property, which forwards to job.Engine.
  job.Engine.Priority = ThreadPriority.Lowest;
}
#region IEngine Members
/// <summary>
/// Gets the operator graph of the engine wrapped by the local job.
/// </summary>
public IOperatorGraph OperatorGraph {
get { return job.Engine.OperatorGraph; }
}
/// <summary>
/// Gets the global scope of the engine wrapped by the local job.
/// </summary>
public IScope GlobalScope {
get { return job.Engine.GlobalScope; }
}
/// <summary>
/// Gets the execution time reported by the engine wrapped by the local job.
/// </summary>
public TimeSpan ExecutionTime {
get { return job.Engine.ExecutionTime; }
}
/// <summary>
/// Gets or sets the thread priority of the engine wrapped by the local job.
/// </summary>
public ThreadPriority Priority {
get { return job.Engine.Priority; }
set { job.Engine.Priority = value; }
}
/// <summary>
/// Gets the <c>Running</c> flag of the engine wrapped by the local job.
/// </summary>
public bool Running {
get { return job.Engine.Running; }
}
/// <summary>
/// Gets the <c>Canceled</c> flag of the engine wrapped by the local job.
/// </summary>
public bool Canceled {
get { return job.Engine.Canceled; }
}
/// <summary>
/// Gets the <c>Terminated</c> flag of the engine wrapped by the local job.
/// </summary>
public bool Terminated {
get { return job.Engine.Terminated; }
}
/// <summary>
/// Serializes the local job, submits it to the Hive server at <see cref="HiveServerUrl"/>,
/// remembers the job id returned by the server and starts polling for the final result.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The server response is missing or carries no job object, so no job id is available.
/// </exception>
public void Execute() {
  var jobObj = CreateJobObj();
  IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
  ResponseObject res = executionEngineFacade.AddJob(jobObj);
  // Without a job object there is no id to poll for. Fail with the server's own
  // message instead of an anonymous NullReferenceException on res.Obj.Id.
  if (res == null || res.Obj == null) {
    string message = "HiveEngine: the Hive server did not return a job for the submitted request";
    if (res != null) message += " (server: " + res.StatusMessage + ")";
    throw new InvalidOperationException(message + ".");
  }
  jobId = res.Obj.Id;
  StartResultPollingThread();
}
/// <summary>
/// Starts a background thread that polls the Hive server every
/// RESULT_POLLING_INTERVAL_MS milliseconds until the final result of the job
/// (progress of at least 1.0) is available or an abort has been requested.
/// On success the restored job replaces the local job, a view of its engine is
/// shown and the Changed and Finished events are raised.
/// </summary>
private void StartResultPollingThread() {
  // start a background thread to poll the final result of the job
  Thread t = new Thread(() => {
    IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    ResponseObject response = null;
    Job restoredJob = null;
    do {
      Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
      // An abort may have arrived while sleeping; skip the pointless server round trip.
      if (abortRequested) return;
      lock (locker) {
        HiveLogger.Debug("HiveEngine: Results-polling - GetLastResult");
        response = executionEngineFacade.GetLastSerializedResult(jobId, false);
        HiveLogger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
        // loop while
        // 1. the user doesn't request an abort
        // 2. there is a problem with server communication (success==false)
        // 3. no result for the job is available yet (response.Obj==null)
        // 4. the result that we get from the server is a snapshot and not the final result
        if (abortRequested) return;
        if (response.Success && response.Obj != null) {
          HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
          restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobResultData);
          HiveLogger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress < 1.0));
        }
      }
    } while (restoredJob == null || (restoredJob.Progress < 1.0));
    job = restoredJob;
    // NOTE(review): this runs on the polling thread; confirm that ShowControl and the
    // event subscribers tolerate being called from a non-UI thread.
    PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
    OnChanged();
    OnFinished();
  });
  // Make the thread a real background thread so an unfinished job cannot keep
  // the process alive after the application has been closed.
  t.IsBackground = true;
  HiveLogger.Debug("HiveEngine: Starting results-polling thread");
  t.Start();
}
/// <summary>
/// Asks the Hive server for a snapshot of the job and, if a result is delivered,
/// replaces the local job with it and shows a view of its engine.
/// </summary>
/// <remarks>
/// If the server reports that the job is not being calculated, the last stored result is
/// fetched once. Otherwise the snapshot is polled every SNAPSHOT_POLLING_INTERVAL_MS
/// milliseconds, at most MAX_SNAPSHOT_RETRIES times. The lock is held for the whole
/// exchange. When no result object is returned the method ends without any further action.
/// </remarks>
public void RequestSnapshot() {
  IExecutionEngineFacade facade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
  int attempts = 0;
  ResponseObject response;
  lock (locker) {
    HiveLogger.Debug("HiveEngine: Abort - RequestSnapshot");
    Response snapShotResponse = facade.RequestSnapshot(jobId);
    bool snapshotPending =
      snapShotResponse.StatusMessage != ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED;
    if (snapshotPending) {
      // The server forwarded the snapshot request to the client; poll until the
      // snapshot has arrived or the retry budget is used up.
      while (true) {
        Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
        HiveLogger.Debug("HiveEngine: Abort - GetLastResult(true)");
        response = facade.GetLastSerializedResult(jobId, true);
        HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
        attempts++;
        if (attempts >= MAX_SNAPSHOT_RETRIES) break;
        // Keep polling on communication problems or while the result is not there yet.
        bool resultArrived = response.Success &&
          response.StatusMessage != ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE;
        if (resultArrived) break;
      }
    } else {
      // The job has finished already; take its last stored result.
      HiveLogger.Debug("HiveEngine: Abort - GetLastResult(false)");
      response = facade.GetLastSerializedResult(jobId, false);
      HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
    }
  }
  SerializedJobResult jobResult = response.Obj;
  if (jobResult == null) return;
  HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
  job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobResultData);
  PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
}
/// <summary>
/// Not supported by this engine.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public void ExecuteStep() {
throw new NotSupportedException();
}
/// <summary>
/// Not supported by this engine.
/// </summary>
/// <param name="steps">Ignored; the method throws before using it.</param>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public void ExecuteSteps(int steps) {
throw new NotSupportedException();
}
/// <summary>
/// Flags the abort (which makes the result-polling thread stop), tells the Hive
/// server to abort the job and raises the Changed and Finished events.
/// </summary>
public void Abort() {
  abortRequested = true;
  ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl).AbortJob(jobId);
  OnChanged();
  OnFinished();
}
/// <summary>
/// Clears the abort flag, resets the engine wrapped by the local job, assigns a new
/// random job id and raises the Initialized event.
/// </summary>
public void Reset() {
abortRequested = false;
job.Engine.Reset();
// Placeholder id only; Execute() overwrites it with the id returned by the server.
jobId = Guid.NewGuid();
OnInitialized();
}
/// <summary>
/// Builds the serialized job that is submitted to the Hive server. The local job is
/// persisted to an XML document and GZip-compressed, and the plugins that declare the
/// persisted objects, together with the dependencies those plugins list, are recorded
/// as the plugins needed by the job.
/// </summary>
/// <returns>The serialized job, including its job info (one core, state offline).</returns>
private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
  HeuristicLab.Hive.Contracts.BusinessObjects.Job jobObj =
    new HeuristicLab.Hive.Contracts.BusinessObjects.Job();

  // Persist the local job; the dictionary collects every object that was persisted.
  XmlDocument document = PersistenceManager.CreateXmlDocument();
  Dictionary dictionary = new Dictionary();
  XmlNode rootNode = document.CreateElement("Root");
  document.AppendChild(rootNode);
  rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));

  // Compress the document. The using blocks release both streams even if saving fails.
  byte[] serializedJobData;
  using (MemoryStream memStream = new MemoryStream()) {
    // leaveOpen = true: the memory stream must stay readable after the GZip stream
    // has been closed, which is what flushes the compressed data.
    using (GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
      document.Save(stream);
    }
    serializedJobData = memStream.ToArray();
  }

  HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
    new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
  executableJobObj.JobInfo = jobObj;
  executableJobObj.SerializedJobData = serializedJobData;

  // Collect each declaring plugin once, followed by the dependencies it lists.
  // NOTE(review): dependencies are only added when a plugin is first added as a declaring
  // plugin; if Dependencies is not transitive, plugins first seen as a dependency never
  // contribute their own dependencies - confirm.
  DiscoveryService service = new DiscoveryService();
  List plugins = new List();
  foreach (IStorable storeable in dictionary.Values) {
    PluginInfo pluginInfo = service.GetDeclaringPlugin(storeable.GetType());
    if (!plugins.Contains(pluginInfo)) {
      plugins.Add(pluginInfo);
      foreach (var dependency in pluginInfo.Dependencies) {
        if (!plugins.Contains(dependency)) plugins.Add(dependency);
      }
    }
  }

  // Translate the plugin descriptions into the form used by the Hive contracts.
  List pluginsNeeded = new List();
  foreach (PluginInfo uniquePlugin in plugins) {
    HivePluginInfo pluginInfo = new HivePluginInfo();
    pluginInfo.Name = uniquePlugin.Name;
    pluginInfo.Version = uniquePlugin.Version.ToString();
    pluginInfo.BuildDate = uniquePlugin.BuildDate;
    pluginsNeeded.Add(pluginInfo);
  }

  jobObj.CoresNeeded = 1;
  jobObj.PluginsNeeded = pluginsNeeded;
  jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
  return executableJobObj;
}
/// <summary>
/// Occurs when the engine has been reset.
/// </summary>
public event EventHandler Initialized;
/// <summary>
/// Fires a new <c>Initialized</c> event.
/// </summary>
protected virtual void OnInitialized() {
  // Copy the delegate first so a concurrent unsubscribe cannot turn it into null
  // between the check and the invocation.
  var handler = Initialized;
  if (handler != null)
    handler(this, EventArgs.Empty);
}
/// <summary>
/// Event that reports an executed operation to subscribers.
/// </summary>
public event EventHandler> OperationExecuted;
/// <summary>
/// Fires a new <c>OperationExecuted</c> event.
/// </summary>
/// <param name="operation">The operation that has been executed.</param>
protected virtual void OnOperationExecuted(IOperation operation) {
  // Copy the delegate first so a concurrent unsubscribe cannot turn it into null
  // between the check and the invocation.
  var handler = OperationExecuted;
  if (handler != null)
    handler(this, new EventArgs(operation));
}
/// <summary>
/// Event that reports an exception to subscribers.
/// </summary>
public event EventHandler> ExceptionOccurred;
/// <summary>
/// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
/// </summary>
/// <param name="exception">The exception that was thrown.</param>
protected virtual void OnExceptionOccurred(Exception exception) {
  Abort();
  // Copy the delegate first so a concurrent unsubscribe cannot turn it into null
  // between the check and the invocation.
  var handler = ExceptionOccurred;
  if (handler != null)
    handler(this, new EventArgs(exception));
}
/// <summary>
/// Event that notifies subscribers of a changed execution time.
/// </summary>
public event EventHandler ExecutionTimeChanged;
/// <summary>
/// Fires a new <c>ExecutionTimeChanged</c> event.
/// </summary>
protected virtual void OnExecutionTimeChanged() {
  // Copy the delegate first so a concurrent unsubscribe cannot turn it into null
  // between the check and the invocation.
  var handler = ExecutionTimeChanged;
  if (handler != null)
    handler(this, EventArgs.Empty);
}
/// <summary>
/// Occurs when the final job result has been received or the job has been aborted.
/// </summary>
public event EventHandler Finished;
/// <summary>
/// Fires a new <c>Finished</c> event.
/// </summary>
protected virtual void OnFinished() {
  // This method is called from the result-polling thread as well as from Abort().
  // Copy the delegate first so a concurrent unsubscribe cannot turn it into null
  // between the check and the invocation.
  var handler = Finished;
  if (handler != null)
    handler(this, EventArgs.Empty);
}
#endregion
/// <summary>
/// Creates a new <c>HiveEngineEditor</c> for this engine and returns it as the view.
/// </summary>
/// <returns>A new editor instance bound to this engine.</returns>
public override IView CreateView() {
return new HiveEngineEditor(this);
}
#region IEditable Members
/// <summary>
/// Creates a new <c>HiveEngineEditor</c> for this engine.
/// </summary>
/// <returns>A new editor instance bound to this engine.</returns>
public IEditor CreateEditor() {
return new HiveEngineEditor(this);
}
#endregion
/// <summary>
/// Saves the engine as an XML node: the node produced by the base class, extended by a
/// <c>HiveServerUrl</c> attribute and a child node <c>Job</c> holding the persisted local job.
/// </summary>
/// <param name="name">Passed through to the base implementation.</param>
/// <param name="document">The XML document in which the nodes are created.</param>
/// <param name="persistedObjects">Passed through to the base implementation and to the persistence manager.</param>
/// <returns>The XML node representing this engine.</returns>
public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary persistedObjects) {
  XmlNode engineNode = base.GetXmlNode(name, document, persistedObjects);

  XmlAttribute urlAttribute = document.CreateAttribute("HiveServerUrl");
  urlAttribute.Value = HiveServerUrl;
  engineNode.Attributes.Append(urlAttribute);

  XmlNode jobNode = PersistenceManager.Persist("Job", job, document, persistedObjects);
  engineNode.AppendChild(jobNode);
  return engineNode;
}
/// <summary>
/// Loads the engine from an XML node: after the base class has been populated, the
/// <c>HiveServerUrl</c> attribute is read and the local job is restored from the child node <c>Job</c>.
/// </summary>
/// <param name="node">The XML node that holds the persisted engine.</param>
/// <param name="restoredObjects">Passed through to the base implementation and to the persistence manager.</param>
public override void Populate(System.Xml.XmlNode node, IDictionary restoredObjects) {
  base.Populate(node, restoredObjects);

  XmlAttribute urlAttribute = node.Attributes["HiveServerUrl"];
  HiveServerUrl = urlAttribute.Value;

  XmlNode jobNode = node.SelectSingleNode("Job");
  job = (Job)PersistenceManager.Restore(jobNode, restoredObjects);
}
}
}