Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs @ 6394

Last change on this file since 6394 was 6394, checked in by svonolfe, 13 years ago

Added first working version of the MPI engine (#1542)

File size: 6.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using HeuristicLab.Common;
25using HeuristicLab.Core;
26using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
27using System.Reflection;
28using System.IO;
29using HeuristicLab.Persistence.Default.Xml;
30using System.Diagnostics;
31using HeuristicLab.Optimization;
32using System.Linq;
33using Microsoft.Hpc.Scheduler;
34using System.ServiceModel;
35using HeuristicLab.MPIAlgorithmRunner;
36using HeuristicLab.Operators.MPISupport;
37using Microsoft.Hpc.Scheduler.Properties;
38using System.Xml;
39
namespace HeuristicLab.MPIEngine {
  /// <summary>
  /// Represents an engine that executes an algorithm on a distributed memory system via MPI.
  /// The algorithm is cloned, shipped to an external HeuristicLab.MPIAlgorithmRunner process
  /// scheduled through a Windows HPC cluster job, and its results are polled back over WCF.
  /// </summary>
  [StorableClass]
  [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
  public class MPIEngine : Engine {
    [StorableConstructor]
    protected MPIEngine(bool deserializing) : base(deserializing) { }
    protected MPIEngine(MPIEngine original, Cloner cloner) : base(original, cloner) { }
    public MPIEngine() : base() { }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new MPIEngine(this, cloner);
    }

    // Clone of the algorithm to be transmitted to the MPIAlgorithmRunner; captured in Start().
    private IAlgorithm algorithm;

    /// <summary>
    /// Captures a clone of the algorithm that owns the single pending operation before
    /// delegating to the base engine. The clone runs remotely with a sequential engine.
    /// </summary>
    public override void Start() {
      if (ExecutionStack.Count == 1) {
        ExecutionContext context = ExecutionStack.First() as ExecutionContext;

        ExecutionContext algorithmContext = context.Parent as ExecutionContext;

        // HACK: ExecutionContext does not expose its item publicly, so the private
        // "parameterizedItem" field is read via reflection to reach the algorithm.
        EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
          BindingFlags.GetField | BindingFlags.NonPublic |
          BindingFlags.Instance, null, algorithmContext, null) as EngineAlgorithm;

        alg = alg.Clone() as EngineAlgorithm;
        // The remote runner executes the clone sequentially; MPI provides the parallelism.
        alg.Engine = new SequentialEngine.SequentialEngine();

        algorithm = alg;
      }

      base.Start();
    }

    protected override void OnPaused() {
      base.OnPaused();

      // Pausing a remote MPI job is not supported; treat pause as stop.
      Stop();
    }

    /// <summary>
    /// Submits the captured algorithm as an HPC cluster job running mpiexec, waits for the
    /// runner to publish its WCF endpoint address as a custom job property, then transmits
    /// the algorithm and polls results into the global scope until the run terminates.
    /// </summary>
    /// <exception cref="Exception">Thrown when no endpoint address appears before the timeout.</exception>
    protected override void Run(System.Threading.CancellationToken cancellationToken) {
      // NOTE(review): cluster connection settings are hard-coded (including a user name);
      // these should be moved to configuration/parameters. SubmitJob is called with an
      // empty password, which presumably relies on cached cluster credentials — verify.
      string username = @"user";
      string headNode = "blade00.hpc.fh-hagenberg.at";
      List<string> requestedNodes = new List<string>();
      requestedNodes.Add("BLADE00");
      string path = @"C:\public\MPISupport";
      int cpuPerNode = 8;

      if (ExecutionStack.Count == 1) {
        ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;

        IScope globalScope = context.Scope;

        string exec = @"mpiexec";
        string args = @"-c " + cpuPerNode + " /genvlist CCP_JOBID " + path + @"\HeuristicLab.MPIAlgorithmRunner-3.3.exe";

        IScheduler scheduler = new Scheduler();
        scheduler.Connect(headNode);

        ISchedulerJob job = scheduler.CreateJob();
        job.Name = "HeuristicLab.MPIEngine";
        foreach (string requestedNode in requestedNodes)
          job.RequestedNodes.Add(requestedNode);
        ISchedulerTask task = job.CreateTask();
        task.Name = "HeuristicLab.MPIAlgorithmRunner";
        task.CommandLine = exec + " " + args;
        task.StdOutFilePath = "stdout.txt";
        task.StdErrFilePath = "stderr.txt";
        task.WorkDirectory = path;
        task.MinimumNumberOfCores = task.MaximumNumberOfCores = 3;
        job.AddTask(task);

        scheduler.SubmitJob(job, username, "");

        try {
          string address = null;
          int timeout = 10;  // number of 1-second polls before giving up

          // Poll the job's custom properties until the runner publishes its endpoint
          // address or the timeout elapses.
          while (address == null && timeout > 0) {
            cancellationToken.ThrowIfCancellationRequested();

            ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
            NameValue property = (schedulerJob != null)
              ? schedulerJob.GetCustomProperties().FirstOrDefault(p => p.Name == "address")
              : null;

            if (property != null) {
              address = property.Value;
            } else {
              // Sleep and count down even when the job could not be opened; the original
              // code skipped both in that case and busy-looped forever.
              System.Threading.Thread.Sleep(1000);
              timeout--;
            }
          }

          if (address == null) {
            throw new Exception("A timeout occurred when starting the MPIAlgorithmRunner");
          }

          // Large serialized algorithms/results require raised WCF message-size quotas.
          NetTcpBinding netTCPBinding = new NetTcpBinding(SecurityMode.None);
          XmlDictionaryReaderQuotas quotas = new XmlDictionaryReaderQuotas();
          quotas.MaxArrayLength = int.MaxValue;
          netTCPBinding.ReaderQuotas = quotas;
          netTCPBinding.MaxReceivedMessageSize = int.MaxValue;
          ChannelFactory<IAlgorithmBroker> factory = new ChannelFactory<IAlgorithmBroker>(netTCPBinding, address);
          IAlgorithmBroker proxy = factory.CreateChannel();

          proxy.TransmitAlgorithm(new MPITransportWrapper<IAlgorithm>(algorithm));

          // Mirror the remote results into the global scope's "Results" collection
          // under "MPIResults" every 5 seconds until the remote algorithm terminates.
          while (!proxy.IsAlgorithmTerminated()) {
            cancellationToken.ThrowIfCancellationRequested();

            ItemList<ResultCollection> results = proxy.GetResults().InnerItem;

            ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection);

            if (resultCollection != null && results != null) {
              if (!resultCollection.ContainsKey("MPIResults"))
                resultCollection.Add(new Result("MPIResults", results));

              resultCollection["MPIResults"].Value = results;
            }

            System.Threading.Thread.Sleep(5000);
          }
        }
        catch (Exception e) {
          scheduler.CancelJob(job.Id, "Exception: " + e.GetType());
          // Rethrow without resetting the stack trace ("throw e;" would discard it).
          throw;
        }
        finally {
           /*ISchedulerJob schedulerJob = scheduler.OpenJob(job.Id);
           if (schedulerJob != null &&
             (schedulerJob.State == JobState.Running || schedulerJob.State == JobState.Queued)) {
               scheduler.CancelJob(job.Id, "Cancelled");           
           } */
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.