Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
03/05/12 10:18:06 (12 years ago)
Author:
svonolfe
Message:

Improved performance, added MPISolutionsCreator (#1542)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/MPI/HeuristicLab.MPIAlgorithmRunner/3.3/Program.cs

    r7205 r7544  
    2727using HeuristicLab.Persistence.Default.Xml;
    2828using System.Threading;
     29using Microsoft.Hpc.Scheduler;
     30using System.Net;
     31using System.ServiceModel;
     32using HeuristicLab.Operators.MPISupport;
     33using HeuristicLab.Core;
     34using System.Diagnostics;
    2935
    3036namespace HeuristicLab.MPIAlgorithmRunner {
     
    3339
    3440    static void Main(string[] args) {
    35       if (args.Length != 2) {
    36         Console.WriteLine("Args:" + args.Length);
    37 
    38         throw new ArgumentException("You must provide two arguments - the algorithm file and the result file");
     41      if (args.Length == 2) {
     42        string fileName = args[0];
     43        string resultName = args[1];
     44
     45        using (new MPI.Environment(ref args)) {
     46          if (MPI.Communicator.world.Rank != 0) {
     47            Program p = new Program();
     48            p.StartAlgorithm(fileName, resultName + MPI.Communicator.world.Rank + ".hl");
     49          }
     50        }
     51      } else {
     52        using (new MPI.Environment(ref args)) {
     53          int clients = MPI.Communicator.world.Group.Size - 1;
     54          Console.WriteLine("Clients: " + clients);
     55
     56          MPI.Communicator communicator = MPI.Communicator.world.Clone() as MPI.Communicator;
     57
     58          if (communicator.Rank == 0) {
     59            ServiceHost service = AlgorithmBroker.StartService(communicator);
     60            AlgorithmBroker broker = (service.SingletonInstance as AlgorithmBroker);
     61
     62            string address = GetAddress(service);
     63            SetJobStatus(address);
     64
     65            bool[] finished = new bool[clients];
     66            int finishedCount = 0;
     67
     68            while (finishedCount != clients) {
     69              ItemList<ResultCollection> results = new ItemList<ResultCollection>();
     70
     71              for (int i = 0; i < clients; i++) {
     72                if (!finished[i]) {
     73                  int client = i + 1;
     74                  ResultCollection result = communicator.Receive<MPITransportWrapper<ResultCollection>>(client, 1).InnerItem;
     75
     76                  Console.WriteLine("Received result " + result);
     77
     78                  if (results.Count != clients) {
     79                    results.Add(result);
     80                  } else {
     81                    results[i] = result;
     82                  }
     83
     84                  Console.WriteLine("Probing...");
     85                  if (communicator.ImmediateProbe(client, 2) != null) {
     86                    finished[i] = true;
     87                    finishedCount++;
     88                  }
     89                }
     90              }
     91
     92              Console.WriteLine("Update results");
     93              lock (broker.resultLocker)
     94                broker.Results = results;
     95            }
     96
     97            lock (broker.ExitWaitHandle) {
     98              broker.Terminated = true;
     99            }
     100
     101            broker.ExitWaitHandle.WaitOne();
     102            Console.WriteLine("Finished.");
     103            Thread.Sleep(1000);
     104            service.Close();
     105          } else {
     106              Program p = new Program();
     107              p.StartAlgorithm(communicator);
     108          }
     109        }
     110      }       
     111    }
     112
     113    private static void SetJobStatus(string address) {
     114      Console.WriteLine("Started service... at " + address);
     115
     116      // Discover the job's context from the environment
     117      String headNodeName = System.Environment.GetEnvironmentVariable("CCP_SCHEDULER");
     118      int jobId = System.Convert.ToInt32(System.Environment.GetEnvironmentVariable("CCP_JOBID"));
     119
     120      Console.WriteLine(jobId + "@" + headNodeName);
     121
     122      // Connect to the head node and get the job
     123      IScheduler scheduler = new Scheduler();
     124      scheduler.Connect(headNodeName);
     125      ISchedulerJob job = scheduler.OpenJob(jobId);
     126
     127      job.SetCustomProperty("address", address);
     128
     129      job.Commit();
     130    }
     131
     132    private static string GetAddress(ServiceHost service) {
     133      IPHostEntry IPHost = Dns.GetHostByName(Dns.GetHostName());
     134      string address = "net.tcp://" + IPHost.AddressList[0].ToString() + ":" + service.ChannelDispatchers[0].Listener.Uri.Port + "/AlgorithmBroker";
     135
     136      return address;
     137    }
     138
     139    public void StartAlgorithm(MPI.Communicator communicator) {
     140      IAlgorithm alg = communicator.Receive<MPITransportWrapper<IAlgorithm>>(0, 0).InnerItem;
     141      int updateInterval = communicator.Receive<int>(0, 1);
     142
     143      Console.WriteLine("Starting algorithm...");
     144
     145      alg.Prepare(true);
     146      waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
     147      alg.Started += new EventHandler(alg_Started);
     148      alg.Start();
     149
     150      waitHandle.WaitOne();
     151
     152      alg.Started -= new EventHandler(alg_Started);
     153
     154      while (alg.ExecutionState != ExecutionState.Stopped) {
     155        Console.WriteLine("Pausing alg...");
     156        alg.Pause();
     157
     158        while (alg.ExecutionState == ExecutionState.Started) {
     159          Thread.Sleep(100);
     160        }
     161
     162        communicator.Send<MPITransportWrapper<ResultCollection>>(
     163          new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
     164
     165        Console.WriteLine("Sending update...");
     166
     167        Console.WriteLine("Resuming alg...");
     168        if (alg.ExecutionState == ExecutionState.Paused)
     169          alg.Start();
     170        Thread.Sleep(updateInterval);
    39171      }
    40172
    41       string fileName = args[0];
    42       string resultName = args[1];
    43 
    44       using (new MPI.Environment(ref args)) {
    45         Program p = new Program();
    46         p.StartAlgorithm(fileName, resultName + MPI.Communicator.world.Rank + ".hl");
    47       }
     173      communicator.Send<int>(communicator.Rank, 0, 2);
     174      communicator.Send<MPITransportWrapper<ResultCollection>>(
     175          new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
     176    }
     177
     178    void alg_Started(object sender, EventArgs e) {
     179      waitHandle.Set();
    48180    }
    49181
     
    56188      alg.Prepare(true);
    57189      alg.Start();
    58 
    59       Thread.Sleep(10000);     
     190      Console.WriteLine("ALG STARTED");
     191      Stopwatch sw = new Stopwatch();
     192      sw.Start();
     193
     194     /* Thread.Sleep(10000);     
    60195      Thread t = new Thread(delegate() {
    61196        while (true) {
     
    82217      });
    83218      t.IsBackground = true;
    84       t.Start();
     219      t.Start();*/
    85220
    86221      waitHandle.WaitOne();
    87222
     223      Console.WriteLine("ALG STOPPED");
     224      sw.Stop();
     225      Console.WriteLine("TIME: " + sw.ElapsedMilliseconds + "ms");
     226
     227      if (alg.Results.ContainsKey("Best valid Solution Distance") && alg.Results.ContainsKey("Best valid Solution Vehicles")) {
     228        Console.WriteLine("BestDistance: " + alg.Results["Best valid Solution Distance"].Value.ToString() + ", " +
     229                          "BestVehicles: " + alg.Results["Best valid Solution Vehicles"].Value.ToString());
     230      }
     231
    88232      XmlGenerator.Serialize(alg, resultName);
    89233    }
    90234
    91235    void algorithm_Stopped(object sender, EventArgs e) {
    92       waitHandle.Set();
     236      waitHandle.Set(); 
    93237    }
    94238  }
Note: See TracChangeset for help on using the changeset viewer.