Free cookie consent management tool by TermsFeed Policy Generator

Changeset 6354


Ignore:
Timestamp:
06/01/11 17:37:05 (13 years ago)
Author:
svonolfe
Message:

Added first version of the MPIEngine (#1542)

Location:
branches/MPI
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • branches/MPI/HeuristicLab.ExtLibs/HeuristicLab.MPInet/MPIAlgorithmRunner/3.3/HeuristicLab.MPIAlgorithmRunner-3.3.csproj

    r6348 r6354  
    120120  </ItemGroup>
    121121  <ItemGroup>
     122    <ProjectReference Include="..\..\..\..\HeuristicLab.Collections\3.3\HeuristicLab.Collections-3.3.csproj">
     123      <Project>{958B43BC-CC5C-4FA2-8628-2B3B01D890B6}</Project>
     124      <Name>HeuristicLab.Collections-3.3</Name>
     125    </ProjectReference>
    122126    <ProjectReference Include="..\..\..\..\HeuristicLab.Common\3.3\HeuristicLab.Common-3.3.csproj">
    123127      <Project>{A9AD58B9-3EF9-4CC1-97E5-8D909039FF5C}</Project>
     
    127131      <Project>{C36BD924-A541-4A00-AFA8-41701378DDC5}</Project>
    128132      <Name>HeuristicLab.Core-3.3</Name>
     133    </ProjectReference>
     134    <ProjectReference Include="..\..\..\..\HeuristicLab.Operators.MPISupport\3.3\HeuristicLab.Operators.MPISupport-3.3.csproj">
     135      <Project>{6BD69CDA-4875-4045-8B35-6FD4602854F5}</Project>
     136      <Name>HeuristicLab.Operators.MPISupport-3.3</Name>
    129137    </ProjectReference>
    130138    <ProjectReference Include="..\..\..\..\HeuristicLab.PluginInfrastructure\3.3\HeuristicLab.PluginInfrastructure-3.3.csproj">
  • branches/MPI/HeuristicLab.ExtLibs/HeuristicLab.MPInet/MPIAlgorithmRunner/3.3/HeuristicLabMPIAlgorithmRunnerPlugin.cs.frame

    r6348 r6354  
    3030  [PluginDependency("HeuristicLab.Common", "3.3")]
    3131  [PluginDependency("HeuristicLab.Core", "3.3")]
     32  [PluginDependency("HeuristicLab.Collections", "3.3")]
    3233  [PluginDependency("HeuristicLab.MPInet", "1.0.0")]
    3334  [PluginDependency("HeuristicLab.Optimization", "3.3")]
    3435  [PluginDependency("HeuristicLab.Persistence", "3.3")]
     36  [PluginDependency("HeuristicLab.Operators.MPISupport", "3.3")]
    3537  public class HeuristicLabProblemsVehicleRoutingPlugin : PluginBase {
    3638  }
  • branches/MPI/HeuristicLab.ExtLibs/HeuristicLab.MPInet/MPIAlgorithmRunner/3.3/Program.cs

    r6348 r6354  
    2727using HeuristicLab.Persistence.Default.Xml;
    2828using System.Threading;
     29using HeuristicLab.Operators.MPISupport;
     30using HeuristicLab.Core;
    2931
    3032namespace HeuristicLab.MPIAlgorithmRunner {
     
    3335
    3436    static void Main(string[] args) {
    35       if (args.Length != 2) {
     37      if (args.Length != 3) {
    3638        Console.WriteLine("Args:" + args.Length);
    3739
    38         throw new ArgumentException("You must provide two arguments - the algorithm file and the result file");
     40        throw new ArgumentException("You must provide 3 arguments - the algorithm file, the result file and the update interval (in ms)");
    3941      }
    4042
    4143      string fileName = args[0];
    4244      string resultName = args[1];
     45      int updateInterval = int.Parse(args[2]);
    4346
    4447      using (new MPI.Environment(ref args)) {
    45         Program p = new Program();
    46         p.StartAlgorithm(fileName, resultName + MPI.Communicator.world.Rank + ".hl");
     48        MPI.Communicator communicator = MPI.Communicator.world.Clone() as MPI.Communicator;
     49
     50        if (communicator.Rank == 0) {
     51          IAlgorithm alg = XmlParser.Deserialize<IAlgorithm>(fileName);
     52          MPITransportWrapper<IAlgorithm> transport = new MPITransportWrapper<IAlgorithm>(alg);
     53
     54          int clients = communicator.Size - 1;
     55          Console.WriteLine("Sending alg to " + clients + " clients");
     56          for (int i = 0; i < clients; i++) {
     57            int client = i + 1;
     58            communicator.Send(transport, client, 0);
     59          }
     60
     61          ItemList<ResultCollection> results = new ItemList<ResultCollection>();
     62         
     63          bool[] finished = new bool[clients];
     64          int finishedCount = 0;
     65
     66          while (finishedCount != clients) {
     67            for (int i = 0; i < clients; i++) {
     68              if (!finished[i]) {
     69                int client = i + 1;
     70                ResultCollection result = communicator.Receive<MPITransportWrapper<ResultCollection>>(client, 1).InnerItem;
     71
     72                Console.WriteLine("Received result " + result);
     73
     74                if (results.Count != clients) {
     75                  results.Add(result);
     76                } else {
     77                  results[i] = result;
     78                }
     79
     80                Console.WriteLine("Probing...");
     81                if (communicator.ImmediateProbe(client, 2) != null) {
     82                  finished[i] = true;
     83                  finishedCount++;
     84                }
     85              }
     86            }
     87
     88            Console.WriteLine("Update results");
     89            lock (resultName) {
     90              XmlGenerator.Serialize(results, resultName);
     91            }
     92          }
     93
     94          Console.WriteLine("Finished.");
     95        } else {
     96          Program p = new Program();
     97          p.StartAlgorithm(updateInterval, communicator);
     98        }
    4799      }
    48100    }
    49101
    50     public void StartAlgorithm(string fileName, string resultName) {
    51       IAlgorithm alg = XmlParser.Deserialize<HeuristicLab.Optimization.IAlgorithm>(fileName);
     102    public void StartAlgorithm(int updateInterval, MPI.Communicator communicator) {
     103      IAlgorithm alg = communicator.Receive<MPITransportWrapper<IAlgorithm>>(0, 0).InnerItem;
     104
     105      Console.WriteLine("Starting algorithm...");
    52106
    53107      alg.Stopped += new EventHandler(algorithm_Stopped);
     
    57111      alg.Start();
    58112
     113      Timer t = new Timer(delegate(object state) {
     114        if (alg.ExecutionState == ExecutionState.Started) {
     115          Console.WriteLine("Pausing alg...");
     116          alg.Pause();
     117
     118          while (alg.ExecutionState != ExecutionState.Paused) {
     119            Thread.Sleep(100);
     120          }
     121
     122          communicator.Send<MPITransportWrapper<ResultCollection>>(
     123            new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
     124
     125          Console.WriteLine("Sending update...");
     126
     127          Console.WriteLine("Resuming alg...");
     128          alg.Start();
     129        }
     130      }, null, updateInterval, updateInterval);
     131
    59132      waitHandle.WaitOne();
    60133
    61       XmlGenerator.Serialize(alg, resultName);
     134      communicator.Send<int>(communicator.Rank, 0, 2);
     135      communicator.Send<MPITransportWrapper<ResultCollection>>(
     136          new MPITransportWrapper<ResultCollection>(alg.Results), 0, 1);
    62137    }
    63138
  • branches/MPI/HeuristicLab.MPIEngine/3.3/HeuristicLab.MPIEngine-3.3.csproj

    r6349 r6354  
    122122  </ItemGroup>
    123123  <ItemGroup>
     124    <ProjectReference Include="..\..\HeuristicLab.Collections\3.3\HeuristicLab.Collections-3.3.csproj">
     125      <Project>{958B43BC-CC5C-4FA2-8628-2B3B01D890B6}</Project>
     126      <Name>HeuristicLab.Collections-3.3</Name>
     127    </ProjectReference>
    124128    <ProjectReference Include="..\..\HeuristicLab.Common\3.3\HeuristicLab.Common-3.3.csproj">
    125129      <Project>{A9AD58B9-3EF9-4CC1-97E5-8D909039FF5C}</Project>
     
    130134      <Name>HeuristicLab.Core-3.3</Name>
    131135    </ProjectReference>
     136    <ProjectReference Include="..\..\HeuristicLab.Optimization\3.3\HeuristicLab.Optimization-3.3.csproj">
     137      <Project>{14AB8D24-25BC-400C-A846-4627AA945192}</Project>
     138      <Name>HeuristicLab.Optimization-3.3</Name>
     139    </ProjectReference>
    132140    <ProjectReference Include="..\..\HeuristicLab.Persistence\3.3\HeuristicLab.Persistence-3.3.csproj">
    133141      <Project>{102BC7D3-0EF9-439C-8F6D-96FF0FDB8E1B}</Project>
     
    137145      <Project>{94186A6A-5176-4402-AE83-886557B53CCA}</Project>
    138146      <Name>HeuristicLab.PluginInfrastructure-3.3</Name>
     147    </ProjectReference>
     148    <ProjectReference Include="..\..\HeuristicLab.SequentialEngine\3.3\HeuristicLab.SequentialEngine-3.3.csproj">
     149      <Project>{DC3D7072-7999-4719-B65D-3997744D5DC1}</Project>
     150      <Name>HeuristicLab.SequentialEngine-3.3</Name>
    139151    </ProjectReference>
    140152  </ItemGroup>
  • branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs

    r6349 r6354  
    2222using System;
    2323using System.Collections.Generic;
    24 using System.Threading;
    25 using System.Threading.Tasks;
    2624using HeuristicLab.Common;
    2725using HeuristicLab.Core;
    2826using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
     27using System.Reflection;
     28using System.IO;
     29using HeuristicLab.Persistence.Default.Xml;
     30using System.Diagnostics;
     31using HeuristicLab.Optimization;
     32using System.Linq;
    2933
    3034namespace HeuristicLab.MPIEngine {
     
    3640  [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
    3741  public class MPIEngine : Engine {
    38     private CancellationToken cancellationToken;
    39 
    4042    [StorableConstructor]
    4143    protected MPIEngine(bool deserializing) : base(deserializing) { }
     
    4749    }
    4850
    49     protected override void Run(CancellationToken cancellationToken) {
    50       this.cancellationToken = cancellationToken;
    51       Run(ExecutionStack);
     51    private string algFile;
     52
     53    public override void Start() {
     54      if (ExecutionStack.Count == 1) {
     55        ExecutionContext context = ExecutionStack.First() as ExecutionContext;
     56
     57        ExecutionContext algorithmContext = context.Parent as ExecutionContext;
     58
     59        EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
     60          BindingFlags.GetField | BindingFlags.NonPublic |
     61          BindingFlags.Instance, null, algorithmContext, null) as EngineAlgorithm;
     62
     63        alg = alg.Clone() as EngineAlgorithm;
     64        alg.Engine = new SequentialEngine.SequentialEngine();
     65
     66        algFile = Path.GetTempFileName();
     67        XmlGenerator.Serialize(alg, algFile);
     68      }
     69
     70      base.Start();
    5271    }
    5372
    54     private void Run(object state) {
    55       Stack<IOperation> executionStack = (Stack<IOperation>)state;
    56       IOperation next;
    57       OperationCollection coll;
    58       IAtomicOperation operation;
     73    protected override void Run(System.Threading.CancellationToken cancellationToken) {
     74      if (ExecutionStack.Count == 1) {
     75        ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;
    5976
    60       while (executionStack.Count > 0) {
    61         cancellationToken.ThrowIfCancellationRequested();
     77        IScope globalScope = context.Scope;
    6278
    63         next = executionStack.Pop();
    64         if (next is OperationCollection) {
    65           coll = (OperationCollection)next;
    66           if (coll.Parallel) {
    67             Task[] tasks = new Task[coll.Count];
    68             Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count];
    69             for (int i = 0; i < coll.Count; i++) {
    70               stacks[i] = new Stack<IOperation>();
    71               stacks[i].Push(coll[i]);
    72               tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken);
     79        string resultFile = Path.GetTempFileName();
     80        ItemList<ResultCollection> empty = new ItemList<ResultCollection>();
     81        XmlGenerator.Serialize(empty, resultFile);
     82
     83        string exec = @"C:\Program Files\Microsoft Compute Cluster Pack\Bin\mpiexec.exe";
     84        string args = @"-n 3 HeuristicLab.MPIAlgorithmRunner-3.3.exe " + algFile + " " + resultFile + " 5000";
     85
     86        System.Threading.Thread pollThread = new System.Threading.Thread(delegate(object state) {
     87          while (true) {
     88            System.Threading.Thread.Sleep(5000);
     89
     90            lock (resultFile) {
     91              object results = XmlParser.Deserialize(resultFile);
     92              ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection);
     93
     94              if (resultCollection != null) {
     95                if (!resultCollection.ContainsKey("MPIResults"))
     96                  resultCollection.Add(new Result("MPIResults", results as IItem));
     97
     98                resultCollection["MPIResults"].Value = results as IItem;
     99              }
     100
    73101            }
    74             try {
    75               Task.WaitAll(tasks);
    76             }
    77             catch (AggregateException ex) {
    78               OperationCollection remaining = new OperationCollection() { Parallel = true };
    79               for (int i = 0; i < stacks.Length; i++) {
    80                 if (stacks[i].Count == 1)
    81                   remaining.Add(stacks[i].Pop());
    82                 if (stacks[i].Count > 1) {
    83                   OperationCollection ops = new OperationCollection();
    84                   while (stacks[i].Count > 0)
    85                     ops.Add(stacks[i].Pop());
    86                   remaining.Add(ops);
    87                 }
    88               }
    89               if (remaining.Count > 0) executionStack.Push(remaining);
    90               throw ex;
    91             }
    92           } else {
    93             for (int i = coll.Count - 1; i >= 0; i--)
    94               if (coll[i] != null) executionStack.Push(coll[i]);
    95102          }
    96         } else if (next is IAtomicOperation) {
    97           operation = (IAtomicOperation)next;
    98           try {
    99             next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
    100           }
    101           catch (Exception ex) {
    102             executionStack.Push(operation);
    103             if (ex is OperationCanceledException) throw ex;
    104             else throw new OperatorExecutionException(operation.Operator, ex);
    105           }
    106           if (next != null) executionStack.Push(next);
     103        });
     104        pollThread.Start();
    107105
    108           if (operation.Operator.Breakpoint) {
    109             string message = string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName);
    110             Log.LogMessage(message);
    111             throw new OperationCanceledException(message);
    112           }
    113         }
     106        Process p = Process.Start(exec, args);
     107        p.WaitForExit();
     108
     109        pollThread.Abort();
     110        File.Delete(algFile);
     111        File.Delete(resultFile);
    114112      }
    115113    }
  • branches/MPI/HeuristicLab.Operators.MPISupport/3.3/MPITransportWrapper.cs

    r6347 r6354  
    3434  /// </summary>
    3535  [Serializable]
    36   class MPITransportWrapper<T>: ISerializable where T: class, IItem {
     36  public class MPITransportWrapper<T>: ISerializable where T: class, IItem {
    3737    [NonSerialized]
    3838    private T innerItem;
  • branches/MPI/HeuristicLab.Operators.MPISupport/3.3/MPIUnidirectionalRingMigrator.cs

    r6347 r6354  
    4343    }
    4444
     45    public static int Channel = 100;
     46
    4547    public override IOperation Apply() {
    4648      if (MPI.Communicator.world != null) {
    47         int size = MPI.Communicator.world.Group.Size;
    48         if (size > 1) {
     49        int size = MPI.Communicator.world.Size;
     50        if (size > 2) {
    4951          int i = MPI.Communicator.world.Rank;
    5052          IScope scope = ExecutionContext.Scope;
     
    5254          IScope emigrants = scope.SubScopes[1];
    5355          scope.SubScopes.Remove(emigrants);
    54           int recipent = (i + 1) % size;
     56          int recipent = i + 1;
     57          if (recipent == size)
     58            recipent = 1;
    5559
    5660          Console.WriteLine("MIGRATE " + i + " TO " + recipent);
    5761          MPI.Communicator.world.Send<MPITransportWrapper<IScope>>(
    58             new MPITransportWrapper<IScope>(emigrants), recipent, 0);
     62            new MPITransportWrapper<IScope>(emigrants), recipent, Channel);
    5963
    6064          IScope immigrants = null;
    6165          int sender = i - 1;
    62           if (sender < 0)
     66          if (sender < 1)
    6367            sender = size - 1;
    6468
    6569          Console.WriteLine("MIGRATE " + i + " FROM " + sender);
    66           immigrants = MPI.Communicator.world.Receive<MPITransportWrapper<IScope>>(sender, 0).InnerItem;
     70          immigrants = MPI.Communicator.world.Receive<MPITransportWrapper<IScope>>(sender, Channel).InnerItem;
    6771
    6872          scope.SubScopes.Add(immigrants);
Note: See TracChangeset for help on using the changeset viewer.