#region License Information
/* HeuristicLab
* Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
*/
#endregion
using System;
using System.Collections.Generic;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using System.Reflection;
using System.IO;
using HeuristicLab.Persistence.Default.Xml;
using System.Diagnostics;
using HeuristicLab.Optimization;
using System.Linq;
using Microsoft.Hpc.Scheduler;
using System.ServiceModel;
using HeuristicLab.MPIAlgorithmRunner;
using HeuristicLab.Operators.MPISupport;
using Microsoft.Hpc.Scheduler.Properties;
using System.Xml;
using System.ComponentModel;
namespace HeuristicLab.MPIEngine {
/// <summary>
/// Represents an engine that executes an algorithm in multiple processes using MPI.
/// The algorithm is submitted as a job to a Microsoft HPC cluster, where mpiexec starts
/// HeuristicLab.MPIAlgorithmRunner; intermediate results are polled periodically and
/// published in the global "Results" collection under the name "MPIResults".
/// This engine is suitable for distributed memory systems which provide multiple cores.
/// </summary>
[StorableClass]
[Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
public class MPIEngine : Engine {
  /// <summary>Number of times the HPC job is queried for the runner's address before giving up.</summary>
  private const int AddressPollAttempts = 10;
  /// <summary>Delay in milliseconds between two queries for the runner's address.</summary>
  private const int AddressPollIntervalMilliseconds = 1000;

  // The credentials are not [Storable], so they are not persisted together with the engine.
  private string username;
  /// <summary>User name used to submit the HPC job.</summary>
  [Category("MPISettings")]
  [Browsable(true)]
  public string UserName {
    get { return username; }
    set { username = value; }
  }

  private string password;
  /// <summary>Password used to submit the HPC job.</summary>
  [Category("MPISettings")]
  [Browsable(true)]
  [PasswordPropertyText(true)]
  public string Password {
    get { return password; }
    set { password = value; }
  }

  [Storable]
  private string headNode;
  /// <summary>Host name of the HPC head node the scheduler connects to.</summary>
  [Category("MPISettings")]
  [Browsable(true)]
  public string HeadNode {
    get { return headNode; }
    set { headNode = value; }
  }

  [Storable]
  private string path;
  /// <summary>
  /// Directory containing HeuristicLab.MPIAlgorithmRunner-3.3.exe; it is also used as the
  /// working directory of the HPC task.
  /// </summary>
  [Category("MPISettings")]
  [Browsable(true)]
  public string Path {
    get { return path; }
    set { path = value; }
  }

  [Storable]
  private int updateInterval;
  /// <summary>
  /// Interval in milliseconds between two result polls; the value is also sent to the runner.
  /// </summary>
  [Category("MPISettings")]
  [Browsable(true)]
  public int UpdateInterval {
    get { return updateInterval; }
    set { updateInterval = value; }
  }

  [Storable]
  private int cpuPerNode;
  /// <summary>Number of cores per node, passed to mpiexec via "-c".</summary>
  [Category("MPISettings")]
  [Browsable(true)]
  public int CpuPerNode {
    get { return cpuPerNode; }
    set { cpuPerNode = value; }
  }

  [Storable]
  private List<string> requestedNodes;
  /// <summary>Names of the cluster nodes requested for the HPC job.</summary>
  [Category("MPISettings")]
  [Browsable(true)]
  public List<string> RequestedNodes {
    get { return requestedNodes; }
    set { requestedNodes = value; }
  }

  [StorableConstructor]
  protected MPIEngine(bool deserializing) : base(deserializing) { }

  /// <summary>Cloning constructor; copies all settings and creates an independent copy of the node list.</summary>
  protected MPIEngine(MPIEngine original, Cloner cloner) : base(original, cloner) {
    username = original.username;
    password = original.password;
    headNode = original.headNode;
    path = original.path;
    updateInterval = original.updateInterval;
    cpuPerNode = original.cpuPerNode;
    // Copy the list so that clones do not share it; a missing list stays missing.
    requestedNodes = original.requestedNodes == null ? null : new List<string>(original.requestedNodes);
  }

  /// <summary>Creates an engine with default cluster settings.</summary>
  public MPIEngine() : base() {
    username = @"user";
    password = @"password";
    headNode = "blade00.hpc.fh-hagenberg.at";
    path = @"C:\public\MPISupport";
    updateInterval = 5000;
    cpuPerNode = 3;
    requestedNodes = new List<string> { "BLADE00" };
  }

  public override IDeepCloneable Clone(Cloner cloner) {
    return new MPIEngine(this, cloner);
  }

  // Copy of the executed algorithm that is serialized and sent to the MPI runner (set in Start).
  private IAlgorithm algorithm;

  /// <summary>
  /// Starts the engine. When exactly one operation is on the execution stack, the algorithm
  /// owning that operation is cloned, given a SequentialEngine, and kept for transmission to
  /// the MPI runner in <see cref="Run"/>.
  /// </summary>
  /// <exception cref="InvalidOperationException">No engine algorithm can be found for the operation.</exception>
  public override void Start() {
    if (ExecutionStack.Count == 1) {
      ExecutionContext context = ExecutionStack.First() as ExecutionContext;
      ExecutionContext algorithmContext = context != null ? context.Parent as ExecutionContext : null;
      if (algorithmContext == null)
        throw new InvalidOperationException("The MPI engine requires an operation whose parent execution context belongs to an algorithm.");

      // NOTE(review): reads the private field "parameterizedItem" of ExecutionContext via reflection;
      // this breaks if the field is renamed in HeuristicLab.Core.
      EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
        BindingFlags.GetField | BindingFlags.NonPublic | BindingFlags.Instance,
        null, algorithmContext, null) as EngineAlgorithm;
      if (alg == null)
        throw new InvalidOperationException("The MPI engine can only execute engine algorithms.");

      EngineAlgorithm copy = (EngineAlgorithm)alg.Clone();
      copy.Engine = new SequentialEngine.SequentialEngine();
      algorithm = copy;
    }
    base.Start();
  }

  /// <summary>Pausing is mapped to stopping: after the base notification the engine is stopped.</summary>
  protected override void OnPaused() {
    base.OnPaused();
    Stop();
  }

  /// <summary>
  /// Submits an HPC job that starts the MPI runner, transmits the algorithm prepared in
  /// <see cref="Start"/> and publishes the runner's results until it terminates.
  /// Does nothing unless exactly one operation is on the execution stack.
  /// If anything fails after submission, the HPC job is cancelled and the exception rethrown.
  /// </summary>
  /// <exception cref="TimeoutException">The runner did not publish its address in time.</exception>
  /// <exception cref="OperationCanceledException">The engine was stopped.</exception>
  protected override void Run(System.Threading.CancellationToken cancellationToken) {
    if (ExecutionStack.Count != 1) return;

    ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;
    IScope globalScope = context.Scope;

    IScheduler scheduler = new Scheduler();
    scheduler.Connect(headNode);
    ISchedulerJob job = SubmitRunnerJob(scheduler);

    try {
      string address = WaitForRunnerAddress(scheduler, job, cancellationToken);
      ExecuteOnRunner(address, globalScope, cancellationToken);
    }
    catch (Exception e) {
      scheduler.CancelJob(job.Id, "Exception: " + e.GetType());
      throw; // "throw e;" would reset the stack trace
    }
  }

  /// <summary>
  /// Creates and submits an HPC job with a single task that launches the MPI runner via mpiexec
  /// on the requested nodes, using cpuPerNode cores per node.
  /// </summary>
  private ISchedulerJob SubmitRunnerJob(IScheduler scheduler) {
    string exec = @"mpiexec";
    // NOTE(review): path is not quoted, so a Path containing spaces would break this command line.
    string args = @"-c " + cpuPerNode + " /genvlist CCP_JOBID " + path + @"\HeuristicLab.MPIAlgorithmRunner-3.3.exe";

    ISchedulerJob job = scheduler.CreateJob();
    job.Name = "HeuristicLab.MPIEngine";
    foreach (string requestedNode in requestedNodes)
      job.RequestedNodes.Add(requestedNode);

    ISchedulerTask task = job.CreateTask();
    task.Name = "HeuristicLab.MPIAlgorithmRunner";
    task.CommandLine = exec + " " + args;
    task.StdOutFilePath = "stdout.txt";
    task.StdErrFilePath = "stderr.txt";
    task.WorkDirectory = path;
    task.MinimumNumberOfCores = task.MaximumNumberOfCores = cpuPerNode * requestedNodes.Count;
    job.AddTask(task);

    scheduler.SubmitJob(job, username, password);
    return job;
  }

  /// <summary>
  /// Polls the job's custom property "address" (the runner's endpoint) up to
  /// <see cref="AddressPollAttempts"/> times, waiting between attempts.
  /// </summary>
  /// <exception cref="TimeoutException">The address was not published in time.</exception>
  private static string WaitForRunnerAddress(IScheduler scheduler, ISchedulerJob job, System.Threading.CancellationToken cancellationToken) {
    for (int attempt = 0; attempt < AddressPollAttempts; attempt++) {
      cancellationToken.ThrowIfCancellationRequested();

      ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
      if (schedulerJob != null) {
        NameValue property = schedulerJob.GetCustomProperties().FirstOrDefault(p => p.Name == "address");
        if (property != null)
          return property.Value;
      }
      // Count every unsuccessful attempt (also when the job could not be opened) so the loop always terminates;
      // the wait returns early when the engine is stopped.
      cancellationToken.WaitHandle.WaitOne(AddressPollIntervalMilliseconds);
    }
    throw new TimeoutException("A timeout occurred when starting the MPIAlgorithmRunner");
  }

  /// <summary>
  /// Connects to the runner at <paramref name="address"/>, transmits the algorithm and then
  /// polls and publishes its results, waiting UpdateInterval milliseconds between polls, until
  /// the runner reports termination. The channel factory is closed (or aborted) on exit.
  /// </summary>
  private void ExecuteOnRunner(string address, IScope globalScope, System.Threading.CancellationToken cancellationToken) {
    NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
    netTCPBinding.TransferMode = TransferMode.Streamed;
    netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
    ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
    try {
      IAlgorithmBroker proxy = factory.CreateChannel();
      proxy.TransmitAlgorithm(SerializeAlgorithm());
      proxy.SetUpdateInterval(updateInterval);

      while (!proxy.IsAlgorithmTerminated()) {
        cancellationToken.ThrowIfCancellationRequested();
        // NOTE(review): the runner presumably sends an ItemList of results; it is only used as an IItem here.
        IItem results = StreamingHelper.StreamItem(proxy.GetResults()) as IItem;
        PublishResults(globalScope, results);
        // Unlike Thread.Sleep, this wait returns early when the engine is stopped.
        cancellationToken.WaitHandle.WaitOne(updateInterval);
      }
    }
    finally {
      CloseOrAbort(factory);
    }
  }

  /// <summary>Serializes the prepared algorithm into a new stream positioned at its start.</summary>
  private Stream SerializeAlgorithm() {
    MemoryStream buffer = new MemoryStream();
    XmlGenerator.Serialize(algorithm, buffer);
    // ToArray copies exactly Length bytes (GetBuffer would also include unused capacity)
    // and works even if Serialize has closed the stream.
    return new MemoryStream(buffer.ToArray());
  }

  /// <summary>
  /// Stores <paramref name="results"/> as "MPIResults" in the global scope's "Results" collection;
  /// does nothing if either is missing.
  /// </summary>
  private static void PublishResults(IScope globalScope, IItem results) {
    ResultCollection resultCollection = globalScope.Variables["Results"].Value as ResultCollection;
    if (resultCollection == null || results == null) return;

    if (resultCollection.ContainsKey("MPIResults"))
      resultCollection["MPIResults"].Value = results;
    else
      resultCollection.Add(new Result("MPIResults", results));
  }

  /// <summary>
  /// Closes a WCF communication object, aborting it instead if a graceful close fails
  /// (e.g. because it is faulted).
  /// </summary>
  private static void CloseOrAbort(ICommunicationObject communicationObject) {
    try {
      communicationObject.Close();
    }
    catch (CommunicationException) {
      communicationObject.Abort();
    }
    catch (TimeoutException) {
      communicationObject.Abort();
    }
  }
}
}