Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/Program.cs @ 6401

Last change on this file since 6401 was 6401, checked in by svonolfe, 13 years ago

Implemented result and algorithm streaming (#1542)

File size: 5.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Optimization;
27using HeuristicLab.Persistence.Default.Xml;
28using System.Threading;
29using HeuristicLab.Operators.MPISupport;
30using HeuristicLab.Core;
31using Microsoft.Hpc.Scheduler;
32using System.ServiceModel;
33using System.Net;
34
35namespace HeuristicLab.MPIAlgorithmRunner {
36  class Program {
37    EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
38
/// <summary>
/// Publishes the service address as the custom property "address" on the scheduler job
/// identified by the CCP_JOBID environment variable, using the head node named by
/// the CCP_SCHEDULER environment variable.
/// </summary>
/// <param name="address">Address string to store on the job.</param>
private static void SetJobStatus(string address) {
  Console.WriteLine("Started service... at " + address);

  // The job's context comes from the process environment.
  string jobIdText = Environment.GetEnvironmentVariable("CCP_JOBID");
  string schedulerHost = Environment.GetEnvironmentVariable("CCP_SCHEDULER");
  int id = Convert.ToInt32(jobIdText);

  Console.WriteLine(id + "@" + schedulerHost);

  // Open the job on the head node, attach the address and persist the change.
  IScheduler headNode = new Scheduler();
  headNode.Connect(schedulerHost);

  ISchedulerJob currentJob = headNode.OpenJob(id);
  currentJob.SetCustomProperty("address", address);
  currentJob.Commit();
}
57
/// <summary>
/// Builds the net.tcp address of the "AlgorithmBroker" endpoint from the local machine's
/// first IPv4 address and the port of the host's first channel dispatcher.
/// </summary>
/// <param name="service">Service host whose first channel dispatcher supplies the listening port.</param>
/// <returns>A string of the form net.tcp://[IPv4 address]:[port]/AlgorithmBroker.</returns>
/// <exception cref="InvalidOperationException">The local host name resolves to no IPv4 address.</exception>
private static string GetAddress(ServiceHost service) {
  string hostName = Dns.GetHostName();

  // Dns.GetHostByName is obsolete. Its replacement, GetHostEntry, may also return IPv6
  // addresses, which would need bracket notation inside a URI, so the first IPv4
  // address is selected explicitly instead of blindly taking AddressList[0].
  IPAddress[] localAddresses = Dns.GetHostEntry(hostName).AddressList;
  IPAddress ipv4 = localAddresses.FirstOrDefault(
    a => a.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork);

  if (ipv4 == null) {
    throw new InvalidOperationException("No IPv4 address found for host " + hostName);
  }

  int port = service.ChannelDispatchers[0].Listener.Uri.Port;
  return "net.tcp://" + ipv4 + ":" + port + "/AlgorithmBroker";
}
64
/// <summary>
/// Entry point. Rank 0 hosts the AlgorithmBroker service, registers its address on the
/// scheduler job and keeps collecting result updates (tag 1) from all other ranks until
/// each of them has announced completion (tag 2). Every other rank runs an algorithm via
/// <see cref="StartAlgorithm"/>.
/// </summary>
/// <param name="args">Command line arguments, passed by reference to the MPI environment.</param>
static void Main(string[] args) {
  using (new MPI.Environment(ref args)) {
    int clients = MPI.Communicator.world.Group.Size - 1;
    Console.WriteLine("Clients: " + clients);

    // Direct casts: an unexpected type fails right here with an InvalidCastException
    // instead of silently yielding null as the former "as" casts did.
    MPI.Communicator communicator = (MPI.Communicator)MPI.Communicator.world.Clone();

    if (communicator.Rank == 0) {
      ServiceHost service = AlgorithmBroker.StartService(communicator);
      AlgorithmBroker broker = (AlgorithmBroker)service.SingletonInstance;

      string address = GetAddress(service);
      SetJobStatus(address);

      // One slot per client (slot i belongs to rank i + 1). The slots survive across
      // passes so that the last result of a client that has already finished stays
      // part of every later update instead of being dropped.
      ResultCollection[] latest = new ResultCollection[clients];
      bool[] finished = new bool[clients];
      int finishedCount = 0;

      while (finishedCount != clients) {
        for (int i = 0; i < clients; i++) {
          if (finished[i]) {
            continue;
          }

          int client = i + 1;
          ResultCollection result = communicator.Receive<MPITransportWrapper<ResultCollection>>(client, 1).InnerItem;

          Console.WriteLine("Received result " + result);
          latest[i] = result;

          Console.WriteLine("Probing...");
          if (communicator.ImmediateProbe(client, 2) != null) {
            // Consume the finish notification so it is not left pending on the communicator.
            communicator.Receive<int>(client, 2);
            finished[i] = true;
            finishedCount++;
          }
          // NOTE(review): a client sends tag 2 before its final tag-1 result, so the result
          // received above may be a periodic update while the final one is still in flight
          // and never received. Fixing that needs a protocol change on both sides.
        }

        // Publish a fresh list each pass; the first pass blocks on every client, so all
        // slots are populated by the time the list is built.
        ItemList<ResultCollection> results = new ItemList<ResultCollection>();
        foreach (ResultCollection current in latest) {
          results.Add(current);
        }

        Console.WriteLine("Update results");
        lock (broker.resultLocker) {
          broker.Results = results;
        }
      }

      lock (broker.ExitWaitHandle) {
        broker.Terminated = true;
      }

      // Block until the broker signals its exit handle, then shut the service down.
      broker.ExitWaitHandle.WaitOne();
      Console.WriteLine("Finished.");
      Thread.Sleep(1000);
      service.Close();
    } else {
      Program p = new Program();
      p.StartAlgorithm(communicator);
    }
  }
}
125
/// <summary>
/// Receives an algorithm (tag 0) and an update interval (tag 1) from rank 0, runs the
/// algorithm and periodically pauses it to send its current results to rank 0 (tag 1).
/// Once the algorithm raises Stopped, sends this rank's number (tag 2) followed by the
/// final results (tag 1) to rank 0.
/// </summary>
/// <param name="communicator">Communicator used for all exchanges with rank 0.</param>
public void StartAlgorithm(MPI.Communicator communicator) {
  IAlgorithm alg = communicator.Receive<MPITransportWrapper<IAlgorithm>>(0, 0).InnerItem;
  int updateInterval = communicator.Receive<int>(0, 1);

  Console.WriteLine("Starting algorithm...");

  alg.Stopped += new EventHandler(algorithm_Stopped);
  waitHandle.Reset();

  alg.Prepare(true);
  alg.Start();

  // The timer is held by the using statement for the whole wait: a System.Threading.Timer
  // that is no longer referenced may be garbage-collected and silently stop firing.
  // Disposing it afterwards prevents further update callbacks from being scheduled.
  using (Timer updateTimer = new Timer(delegate(object state) {
    if (alg.ExecutionState != ExecutionState.Started) {
      return;
    }

    Console.WriteLine("Pausing alg...");
    alg.Pause();

    // Wait until the algorithm has left the Started state.
    while (alg.ExecutionState == ExecutionState.Started) {
      Thread.Sleep(100);
    }

    // If it did not end up paused (e.g. it finished in the meantime) there is nothing to
    // resume; previously this case spun forever waiting for a Paused state.
    if (alg.ExecutionState != ExecutionState.Paused) {
      return;
    }

    communicator.Send<MPITransportWrapper<ResultCollection>>(
      new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);

    Console.WriteLine("Sending update...");

    Console.WriteLine("Resuming alg...");
    alg.Start();
  }, null, updateInterval, updateInterval)) {
    // Released by algorithm_Stopped.
    waitHandle.WaitOne();
  }

  // Announce completion first, then deliver the final results.
  communicator.Send<int>(communicator.Rank, 0, 2);
  communicator.Send<MPITransportWrapper<ResultCollection>>(
      new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
}
163
/// <summary>
/// Handler for the algorithm's Stopped event: signals the wait handle that
/// StartAlgorithm blocks on.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event data (unused).</param>
void algorithm_Stopped(object sender, EventArgs e) {
  this.waitHandle.Set();
}
167  }
168}
Note: See TracBrowser for help on using the repository browser.