
source: branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs @ 6395

Last change on this file since 6395 was 6395, checked in by svonolfe, 13 years ago

Added cloning and storing to the MPIEngine (#1542)

File size: 7.4 KB
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using HeuristicLab.Common;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using System.Reflection;
using System.IO;
using HeuristicLab.Persistence.Default.Xml;
using System.Diagnostics;
using HeuristicLab.Optimization;
using System.Linq;
using Microsoft.Hpc.Scheduler;
using System.ServiceModel;
using HeuristicLab.MPIAlgorithmRunner;
using HeuristicLab.Operators.MPISupport;
using Microsoft.Hpc.Scheduler.Properties;
using System.Xml;

namespace HeuristicLab.MPIEngine {
  /// <summary>
  /// Represents an engine that executes its steps in parallel (if possible) using multiple MPI processes.
  /// This engine is suitable for parallel processing on distributed memory systems with multiple cores.
  /// </summary>
  [StorableClass]
  [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
  public class MPIEngine : Engine {
    private string username;
    private string password;

    [Storable]
    private string headNode;

    [Storable]
    private string path;

    [Storable]
    private int cpuPerNode;

    [Storable]
    private List<string> requestedNodes;

    [StorableConstructor]
    protected MPIEngine(bool deserializing) : base(deserializing) { }

    protected MPIEngine(MPIEngine original, Cloner cloner) : base(original, cloner) {
      username = original.username;
      password = original.password;
      headNode = original.headNode;
      path = original.path;
      cpuPerNode = original.cpuPerNode;
      requestedNodes = new List<string>();
      foreach (string node in original.requestedNodes)
        requestedNodes.Add(node);
    }

    public MPIEngine() : base() {
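      // NOTE: placeholder credentials and site-specific defaults (head node, path,
      // requested nodes); these must be adapted to the target cluster.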
      username = @"user";
      password = @"password";
      headNode = "blade00.hpc.fh-hagenberg.at";
      path = @"C:\public\MPISupport";
      cpuPerNode = 8;
      requestedNodes = new List<string>();
      requestedNodes.Add("BLADE00");
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new MPIEngine(this, cloner);
    }

    private IAlgorithm algorithm;

    public override void Start() {
      if (ExecutionStack.Count == 1) {
        ExecutionContext context = ExecutionStack.First() as ExecutionContext;

        ExecutionContext algorithmContext = context.Parent as ExecutionContext;

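        // The algorithm that owns this execution context is stored in a private field
        // of ExecutionContext, so it is retrieved via reflection; a clone of it (driven
        // by a sequential engine) is what gets shipped to the cluster.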
        EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
          BindingFlags.GetField | BindingFlags.NonPublic |
          BindingFlags.Instance, null, algorithmContext, null) as EngineAlgorithm;

        alg = alg.Clone() as EngineAlgorithm;
        alg.Engine = new SequentialEngine.SequentialEngine();

        algorithm = alg;
      }

      base.Start();
    }

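    // Pausing a remote MPI job is not supported, so a pause request stops the engine instead.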
    protected override void OnPaused() {
      base.OnPaused();

      Stop();
    }

    protected override void Run(System.Threading.CancellationToken cancellationToken) {
      if (ExecutionStack.Count == 1) {
        ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;

        IScope globalScope = context.Scope;

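        // Command line for the runner: -c limits the cores used per node and /genvlist
        // forwards the CCP_JOBID environment variable (MS-MPI mpiexec options).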
        string exec = @"mpiexec";
        string args = @"-c " + cpuPerNode + " /genvlist CCP_JOBID " + path + @"\HeuristicLab.MPIAlgorithmRunner-3.3.exe";

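        // Submit the MPIAlgorithmRunner as an mpiexec task to the Windows HPC Server
        // scheduler on the configured head node.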
        IScheduler scheduler = new Scheduler();
        scheduler.Connect(headNode);

        ISchedulerJob job = scheduler.CreateJob();
        job.Name = "HeuristicLab.MPIEngine";
        foreach (string requestedNode in requestedNodes)
          job.RequestedNodes.Add(requestedNode);
        ISchedulerTask task = job.CreateTask();
        task.Name = "HeuristicLab.MPIAlgorithmRunner";
        task.CommandLine = exec + " " + args;
        task.StdOutFilePath = "stdout.txt";
        task.StdErrFilePath = "stderr.txt";
        task.WorkDirectory = path;
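        // NOTE: the task is pinned to exactly 3 cores here, independent of cpuPerNode.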
        task.MinimumNumberOfCores = task.MaximumNumberOfCores = 3;
        job.AddTask(task);

        scheduler.SubmitJob(job, username, password);

        try {
          string address = null;
          int timeout = 10;

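          // The runner publishes the address of its WCF endpoint as a custom property
          // of the scheduler job; poll for it in 1-second intervals (up to 10 tries).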
          while (address == null && timeout > 0) {
            cancellationToken.ThrowIfCancellationRequested();

            ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
            if (schedulerJob != null) {
              NameValue property = schedulerJob.GetCustomProperties().FirstOrDefault(p => p.Name == "address");

              if (property != null) {
                address = property.Value;
                continue;
              }
            }

            // wait and retry; also counts down when the job cannot be opened yet,
            // so the loop cannot spin forever
            System.Threading.Thread.Sleep(1000);
            timeout--;
          }

          if (address == null) {
            throw new Exception("A timeout occurred when starting the MPIAlgorithmRunner");
          }

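          // Connect to the runner over an unsecured net.tcp channel; reader quotas and
          // message size are raised so large serialized algorithms fit into one message.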
          NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
          XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();
          quotas.MaxArrayLength = int.MaxValue;
          netTCPBinding.ReaderQuotas = quotas;
          netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
          ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
          IAlgorithmBroker proxy = factory.CreateChannel();

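          // Ship the cloned algorithm to the runner, wrapped for XML-based MPI transport.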
          proxy.TransmitAlgorithm(new MPITransportWrapper<IAlgorithm>(algorithm));

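          // While the algorithm is running, mirror its partial results into the global
          // scope's result collection every 5 seconds.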
          while (!proxy.IsAlgorithmTerminated()) {
            cancellationToken.ThrowIfCancellationRequested();

            ItemList<ResultCollection> results = proxy.GetResults().InnerItem;

            ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection);

            if (resultCollection != null && results != null) {
              if (!resultCollection.ContainsKey("MPIResults"))
                resultCollection.Add(new Result("MPIResults", results));

              resultCollection["MPIResults"].Value = results;
            }

            System.Threading.Thread.Sleep(5000);
          }
        }
        catch (Exception e) {
          scheduler.CancelJob(job.Id, "Exception: " + e.GetType());
          throw;  // rethrow without resetting the stack trace
        }
        finally {
          /*ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
          if (schedulerJob != null &&
            (schedulerJob.State == JobState.Running || schedulerJob.State == JobState.Queued)) {
              scheduler.CancelJob(job.Id, "Cancelled");
          } */
        }
      }
    }
  }
}