Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/Program.cs @ 6409

Last change on this file since 6409 was 6402, checked in by svonolfe, 14 years ago

Improved efficiency of algorithm execution in the MPI environment (#1542)

File size: 5.7 KB
RevLine 
[6348]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Optimization;
27using HeuristicLab.Persistence.Default.Xml;
28using System.Threading;
[6354]29using HeuristicLab.Operators.MPISupport;
30using HeuristicLab.Core;
[6388]31using Microsoft.Hpc.Scheduler;
32using System.ServiceModel;
33using System.Net;
[6348]34
35namespace HeuristicLab.MPIAlgorithmRunner {
36  class Program {
[6388]37    private static void SetJobStatus(string address) {
38      Console.WriteLine("Started service... at " + address);
[6348]39
[6388]40      // Discover the job's context from the environment
[6394]41      String headNodeName = System.Environment.GetEnvironmentVariable("CCP_SCHEDULER");
[6388]42      int jobId = System.Convert.ToInt32(System.Environment.GetEnvironmentVariable("CCP_JOBID"));
[6348]43
[6394]44      Console.WriteLine(jobId + "@" + headNodeName);
45
[6388]46      // Connect to the head node and get the job
47      IScheduler scheduler = new Scheduler();
48      scheduler.Connect(headNodeName);
49      ISchedulerJob job = scheduler.OpenJob(jobId);
[6348]50
[6388]51      job.SetCustomProperty("address", address);
52
53      job.Commit();
54    }
55
56    private static string GetAddress(ServiceHost service) {
57      IPHostEntry IPHost = Dns.GetHostByName(Dns.GetHostName());
58      string address = "net.tcp://" + IPHost.AddressList[0].ToString() + ":" + service.ChannelDispatchers[0].Listener.Uri.Port + "/AlgorithmBroker";
59
60      return address;
61    }
62
63    static void Main(string[] args) {
[6348]64      using (new MPI.Environment(ref args)) {
[6394]65        int clients = MPI.Communicator.world.Group.Size - 1;
66        Console.WriteLine("Clients: " + clients);
67
[6354]68        MPI.Communicator communicator = MPI.Communicator.world.Clone() as MPI.Communicator;
69
70        if (communicator.Rank == 0) {
[6388]71          ServiceHost service = AlgorithmBroker.StartService(communicator);
72          AlgorithmBroker broker = (service.SingletonInstance as AlgorithmBroker);
[6354]73
[6388]74          string address = GetAddress(service);
75          SetJobStatus(address);
[6354]76
77          bool[] finished = new bool[clients];
78          int finishedCount = 0;
79
80          while (finishedCount != clients) {
[6401]81            ItemList<ResultCollection> results = new ItemList<ResultCollection>();
82
[6354]83            for (int i = 0; i < clients; i++) {
84              if (!finished[i]) {
85                int client = i + 1;
86                ResultCollection result = communicator.Receive<MPITransportWrapper<ResultCollection>>(client, 1).InnerItem;
87
88                Console.WriteLine("Received result " + result);
89
90                if (results.Count != clients) {
91                  results.Add(result);
92                } else {
93                  results[i] = result;
94                }
95
96                Console.WriteLine("Probing...");
97                if (communicator.ImmediateProbe(client, 2) != null) {
98                  finished[i] = true;
99                  finishedCount++;
100                }
101              }
102            }
103
104            Console.WriteLine("Update results");
[6401]105            lock (broker.resultLocker)
106              broker.Results = results;
[6354]107          }
108
[6388]109          lock (broker.ExitWaitHandle) {
110            broker.Terminated = true;
111          }
112
113          broker.ExitWaitHandle.WaitOne();
[6354]114          Console.WriteLine("Finished.");
[6401]115          Thread.Sleep(1000);
[6388]116          service.Close();
[6354]117        } else {
118          Program p = new Program();
[6398]119          p.StartAlgorithm(communicator);
[6354]120        }
[6348]121      }
122    }
123
[6402]124    private EventWaitHandle waitHandle;
125
[6398]126    public void StartAlgorithm(MPI.Communicator communicator) {
[6354]127      IAlgorithm alg = communicator.Receive<MPITransportWrapper<IAlgorithm>>(0, 0).InnerItem;
[6398]128      int updateInterval = communicator.Receive<int>(0, 1);
[6348]129
[6354]130      Console.WriteLine("Starting algorithm...");
131
[6348]132      alg.Prepare(true);
[6402]133      waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
134      alg.Started += new EventHandler(alg_Started);
[6348]135      alg.Start();
136
[6402]137      waitHandle.WaitOne();
[6354]138
[6402]139      alg.Started -= new EventHandler(alg_Started);
[6354]140
[6402]141      while (alg.ExecutionState != ExecutionState.Stopped) {
142        Console.WriteLine("Pausing alg...");
143        alg.Pause();
[6354]144
[6402]145        while (alg.ExecutionState == ExecutionState.Started) {
146          Thread.Sleep(100);
147        }
[6354]148
[6402]149        communicator.Send<MPITransportWrapper<ResultCollection>>(
150          new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
151
152        Console.WriteLine("Sending update...");
153
154        Console.WriteLine("Resuming alg...");
155        if (alg.ExecutionState == ExecutionState.Paused)
[6354]156          alg.Start();
[6402]157        Thread.Sleep(updateInterval);
158      }
[6354]159
160      communicator.Send<int>(communicator.Rank, 0, 2);
161      communicator.Send<MPITransportWrapper<ResultCollection>>(
162          new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
[6348]163    }
164
[6402]165    void alg_Started(object sender, EventArgs e) {
[6348]166      waitHandle.Set();
167    }
168  }
169}
Note: See TracBrowser for help on using the repository browser.