#region License Information /* HeuristicLab * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Collections.Generic; using System.Linq; using System.Text; using HeuristicLab.Optimization; using HeuristicLab.Persistence.Default.Xml; using System.Threading; using Microsoft.Hpc.Scheduler; using System.Net; using System.ServiceModel; using HeuristicLab.Operators.MPISupport; using HeuristicLab.Core; using System.Diagnostics; using HeuristicLab.Common; using System.IO; namespace HeuristicLab.MPIAlgorithmRunner { class Program { EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset); static void Main(string[] args) { if (args.Length == 2) { string fileName = args[0]; int calcTime; if (int.TryParse(args[1], out calcTime)) { using (new MPI.Environment(ref args)) { if (MPI.Communicator.world.Rank != 0) { Program p = new Program(); p.StartAlgorithm(fileName,calcTime); } } } else { string resultName = args[1]; using (new MPI.Environment(ref args)) { if (MPI.Communicator.world.Rank != 0) { Program p = new Program(); p.StartAlgorithm(fileName, resultName + MPI.Communicator.world.Rank + ".hl"); } } } } else { using (new MPI.Environment(ref args)) { int clients = MPI.Communicator.world.Group.Size - 1; Console.WriteLine("Clients: " + clients); MPI.Communicator communicator = MPI.Communicator.world.Clone() as MPI.Communicator; if (communicator.Rank == 0) { ServiceHost service = AlgorithmBroker.StartService(communicator); AlgorithmBroker broker = (service.SingletonInstance as AlgorithmBroker); string address = GetAddress(service); SetJobStatus(address); bool[] finished = new bool[clients]; int finishedCount = 0; while (finishedCount != clients) { ItemList results = new ItemList(); for (int i = 0; i < clients; i++) { if (!finished[i]) { int client = i + 1; ResultCollection result = communicator.Receive>(client, 1).InnerItem; Console.WriteLine("Received result " + result); if (results.Count != clients) { results.Add(result); } else { results[i] = result; } Console.WriteLine("Probing..."); if (communicator.ImmediateProbe(client, 2) != null) { finished[i] = true; finishedCount++; } } } Console.WriteLine("Update results"); lock (broker.resultLocker) broker.Results = results; } lock (broker.ExitWaitHandle) { broker.Terminated = true; } broker.ExitWaitHandle.WaitOne(); Console.WriteLine("Finished."); Thread.Sleep(1000); service.Close(); } else { Program p = new Program(); p.StartAlgorithm(communicator); } } } } private static void SetJobStatus(string address) { Console.WriteLine("Started service... at " + address); // Discover the job's context from the environment String headNodeName = System.Environment.GetEnvironmentVariable("CCP_SCHEDULER"); int jobId = System.Convert.ToInt32(System.Environment.GetEnvironmentVariable("CCP_JOBID")); Console.WriteLine(jobId + "@" + headNodeName); // Connect to the head node and get the job IScheduler scheduler = new Scheduler(); scheduler.Connect(headNodeName); ISchedulerJob job = scheduler.OpenJob(jobId); job.SetCustomProperty("address", address); job.Commit(); } private static string GetAddress(ServiceHost service) { IPHostEntry IPHost = Dns.GetHostByName(Dns.GetHostName()); string address = "net.tcp://" + IPHost.AddressList[0].ToString() + ":" + service.ChannelDispatchers[0].Listener.Uri.Port + "/AlgorithmBroker"; return address; } public void StartAlgorithm(MPI.Communicator communicator) { IAlgorithm alg = communicator.Receive>(0, 0).InnerItem; int updateInterval = communicator.Receive(0, 1); Console.WriteLine("Starting algorithm..."); alg.Prepare(true); waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset); alg.Started += new EventHandler(alg_Started); alg.Start(); waitHandle.WaitOne(); alg.Started -= new EventHandler(alg_Started); while (alg.ExecutionState != ExecutionState.Stopped) { Console.WriteLine("Pausing alg..."); alg.Pause(); while (alg.ExecutionState == ExecutionState.Started) { Thread.Sleep(100); } communicator.Send>( new MPITransportWrapper(alg.Results), 0, 1); Console.WriteLine("Sending update..."); Console.WriteLine("Resuming alg..."); if (alg.ExecutionState == ExecutionState.Paused) alg.Start(); Thread.Sleep(updateInterval); } communicator.Send(communicator.Rank, 0, 2); communicator.Send>( new MPITransportWrapper(alg.Results), 0, 1); } void alg_Started(object sender, EventArgs e) { waitHandle.Set(); } public void StartAlgorithm(string fileName, string resultName) { IAlgorithm alg = XmlParser.Deserialize(fileName); alg.Stopped += new EventHandler(algorithm_Stopped); waitHandle.Reset(); alg.Prepare(true); alg.Start(); Console.WriteLine("ALG STARTED"); Stopwatch sw = new Stopwatch(); sw.Start(); if (MPI.Communicator.world.Rank == 1) { Thread.Sleep(10000); Thread t = new Thread(delegate() { while (true) { if (alg.ExecutionState == Core.ExecutionState.Started) { alg.Pause(); while (alg.ExecutionState == Core.ExecutionState.Started) Thread.Sleep(100); if (alg.Results.ContainsKey("Best valid VRP Solution Distance")) { Console.WriteLine("BestDistance: " + alg.Results["Best valid VRP Solution Distance"].Value.ToString()); } if (alg.Results.ContainsKey("Best valid VRP Solution VehicleUtilization")) { Console.WriteLine("BestVehicles: " + alg.Results["Best valid VRP Solution VehicleUtilization"].Value.ToString()); } XmlGenerator.Serialize(alg, resultName); alg.Start(); } Thread.Sleep(300000); } }); t.IsBackground = true; t.Start(); } waitHandle.WaitOne(); Console.WriteLine("ALG STOPPED"); sw.Stop(); Console.WriteLine("TIME: " + sw.ElapsedMilliseconds + "ms"); if (alg.Results.ContainsKey("Best valid VRP Solution Distance") && alg.Results.ContainsKey("Best valid VRP Solution VehicleUtilization")) { Console.WriteLine("BestDistance: " + alg.Results["Best valid VRP Solution Distance"].Value.ToString() + ", " + "BestVehicles: " + alg.Results["Best valid VRP Solution VehicleUtilization"].Value.ToString()); } XmlGenerator.Serialize(alg, resultName); } void algorithm_Stopped(object sender, EventArgs e) { waitHandle.Set(); } public void StartAlgorithm(string fileName, int calculationTime) { IAlgorithm alg = XmlParser.Deserialize(fileName); IOperator breakpoint = ((alg.Parameters["Analyzer"] as IValueParameter).Value as IOperator); waitHandle.Reset(); alg.Start(); Thread.Sleep(calculationTime); if (alg.ExecutionState == ExecutionState.Started) { waitHandle.Reset(); alg.Paused += new EventHandler(alg_Paused); breakpoint.Breakpoint = true; waitHandle.WaitOne(); breakpoint.Breakpoint = false; alg.Paused -= new EventHandler(alg_Paused); } Mutex mutex = new Mutex(false, "ResultFile"); mutex.WaitOne(); Console.WriteLine("SERIALIZING"); XmlGenerator.Serialize(alg, fileName); MPI.Communicator.world.Abort(0); } void alg_Paused(object sender, EventArgs e) { waitHandle.Set(); } } }