#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using System.Reflection;
using System.IO;
using HeuristicLab.Persistence.Default.Xml;
using System.Diagnostics;
using HeuristicLab.Optimization;
using System.Linq;
using Microsoft.Hpc.Scheduler;
using System.ServiceModel;
using HeuristicLab.MPIAlgorithmRunner;
using HeuristicLab.Operators.MPISupport;
using Microsoft.Hpc.Scheduler.Properties;
using System.Xml;
using System.ComponentModel;

namespace HeuristicLab.MPIEngine {
  /// <summary>
  /// Represents an engine that executes an algorithm using multiple MPI processes.
  /// The engine submits a job to a Microsoft HPC cluster that starts
  /// HeuristicLab.MPIAlgorithmRunner via mpiexec. It then transmits a copy of the
  /// algorithm to the runner over WCF and periodically polls the runner's results
  /// into the "MPIResults" entry of the global "Results" collection.
  /// This engine is suitable for distributed memory systems with multiple cores.
  /// </summary>
  [StorableClass]
  [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
  public class MPIEngine : Engine {
    // Number of times the job is queried for the runner's "address" property before giving up.
    private const int AddressPollAttempts = 10;
    // Delay in milliseconds between two queries for the runner's "address" property.
    private const int AddressPollIntervalMilliseconds = 1000;

    // Not marked [Storable], so the user name is not persisted with the engine.
    private string username;
    /// <summary>User name passed to the HPC scheduler when the job is submitted.</summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public string UserName {
      get { return username; }
      set { username = value; }
    }

    // Not marked [Storable], so the password is not persisted with the engine.
    private string password;
    /// <summary>Password passed to the HPC scheduler when the job is submitted.</summary>
    [Category("MPISettings")]
    [Browsable(true)]
    [PasswordPropertyText(true)]
    public string Password {
      get { return password; }
      set { password = value; }
    }

    [Storable]
    private string headNode;
    /// <summary>Host name of the HPC head node the scheduler connects to.</summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public string HeadNode {
      get { return headNode; }
      set { headNode = value; }
    }

