Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs @ 6401

Last change on this file since 6401 was 6401, checked in by svonolfe, 13 years ago

Implemented result and algorithm streaming (#1542)

File size: 8.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using System.Reflection;
28using System.IO;
29using HeuristicLab.Persistence.Default.Xml;
30using System.Diagnostics;
31using HeuristicLab.Optimization;
32using System.Linq;
33using Microsoft.Hpc.Scheduler;
34using System.ServiceModel;
35using HeuristicLab.MPIAlgorithmRunner;
36using HeuristicLab.Operators.MPISupport;
37using Microsoft.Hpc.Scheduler.Properties;
38using System.Xml;
39using System.ComponentModel;
40
41namespace HeuristicLab.MPIEngine {
42  /// <summary>
43  /// Represents an engine that executes its steps in parallel (if possible) using multiple threads.
44  /// This engine is suitable for parallel processing on shared memory systems which provide multiple cores.
45  /// </summary>
46  [StorableClass]
47  [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
48  public class MPIEngine : Engine {
49    private string username;
50
51    [Category("MPISettings")]
52    [Browsable(true)]
53    public string UserName {
54      get {
55        return username;
56      }
57
58      set {
59        username = value;
60      }
61    }
62
63    private string password;
64
65    [Category("MPISettings")]
66    [Browsable(true)]
67    [PasswordPropertyText(true)]
68    public string Password {
69      get {
70        return password;
71      }
72
73      set {
74        password = value;
75      }
76    }
77
78    [Storable]
79    private string headNode;
80
81    [Category("MPISettings")]
82    [Browsable(true)]
83    public string HeadNode {
84      get {
85        return headNode;
86      }
87
88      set {
89        headNode = value;
90      }
91    }
92
93    [Storable]
94    private string path;
95
96    [Category("MPISettings")]
97    [Browsable(true)]
98    public string Path {
99      get {
100        return path;
101      }
102
103      set {
104        path = value;
105      }
106    }
107
108    [Storable]
109    private int updateInterval;
110
111    [Category("MPISettings")]
112    [Browsable(true)]
113    public int UpdateInterval {
114      get {
115        return updateInterval;
116      }
117
118      set {
119        updateInterval = value;
120      }
121    }
122
123    [Storable]
124    private int cpuPerNode;
125
126    [Category("MPISettings")]
127    [Browsable(true)]
128    public int CpuPerNode {
129      get {
130        return cpuPerNode;
131      }
132
133      set {
134        cpuPerNode = value;
135      }
136    }
137
138    [Storable]
139    private List<string> requestedNodes;
140
141    [Category("MPISettings")]
142    [Browsable(true)]
143    public List<string> RequestedNodes {
144      get {
145        return requestedNodes;
146      }
147
148      set {
149        requestedNodes = value;
150      }
151    }
152
153    [StorableConstructor]
154    protected MPIEngine(bool deserializing) : base(deserializing) { }
155
156    protected MPIEngine(MPIEngine original, Cloner cloner) : base(original, cloner) {
157      username = original.username;
158      password = original.password;
159      headNode = original.headNode;
160      path = original.path;
161      updateInterval = original.updateInterval;
162      cpuPerNode = original.cpuPerNode;
163      requestedNodes = new List<string>();
164      foreach (string node in original.requestedNodes)
165        requestedNodes.Add(node);
166    }
167
168    public MPIEngine() : base() {
169      username = @"user";
170      password = @"password";
171      headNode = "blade00.hpc.fh-hagenberg.at";
172      path = @"C:\public\MPISupport";
173      updateInterval = 5000;
174      cpuPerNode = 3;
175      requestedNodes = new List<string>();
176      requestedNodes.Add("BLADE00");
177    }
178
179    public override IDeepCloneable Clone(Cloner cloner) {
180      return new MPIEngine(this, cloner);
181    }
182
183    private IAlgorithm algorithm;
184
185    public override void Start() {
186      if (ExecutionStack.Count == 1) {
187        ExecutionContext context = ExecutionStack.First() as ExecutionContext;
188
189        ExecutionContext algorithmContext = context.Parent as ExecutionContext;
190
191        EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
192          BindingFlags.GetField | BindingFlags.NonPublic |
193          BindingFlags.Instance, null, algorithmContext, null) as EngineAlgorithm;
194
195        alg = alg.Clone() as EngineAlgorithm;
196        alg.Engine = new SequentialEngine.SequentialEngine();
197
198        algorithm = alg;
199      }
200
201      base.Start();
202    }
203
204    protected override void OnPaused() {
205      base.OnPaused();
206
207      Stop();
208    }
209
210    protected override void Run(System.Threading.CancellationToken cancellationToken) {
211      if (ExecutionStack.Count == 1) {
212        ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;
213
214        IScope globalScope = context.Scope;
215
216        string exec = @"mpiexec";
217        string args = @"-c " + cpuPerNode + " /genvlist CCP_JOBID " + path + @"\HeuristicLab.MPIAlgorithmRunner-3.3.exe";
218
219        IScheduler scheduler = new Scheduler();
220        scheduler.Connect(headNode);
221
222        ISchedulerJob job = scheduler.CreateJob();
223        job.Name = "HeuristicLab.MPIEngine";
224        foreach (string requestedNode in requestedNodes)
225          job.RequestedNodes.Add(requestedNode);
226        ISchedulerTask task = job.CreateTask();
227        task.Name = "HeuristicLab.MPIAlgorithmRunner";
228        task.CommandLine = exec + " " + args;
229        task.StdOutFilePath = "stdout.txt";
230        task.StdErrFilePath = "stderr.txt";
231        task.WorkDirectory = path;
232        task.MinimumNumberOfCores = task.MaximumNumberOfCores = cpuPerNode * requestedNodes.Count;
233        job.AddTask(task);
234
235        scheduler.SubmitJob(job, username, password);
236
237        try {
238          string address = null;
239          int timeout = 10;
240
241          while (address == null && timeout > 0) {
242            cancellationToken.ThrowIfCancellationRequested();
243
244            ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
245            if (schedulerJob != null) {
246              NameValue property = schedulerJob.GetCustomProperties().FirstOrDefault(p => p.Name == "address");
247
248              if (property != null) {
249                address = property.Value;
250              } else {
251                System.Threading.Thread.Sleep(1000);
252                timeout--;
253              }
254            }           
255          }
256
257          if (address == null) {
258            throw new Exception("A timeout occurred when starting the MPIAlgorithmRunner");
259          }
260
261          NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
262          netTCPBinding.TransferMode = TransferMode.Streamed;
263          netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
264          ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
265          IAlgorithmBroker proxy = factory.CreateChannel();
266
267          Stream stream = new MemoryStream();
268          XmlGenerator.Serialize(algorithm, stream);
269          stream = new MemoryStream((stream as MemoryStream).GetBuffer());
270
271          proxy.TransmitAlgorithm(stream);
272          proxy.SetUpdateInterval(updateInterval);
273
274          while (!proxy.IsAlgorithmTerminated()) {
275            cancellationToken.ThrowIfCancellationRequested();
276
277            ItemList<ResultCollection> results = StreamingHelper.StreamItem(proxy.GetResults()) as ItemList<ResultCollection>;
278
279            ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection);
280
281            if (resultCollection != null && results != null) {
282              if (!resultCollection.ContainsKey("MPIResults"))
283                resultCollection.Add(new Result("MPIResults", results));
284
285              resultCollection["MPIResults"].Value = results;
286            }
287
288            System.Threading.Thread.Sleep(updateInterval);
289          }
290        }
291        catch (Exception e) {
292          scheduler.CancelJob(job.Id, "Exception: " + e.GetType());
293          throw e;
294        }
295      }
296    }
297  }
298}
Note: See TracBrowser for help on using the repository browser.