Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs @ 7000

Last change on this file since 7000 was 6401, checked in by svonolfe, 13 years ago

Implemented result and algorithm streaming (#1542)

File size: 8.8 KB
RevLine 
[6349]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
[6354]27using System.Reflection;
28using System.IO;
29using HeuristicLab.Persistence.Default.Xml;
30using System.Diagnostics;
31using HeuristicLab.Optimization;
32using System.Linq;
[6388]33using Microsoft.Hpc.Scheduler;
34using System.ServiceModel;
35using HeuristicLab.MPIAlgorithmRunner;
36using HeuristicLab.Operators.MPISupport;
[6394]37using Microsoft.Hpc.Scheduler.Properties;
38using System.Xml;
[6398]39using System.ComponentModel;
[6349]40
namespace HeuristicLab.MPIEngine {
  /// <summary>
  /// Represents an engine that executes an algorithm in a separate MPI job.
  /// The job is submitted through the Microsoft HPC scheduler and starts
  /// HeuristicLab.MPIAlgorithmRunner via mpiexec. The algorithm is sent to the runner and its
  /// results are polled back over a net.tcp WCF connection.
  /// This engine targets distributed memory systems with multiple cores.
  /// </summary>
  [StorableClass]
  [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
  public class MPIEngine : Engine {
    // Name of the custom job property in which the runner publishes its WCF endpoint address.
    private const string AddressPropertyName = "address";
    // Number of times the job is polled for the runner address before giving up.
    private const int AddressPollAttempts = 10;
    // Delay between two polls for the runner address, in milliseconds.
    private const int AddressPollDelay = 1000;

    // The credentials are not marked [Storable], unlike the other settings below.
    private string username;

    /// <summary>
    /// User name passed to the HPC scheduler when the job is submitted.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public string UserName {
      get { return username; }
      set { username = value; }
    }

    private string password;

    /// <summary>
    /// Password passed together with <see cref="UserName"/> when the job is submitted.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    [PasswordPropertyText(true)]
    public string Password {
      get { return password; }
      set { password = value; }
    }

    [Storable]
    private string headNode;

    /// <summary>
    /// Host name of the HPC head node the scheduler connects to.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public string HeadNode {
      get { return headNode; }
      set { headNode = value; }
    }

    [Storable]
    private string path;

    /// <summary>
    /// Directory that contains HeuristicLab.MPIAlgorithmRunner-3.3.exe.
    /// It is also used as the working directory of the HPC task.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public string Path {
      get { return path; }
      set { path = value; }
    }

    [Storable]
    private int updateInterval;

    /// <summary>
    /// Delay in milliseconds between two result polls.
    /// The same value is forwarded to the runner via SetUpdateInterval.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public int UpdateInterval {
      get { return updateInterval; }
      set { updateInterval = value; }
    }

    [Storable]
    private int cpuPerNode;

    /// <summary>
    /// Value passed to mpiexec's -c option.
    /// The task reserves this many cores for each requested node.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public int CpuPerNode {
      get { return cpuPerNode; }
      set { cpuPerNode = value; }
    }

    [Storable]
    private List<string> requestedNodes;

    /// <summary>
    /// Names of the cluster nodes requested for the job.
    /// </summary>
    [Category("MPISettings")]
    [Browsable(true)]
    public List<string> RequestedNodes {
      get { return requestedNodes; }
      set { requestedNodes = value; }
    }

    [StorableConstructor]
    protected MPIEngine(bool deserializing) : base(deserializing) { }

    protected MPIEngine(MPIEngine original, Cloner cloner) : base(original, cloner) {
      username = original.username;
      password = original.password;
      headNode = original.headNode;
      path = original.path;
      updateInterval = original.updateInterval;
      cpuPerNode = original.cpuPerNode;
      // Copy the list so the clone does not share node names with the original.
      // A null list (e.g. when the field was not restored) becomes an empty one.
      requestedNodes = original.requestedNodes != null
        ? new List<string>(original.requestedNodes)
        : new List<string>();
    }

    public MPIEngine() : base() {
      // NOTE(review): these defaults look like site-specific placeholders; adjust per deployment.
      username = @"user";
      password = @"password";
      headNode = "blade00.hpc.fh-hagenberg.at";
      path = @"C:\public\MPISupport";
      updateInterval = 5000;
      cpuPerNode = 3;
      requestedNodes = new List<string>();
      requestedNodes.Add("BLADE00");
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new MPIEngine(this, cloner);
    }

    // Clone of the executing algorithm, prepared in Start() and serialized to the runner in Run().
    private IAlgorithm algorithm;

    /// <summary>
    /// When exactly one operation is on the execution stack, captures a clone of the algorithm that
    /// owns it. The clone is switched to a SequentialEngine and later transmitted to the MPI runner.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The item owning the parent execution context is not an <see cref="EngineAlgorithm"/>.
    /// </exception>
    public override void Start() {
      if (ExecutionStack.Count == 1) {
        ExecutionContext context = ExecutionStack.First() as ExecutionContext;
        ExecutionContext algorithmContext = context.Parent as ExecutionContext;

        // ExecutionContext does not expose its owning item publicly, so the non-public field
        // "parameterizedItem" is read via reflection. This depends on HeuristicLab.Core internals.
        EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
          BindingFlags.GetField | BindingFlags.NonPublic |
          BindingFlags.Instance, null, algorithmContext, null) as EngineAlgorithm;

        if (alg == null)
          throw new InvalidOperationException("The MPI engine can only execute algorithms derived from EngineAlgorithm.");

        alg = alg.Clone() as EngineAlgorithm;
        alg.Engine = new SequentialEngine.SequentialEngine();

        algorithm = alg;
      }

      base.Start();
    }

    /// <summary>
    /// Pausing is not supported. After the base notification, the run is stopped.
    /// </summary>
    protected override void OnPaused() {
      base.OnPaused();

      Stop();
    }

    /// <summary>
    /// Submits an HPC job that runs HeuristicLab.MPIAlgorithmRunner and sends it the algorithm
    /// prepared in Start(). Until the remote algorithm terminates, the runner's results are copied
    /// into the "MPIResults" entry of the global scope's "Results" collection every
    /// <see cref="UpdateInterval"/> milliseconds.
    /// </summary>
    /// <param name="cancellationToken">Aborts the run. On cancellation, or on any other failure after
    /// the job was submitted, the HPC job is cancelled and the exception is rethrown.</param>
    /// <exception cref="TimeoutException">The runner did not publish its address in time.</exception>
    protected override void Run(System.Threading.CancellationToken cancellationToken) {
      if (ExecutionStack.Count != 1)
        return;

      ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;
      IScope globalScope = context.Scope;

      IScheduler scheduler = new Scheduler();
      scheduler.Connect(headNode);
      try {
        ISchedulerJob job = CreateJob(scheduler);
        scheduler.SubmitJob(job, username, password);

        ChannelFactory<IAlgorithmBroker> factory = null;
        try {
          string address = WaitForRunnerAddress(scheduler, job, cancellationToken);

          NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
          netTCPBinding.TransferMode = TransferMode.Streamed;
          netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
          factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
          IAlgorithmBroker proxy = factory.CreateChannel();

          proxy.TransmitAlgorithm(SerializeAlgorithm());
          proxy.SetUpdateInterval(updateInterval);

          // NOTE(review): results are only fetched while the remote algorithm is still running.
          // Values produced between the last poll and termination are not copied back.
          while (!proxy.IsAlgorithmTerminated()) {
            cancellationToken.ThrowIfCancellationRequested();

            ItemList<ResultCollection> results = StreamingHelper.StreamItem(proxy.GetResults()) as ItemList<ResultCollection>;
            PublishResults(globalScope, results);

            // Unlike Thread.Sleep, this wakes up immediately when the run is cancelled.
            cancellationToken.WaitHandle.WaitOne(updateInterval);
          }
        } catch (Exception e) {
          scheduler.CancelJob(job.Id, "Exception: " + e.GetType());
          // Rethrow without resetting the original stack trace.
          throw;
        } finally {
          CloseFactory(factory);
        }
      } finally {
        scheduler.Close();
      }
    }

    /// <summary>
    /// Creates, but does not submit, a job with a single task.
    /// The task starts HeuristicLab.MPIAlgorithmRunner-3.3.exe from <see cref="Path"/> via mpiexec.
    /// </summary>
    private ISchedulerJob CreateJob(IScheduler scheduler) {
      string exec = @"mpiexec";
      string args = @"-c " + cpuPerNode + " /genvlist CCP_JOBID " + path + @"\HeuristicLab.MPIAlgorithmRunner-3.3.exe";

      ISchedulerJob job = scheduler.CreateJob();
      job.Name = "HeuristicLab.MPIEngine";
      foreach (string requestedNode in requestedNodes)
        job.RequestedNodes.Add(requestedNode);

      ISchedulerTask task = job.CreateTask();
      task.Name = "HeuristicLab.MPIAlgorithmRunner";
      task.CommandLine = exec + " " + args;
      task.StdOutFilePath = "stdout.txt";
      task.StdErrFilePath = "stderr.txt";
      task.WorkDirectory = path;
      // Reserve cpuPerNode cores for every requested node.
      task.MinimumNumberOfCores = task.MaximumNumberOfCores = cpuPerNode * requestedNodes.Count;
      job.AddTask(task);

      return job;
    }

    /// <summary>
    /// Polls the submitted job until the runner publishes its endpoint in the custom job property
    /// "address". Each poll is followed by a delay of AddressPollDelay ms, for at most
    /// AddressPollAttempts polls.
    /// </summary>
    /// <exception cref="OperationCanceledException">The run was cancelled while waiting.</exception>
    /// <exception cref="TimeoutException">No address was published within the allowed attempts.</exception>
    private static string WaitForRunnerAddress(IScheduler scheduler, ISchedulerJob job, System.Threading.CancellationToken cancellationToken) {
      for (int attempt = 0; attempt < AddressPollAttempts; attempt++) {
        cancellationToken.ThrowIfCancellationRequested();

        ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
        if (schedulerJob != null) {
          NameValue property = schedulerJob.GetCustomProperties().FirstOrDefault(p => p.Name == AddressPropertyName);
          // An empty value is treated as "not published yet".
          if (property != null && !string.IsNullOrEmpty(property.Value))
            return property.Value;
        }

        // Always wait (and count the attempt), also when OpenJob returned null.
        // Otherwise the loop would spin without ever timing out.
        cancellationToken.WaitHandle.WaitOne(AddressPollDelay);
      }

      cancellationToken.ThrowIfCancellationRequested();
      throw new TimeoutException("A timeout occurred when starting the MPIAlgorithmRunner");
    }

    /// <summary>
    /// Serializes the prepared algorithm with XmlGenerator into a new stream positioned at 0.
    /// </summary>
    private Stream SerializeAlgorithm() {
      using (MemoryStream buffer = new MemoryStream()) {
        XmlGenerator.Serialize(algorithm, buffer);
        // ToArray copies only the bytes actually written. GetBuffer would also include the
        // unused capacity of the internal buffer (trailing zero bytes).
        return new MemoryStream(buffer.ToArray());
      }
    }

    /// <summary>
    /// Stores <paramref name="results"/> under "MPIResults" in the global scope's "Results" collection.
    /// Nothing happens if either the collection or the results are missing or of the wrong type.
    /// </summary>
    private static void PublishResults(IScope globalScope, ItemList<ResultCollection> results) {
      ResultCollection resultCollection = globalScope.Variables["Results"].Value as ResultCollection;
      if (resultCollection == null || results == null)
        return;

      if (!resultCollection.ContainsKey("MPIResults"))
        resultCollection.Add(new Result("MPIResults", results));

      resultCollection["MPIResults"].Value = results;
    }

    /// <summary>
    /// Best-effort release of the WCF channel factory. Close is tried first; Abort is used if Close
    /// fails (e.g. faulted channel), so that cleanup never masks the exception of the run itself.
    /// </summary>
    private static void CloseFactory(ChannelFactory<IAlgorithmBroker> factory) {
      if (factory == null)
        return;

      try {
        factory.Close();
      } catch (Exception) {
        factory.Abort();
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.