#region License Information /* HeuristicLab * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Collections.Generic; using System.Linq; using System.Text; using HeuristicLab.Optimization; using HeuristicLab.Persistence.Default.Xml; using System.Threading; using HeuristicLab.Operators.MPISupport; using HeuristicLab.Core; using Microsoft.Hpc.Scheduler; using System.ServiceModel; using System.Net; namespace HeuristicLab.MPIAlgorithmRunner { class Program { private static void SetJobStatus(string address) { Console.WriteLine("Started service... at " + address); // Discover the job's context from the environment String headNodeName = System.Environment.GetEnvironmentVariable("CCP_SCHEDULER"); int jobId = System.Convert.ToInt32(System.Environment.GetEnvironmentVariable("CCP_JOBID")); Console.WriteLine(jobId + "@" + headNodeName); // Connect to the head node and get the job IScheduler scheduler = new Scheduler(); scheduler.Connect(headNodeName); ISchedulerJob job = scheduler.OpenJob(jobId); job.SetCustomProperty("address", address); job.Commit(); } private static string GetAddress(ServiceHost service) { IPHostEntry IPHost = Dns.GetHostByName(Dns.GetHostName()); string address = "net.tcp://" + IPHost.AddressList[0].ToString() + ":" + service.ChannelDispatchers[0].Listener.Uri.Port + "/AlgorithmBroker"; return address; } static void Main(string[] args) { using (new MPI.Environment(ref args)) { int clients = MPI.Communicator.world.Group.Size - 1; Console.WriteLine("Clients: " + clients); MPI.Communicator communicator = MPI.Communicator.world.Clone() as MPI.Communicator; if (communicator.Rank == 0) { ServiceHost service = AlgorithmBroker.StartService(communicator); AlgorithmBroker broker = (service.SingletonInstance as AlgorithmBroker); string address = GetAddress(service); SetJobStatus(address); bool[] finished = new bool[clients]; int finishedCount = 0; while (finishedCount != clients) { ItemList results = new ItemList(); for (int i = 0; i < clients; i++) { if (!finished[i]) { int client = i + 1; ResultCollection result = communicator.Receive>(client, 1).InnerItem; Console.WriteLine("Received result " + result); if (results.Count != clients) { results.Add(result); } else { results[i] = result; } Console.WriteLine("Probing..."); if (communicator.ImmediateProbe(client, 2) != null) { finished[i] = true; finishedCount++; } } } Console.WriteLine("Update results"); lock (broker.resultLocker) broker.Results = results; } lock (broker.ExitWaitHandle) { broker.Terminated = true; } broker.ExitWaitHandle.WaitOne(); Console.WriteLine("Finished."); Thread.Sleep(1000); service.Close(); } else { Program p = new Program(); p.StartAlgorithm(communicator); } } } private EventWaitHandle waitHandle; public void StartAlgorithm(MPI.Communicator communicator) { IAlgorithm alg = communicator.Receive>(0, 0).InnerItem; int updateInterval = communicator.Receive(0, 1); Console.WriteLine("Starting algorithm..."); alg.Prepare(true); waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset); alg.Started += new EventHandler(alg_Started); alg.Start(); waitHandle.WaitOne(); alg.Started -= new EventHandler(alg_Started); while (alg.ExecutionState != ExecutionState.Stopped) { Console.WriteLine("Pausing alg..."); alg.Pause(); while (alg.ExecutionState == ExecutionState.Started) { Thread.Sleep(100); } communicator.Send>( new MPITransportWrapper(alg.Results), 0, 1); Console.WriteLine("Sending update..."); Console.WriteLine("Resuming alg..."); if (alg.ExecutionState == ExecutionState.Paused) alg.Start(); Thread.Sleep(updateInterval); } communicator.Send(communicator.Rank, 0, 2); communicator.Send>( new MPITransportWrapper(alg.Results), 0, 1); } void alg_Started(object sender, EventArgs e) { waitHandle.Set(); } } }