    [Storable]
    private string path;
    /// <summary>
    /// Directory containing HeuristicLab.MPIAlgorithmRunner-3.3.exe.
    /// It is also used as the task's working directory.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public string Path {
      get { return path; }
      set { path = value; }
    }

    [Storable]
    private int updateInterval;
    /// <summary>
    /// Interval in milliseconds between result polls. It is also sent to the runner
    /// via SetUpdateInterval.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public int UpdateInterval {
      get { return updateInterval; }
      set { updateInterval = value; }
    }

    [Storable]
    private int cpuPerNode;
    /// <summary>Number of cores used on each requested node (mpiexec -c).</summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public int CpuPerNode {
      get { return cpuPerNode; }
      set { cpuPerNode = value; }
    }

    [Storable]
    private List<string> requestedNodes;
    /// <summary>Names of the cluster nodes requested for the job.</summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public List<string> RequestedNodes {
      get { return requestedNodes; }
      set { requestedNodes = value; }
    }

    [StorableConstructor]
    protected MPIEngine(bool deserializing) : base(deserializing) { }

    protected MPIEngine(MPIEngine original, Cloner cloner)
      : base(original, cloner) {
      username = original.username;
      password = original.password;
      headNode = original.headNode;
      path = original.path;
      updateInterval = original.updateInterval;
      cpuPerNode = original.cpuPerNode;
      // Copy the node list so the clone and the original do not share one mutable list.
      requestedNodes = original.requestedNodes != null
        ? new List<string>(original.requestedNodes)
        : new List<string>();
    }

    public MPIEngine()
      : base() {
      username = @"user";
      password = @"password";
      headNode = "blade00.hpc.fh-hagenberg.at";
      path = @"C:\public\MPISupport";
      updateInterval = 5000;
      cpuPerNode = 3;
      requestedNodes = new List<string>();
      requestedNodes.Add("BLADE00");
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new MPIEngine(this, cloner);
    }

    // Copy of the executing algorithm (configured with a SequentialEngine) that Run sends to the runner.
    private IAlgorithm algorithm;

    /// <summary>
    /// On a fresh start (exactly one operation on the execution stack), captures a copy of
    /// the executing algorithm before starting.
    /// </summary>
    public override void Start() {
      if (ExecutionStack.Count == 1) {
        algorithm = CreateRunnerAlgorithm();
      }
      base.Start();
    }

    /// <summary>
    /// Retrieves the EngineAlgorithm that owns the root execution context. Returns a clone
    /// of it whose engine is replaced by a SequentialEngine.
    /// </summary>
    /// <exception cref="InvalidOperationException">The algorithm cannot be determined from the execution stack.</exception>
    private IAlgorithm CreateRunnerAlgorithm() {
      ExecutionContext context = ExecutionStack.First() as ExecutionContext;
      if (context == null)
        throw new InvalidOperationException("The MPI engine expects an ExecutionContext on top of the execution stack.");
      ExecutionContext algorithmContext = context.Parent as ExecutionContext;
      if (algorithmContext == null)
        throw new InvalidOperationException("The MPI engine expects the top execution context to have a parent ExecutionContext.");

      // Reads the private field "parameterizedItem" via reflection; this depends on an
      // implementation detail of ExecutionContext in HeuristicLab.Core.
      EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
        BindingFlags.GetField | BindingFlags.NonPublic | BindingFlags.Instance,
        null, algorithmContext, null) as EngineAlgorithm;
      if (alg == null)
        throw new InvalidOperationException("The MPI engine can only execute algorithms of type EngineAlgorithm.");

      alg = alg.Clone() as EngineAlgorithm;
      alg.Engine = new SequentialEngine.SequentialEngine();
      return alg;
    }

    /// <summary>Pausing is mapped to stopping: the engine calls Stop() right after pausing.</summary>
    protected override void OnPaused() {
      base.OnPaused();
      Stop();
    }

    /// <summary>
    /// Submits the MPI job and waits for the runner to publish its endpoint. It then sends
    /// the algorithm and polls results until the runner reports termination. If anything
    /// fails, including cancellation, the cluster job is cancelled and the exception is rethrown.
    /// </summary>
    protected override void Run(System.Threading.CancellationToken cancellationToken) {
      if (ExecutionStack.Count != 1) return;

      ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;
      IScope globalScope = context.Scope;
      // Fail before occupying cluster resources if Start did not capture an algorithm.
      if (algorithm == null)
        throw new InvalidOperationException("No algorithm has been captured for execution; start the engine via Start().");

      IScheduler scheduler = new Scheduler();
      scheduler.Connect(headNode);
      try {
        ISchedulerJob job = CreateJob(scheduler);
        scheduler.SubmitJob(job, username, password);
        try {
          string address = WaitForRunnerAddress(scheduler, job.Id, cancellationToken);
          RunRemoteAlgorithm(address, globalScope, cancellationToken);
        } catch (Exception e) {
          scheduler.CancelJob(job.Id, "Exception: " + e.GetType());
          throw; // preserve the original stack trace
        }
      } finally {
        scheduler.Close();
      }
    }

    /// <summary>
    /// Creates, but does not submit, an HPC job with a single task. The task runs
    /// HeuristicLab.MPIAlgorithmRunner via mpiexec on the requested nodes.
    /// </summary>
    private ISchedulerJob CreateJob(IScheduler scheduler) {
      string exec = @"mpiexec";
      // The runner path is quoted so that installation directories containing spaces work.
      string args = @"-c " + cpuPerNode + " /genvlist CCP_JOBID \"" + path + @"\HeuristicLab.MPIAlgorithmRunner-3.3.exe" + "\"";

      ISchedulerJob job = scheduler.CreateJob();
      job.Name = "HeuristicLab.MPIEngine";
      foreach (string requestedNode in requestedNodes)
        job.RequestedNodes.Add(requestedNode);

      ISchedulerTask task = job.CreateTask();
      task.Name = "HeuristicLab.MPIAlgorithmRunner";
      task.CommandLine = exec + " " + args;
      task.StdOutFilePath = "stdout.txt";
      task.StdErrFilePath = "stderr.txt";
      task.WorkDirectory = path;
      // Reserve cpuPerNode cores on every requested node.
      task.MinimumNumberOfCores = task.MaximumNumberOfCores = cpuPerNode * requestedNodes.Count;
      job.AddTask(task);
      return job;
    }

    /// <summary>
    /// Polls the submitted job until the custom job property "address" (the runner's WCF
    /// endpoint) appears.
    /// </summary>
    /// <exception cref="TimeoutException">The property did not appear within AddressPollAttempts polls.</exception>
    /// <exception cref="OperationCanceledException">The engine was stopped while waiting.</exception>
    private static string WaitForRunnerAddress(IScheduler scheduler, int jobId, System.Threading.CancellationToken cancellationToken) {
      for (int attempt = 0; attempt < AddressPollAttempts; attempt++) {
        cancellationToken.ThrowIfCancellationRequested();
        ISchedulerJob schedulerJob = scheduler.OpenJob(jobId);
        if (schedulerJob != null) {
          NameValue property = schedulerJob.GetCustomProperties().FirstOrDefault(p => p.Name == "address");
          if (property != null)
            return property.Value;
        }
        // Every unsuccessful poll (including a null job) waits and counts toward the timeout.
        // Waiting on the token's handle lets a stop request end the wait early.
        cancellationToken.WaitHandle.WaitOne(AddressPollIntervalMilliseconds);
      }
      cancellationToken.ThrowIfCancellationRequested();
      throw new TimeoutException("A timeout occurred when starting the MPIAlgorithmRunner");
    }

    /// <summary>
    /// Connects to the runner at <paramref name="address"/> and transmits the algorithm.
    /// It then publishes the runner's results every updateInterval milliseconds until the
    /// runner reports termination. The WCF channel factory is always closed afterwards.
    /// </summary>
    private void RunRemoteAlgorithm(string address, IScope globalScope, System.Threading.CancellationToken cancellationToken) {
      NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
      netTCPBinding.TransferMode = TransferMode.Streamed;
      netTCPBinding.MaxReceivedMessageSize = int.MaxValue;

      ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
      try {
        IAlgorithmBroker proxy = factory.CreateChannel();
        TransmitAlgorithm(proxy);
        proxy.SetUpdateInterval(updateInterval);

        // NOTE(review): results produced between the last poll and termination are not
        // fetched. Confirm whether the runner still serves GetResults after termination
        // before adding a final poll.
        while (!proxy.IsAlgorithmTerminated()) {
          cancellationToken.ThrowIfCancellationRequested();
          PublishResults(proxy, globalScope);
          // Unlike Thread.Sleep, this wait ends immediately when the engine is stopped.
          cancellationToken.WaitHandle.WaitOne(updateInterval);
        }
      } finally {
        CloseFactory(factory);
      }
    }

    /// <summary>Serializes the captured algorithm to XML and sends it to the runner.</summary>
    private void TransmitAlgorithm(IAlgorithmBroker proxy) {
      byte[] serializedAlgorithm;
      using (MemoryStream buffer = new MemoryStream()) {
        XmlGenerator.Serialize(algorithm, buffer);
        // ToArray copies exactly Length bytes and works even if the serializer closed the
        // stream. GetBuffer() would also send the unused capacity of the internal buffer.
        serializedAlgorithm = buffer.ToArray();
      }
      using (MemoryStream stream = new MemoryStream(serializedAlgorithm)) {
        proxy.TransmitAlgorithm(stream);
      }
    }

    /// <summary>
    /// Fetches the runner's current results and stores them under "MPIResults" in the
    /// global scope's "Results" collection, if both are available.
    /// </summary>
    private static void PublishResults(IAlgorithmBroker proxy, IScope globalScope) {
      ItemList<ResultCollection> results = StreamingHelper.StreamItem(proxy.GetResults()) as ItemList<ResultCollection>;
      ResultCollection resultCollection = globalScope.Variables["Results"].Value as ResultCollection;
      if (resultCollection == null || results == null) return;

      if (!resultCollection.ContainsKey("MPIResults"))
        resultCollection.Add(new Result("MPIResults", results));
      else
        resultCollection["MPIResults"].Value = results;
    }

    /// <summary>
    /// Closes the channel factory gracefully. It falls back to Abort() when the factory is
    /// faulted or the graceful close fails (standard WCF close/abort pattern).
    /// </summary>
    private static void CloseFactory(ChannelFactory<IAlgorithmBroker> factory) {
      try {
        if (factory.State == CommunicationState.Faulted)
          factory.Abort();
        else
          factory.Close();
      } catch (CommunicationException) {
        factory.Abort();
      } catch (TimeoutException) {
        factory.Abort();
      }
    }
  }
}