Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/Program.cs @ 11247

Last change on this file since 11247 was 7566, checked in by svonolfe, 13 years ago

Adapted MPI operators (#1542)

File size: 9.7 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Optimization;
27using HeuristicLab.Persistence.Default.Xml;
28using System.Threading;
29using Microsoft.Hpc.Scheduler;
30using System.Net;
31using System.ServiceModel;
32using HeuristicLab.Operators.MPISupport;
33using HeuristicLab.Core;
34using System.Diagnostics;
35using HeuristicLab.Common;
36using System.IO;
37
38namespace HeuristicLab.MPIAlgorithmRunner {
39  class Program {
40    EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
41
42    static void Main(string[] args) {
43      if (args.Length == 2) {
44        string fileName = args[0];
45        int calcTime;
46        if (int.TryParse(args[1], out calcTime)) {
47         using (new MPI.Environment(ref args)) {
48            if (MPI.Communicator.world.Rank != 0) {
49              Program p = new Program();
50              p.StartAlgorithm(fileName,calcTime);
51            }
52          }                     
53        } else {
54          string resultName = args[1];
55
56          using (new MPI.Environment(ref args)) {
57            if (MPI.Communicator.world.Rank != 0) {
58              Program p = new Program();
59              p.StartAlgorithm(fileName, resultName + MPI.Communicator.world.Rank + ".hl");
60            }
61          }
62        }       
63      } else {
64        using (new MPI.Environment(ref args)) {
65          int clients = MPI.Communicator.world.Group.Size - 1;
66          Console.WriteLine("Clients: " + clients);
67
68          MPI.Communicator communicator = MPI.Communicator.world.Clone() as MPI.Communicator;
69
70          if (communicator.Rank == 0) {
71            ServiceHost service = AlgorithmBroker.StartService(communicator);
72            AlgorithmBroker broker = (service.SingletonInstance as AlgorithmBroker);
73
74            string address = GetAddress(service);
75            SetJobStatus(address);
76
77            bool[] finished = new bool[clients];
78            int finishedCount = 0;
79
80            while (finishedCount != clients) {
81              ItemList<ResultCollection> results = new ItemList<ResultCollection>();
82
83              for (int i = 0; i < clients; i++) {
84                if (!finished[i]) {
85                  int client = i + 1;
86                  ResultCollection result = communicator.Receive<MPITransportWrapper<ResultCollection>>(client, 1).InnerItem;
87
88                  Console.WriteLine("Received result " + result);
89
90                  if (results.Count != clients) {
91                    results.Add(result);
92                  } else {
93                    results[i] = result;
94                  }
95
96                  Console.WriteLine("Probing...");
97                  if (communicator.ImmediateProbe(client, 2) != null) {
98                    finished[i] = true;
99                    finishedCount++;
100                  }
101                }
102              }
103
104              Console.WriteLine("Update results");
105              lock (broker.resultLocker)
106                broker.Results = results;
107            }
108
109            lock (broker.ExitWaitHandle) {
110              broker.Terminated = true;
111            }
112
113            broker.ExitWaitHandle.WaitOne();
114            Console.WriteLine("Finished.");
115            Thread.Sleep(1000);
116            service.Close();
117          } else {
118              Program p = new Program();
119              p.StartAlgorithm(communicator);
120          }
121        }
122      }       
123    }
124
125    private static void SetJobStatus(string address) {
126      Console.WriteLine("Started service... at " + address);
127
128      // Discover the job's context from the environment
129      String headNodeName = System.Environment.GetEnvironmentVariable("CCP_SCHEDULER");
130      int jobId = System.Convert.ToInt32(System.Environment.GetEnvironmentVariable("CCP_JOBID"));
131
132      Console.WriteLine(jobId + "@" + headNodeName);
133
134      // Connect to the head node and get the job
135      IScheduler scheduler = new Scheduler();
136      scheduler.Connect(headNodeName);
137      ISchedulerJob job = scheduler.OpenJob(jobId);
138
139      job.SetCustomProperty("address", address);
140
141      job.Commit();
142    }
143
144    private static string GetAddress(ServiceHost service) {
145      IPHostEntry IPHost = Dns.GetHostByName(Dns.GetHostName());
146      string address = "net.tcp://" + IPHost.AddressList[0].ToString() + ":" + service.ChannelDispatchers[0].Listener.Uri.Port + "/AlgorithmBroker";
147
148      return address;
149    }
150
151    public void StartAlgorithm(MPI.Communicator communicator) {
152      IAlgorithm alg = communicator.Receive<MPITransportWrapper<IAlgorithm>>(0, 0).InnerItem;
153      int updateInterval = communicator.Receive<int>(0, 1);
154
155      Console.WriteLine("Starting algorithm...");
156
157      alg.Prepare(true);
158      waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
159      alg.Started += new EventHandler(alg_Started);
160      alg.Start();
161
162      waitHandle.WaitOne();
163
164      alg.Started -= new EventHandler(alg_Started);
165
166      while (alg.ExecutionState != ExecutionState.Stopped) {
167        Console.WriteLine("Pausing alg...");
168        alg.Pause();
169
170        while (alg.ExecutionState == ExecutionState.Started) {
171          Thread.Sleep(100);
172        }
173
174        communicator.Send<MPITransportWrapper<ResultCollection>>(
175          new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
176
177        Console.WriteLine("Sending update...");
178
179        Console.WriteLine("Resuming alg...");
180        if (alg.ExecutionState == ExecutionState.Paused)
181          alg.Start();
182        Thread.Sleep(updateInterval);
183      }
184
185      communicator.Send<int>(communicator.Rank, 0, 2);
186      communicator.Send<MPITransportWrapper<ResultCollection>>(
187          new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
188    }
189
190    void alg_Started(object sender, EventArgs e) {
191      waitHandle.Set();
192    }
193
194    public void StartAlgorithm(string fileName, string resultName) {
195      IAlgorithm alg = XmlParser.Deserialize<HeuristicLab.Optimization.IAlgorithm>(fileName);
196
197      alg.Stopped += new EventHandler(algorithm_Stopped);
198      waitHandle.Reset();
199
200      alg.Prepare(true);
201      alg.Start();
202      Console.WriteLine("ALG STARTED");
203      Stopwatch sw = new Stopwatch();
204      sw.Start();
205
206      if (MPI.Communicator.world.Rank == 1) {
207        Thread.Sleep(10000);
208        Thread t = new Thread(delegate() {
209          while (true) {
210            if (alg.ExecutionState == Core.ExecutionState.Started) {
211              alg.Pause();
212
213              while (alg.ExecutionState == Core.ExecutionState.Started)
214                Thread.Sleep(100);
215
216              if (alg.Results.ContainsKey("Best valid VRP Solution Distance")) {
217                Console.WriteLine("BestDistance: " + alg.Results["Best valid VRP Solution Distance"].Value.ToString());
218              }
219
220              if (alg.Results.ContainsKey("Best valid VRP Solution VehicleUtilization")) {
221                Console.WriteLine("BestVehicles: " + alg.Results["Best valid VRP Solution VehicleUtilization"].Value.ToString());
222              }
223
224              XmlGenerator.Serialize(alg, resultName);
225              alg.Start();
226            }
227
228            Thread.Sleep(300000);
229          }
230        });
231        t.IsBackground = true;
232        t.Start();
233      }
234
235      waitHandle.WaitOne();
236
237      Console.WriteLine("ALG STOPPED");
238      sw.Stop();
239      Console.WriteLine("TIME: " + sw.ElapsedMilliseconds + "ms");
240
241      if (alg.Results.ContainsKey("Best valid VRP Solution Distance") && alg.Results.ContainsKey("Best valid VRP Solution VehicleUtilization")) {
242        Console.WriteLine("BestDistance: " + alg.Results["Best valid VRP Solution Distance"].Value.ToString() + ", " +
243                          "BestVehicles: " + alg.Results["Best valid VRP Solution VehicleUtilization"].Value.ToString());
244      }
245
246      XmlGenerator.Serialize(alg, resultName);
247    }
248
249    void algorithm_Stopped(object sender, EventArgs e) {
250      waitHandle.Set();
251    }
252
253    public void StartAlgorithm(string fileName, int calculationTime) {
254      IAlgorithm alg = XmlParser.Deserialize<HeuristicLab.Optimization.IAlgorithm>(fileName);
255      IOperator breakpoint = ((alg.Parameters["Analyzer"] as IValueParameter).Value as IOperator);
256
257      waitHandle.Reset();
258
259      alg.Start();
260
261      Thread.Sleep(calculationTime);
262
263      if (alg.ExecutionState == ExecutionState.Started) {
264        waitHandle.Reset();
265        alg.Paused += new EventHandler(alg_Paused);
266        breakpoint.Breakpoint = true;
267
268        waitHandle.WaitOne();
269
270        breakpoint.Breakpoint = false;
271        alg.Paused -= new EventHandler(alg_Paused);
272      }
273
274      Mutex mutex = new Mutex(false, "ResultFile");
275      mutex.WaitOne();
276      Console.WriteLine("SERIALIZING");
277      XmlGenerator.Serialize(alg, fileName);
278      MPI.Communicator.world.Abort(0);
279    }
280
281    void alg_Paused(object sender, EventArgs e) {
282      waitHandle.Set();
283    }
284  }
285}
Note: See TracBrowser for help on using the repository browser.