Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/01/11 17:37:05 (13 years ago)
Author:
svonolfe
Message:

Added first version of the MPIEngine (#1542)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs

    r6349 r6354  
    2222using System;
    2323using System.Collections.Generic;
    24 using System.Threading;
    25 using System.Threading.Tasks;
    2624using HeuristicLab.Common;
    2725using HeuristicLab.Core;
    2826using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
     27using System.Reflection;
     28using System.IO;
     29using HeuristicLab.Persistence.Default.Xml;
     30using System.Diagnostics;
     31using HeuristicLab.Optimization;
     32using System.Linq;
    2933
    3034namespace HeuristicLab.MPIEngine {
     
    3640  [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")]
    3741  public class MPIEngine : Engine {
    38     private CancellationToken cancellationToken;
    39 
    4042    [StorableConstructor]
    4143    protected MPIEngine(bool deserializing) : base(deserializing) { }
     
    4749    }
    4850
    49     protected override void Run(CancellationToken cancellationToken) {
    50       this.cancellationToken = cancellationToken;
    51       Run(ExecutionStack);
     51    private string algFile;
     52
     53    public override void Start() {
     54      if (ExecutionStack.Count == 1) {
     55        ExecutionContext context = ExecutionStack.First() as ExecutionContext;
     56
     57        ExecutionContext algorithmContext = context.Parent as ExecutionContext;
     58
     59        EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem",
     60          BindingFlags.GetField | BindingFlags.NonPublic |
     61          BindingFlags.Instance, null, algorithmContext, null) as EngineAlgorithm;
     62
     63        alg = alg.Clone() as EngineAlgorithm;
     64        alg.Engine = new SequentialEngine.SequentialEngine();
     65
     66        algFile = Path.GetTempFileName();
     67        XmlGenerator.Serialize(alg, algFile);
     68      }
     69
     70      base.Start();
    5271    }
    5372
    54     private void Run(object state) {
    55       Stack<IOperation> executionStack = (Stack<IOperation>)state;
    56       IOperation next;
    57       OperationCollection coll;
    58       IAtomicOperation operation;
     73    protected override void Run(System.Threading.CancellationToken cancellationToken) {
     74      if (ExecutionStack.Count == 1) {
     75        ExecutionContext context = ExecutionStack.Pop() as ExecutionContext;
    5976
    60       while (executionStack.Count > 0) {
    61         cancellationToken.ThrowIfCancellationRequested();
     77        IScope globalScope = context.Scope;
    6278
    63         next = executionStack.Pop();
    64         if (next is OperationCollection) {
    65           coll = (OperationCollection)next;
    66           if (coll.Parallel) {
    67             Task[] tasks = new Task[coll.Count];
    68             Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count];
    69             for (int i = 0; i < coll.Count; i++) {
    70               stacks[i] = new Stack<IOperation>();
    71               stacks[i].Push(coll[i]);
    72               tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken);
     79        string resultFile = Path.GetTempFileName();
     80        ItemList<ResultCollection> empty = new ItemList<ResultCollection>();
     81        XmlGenerator.Serialize(empty, resultFile);
     82
     83        string exec = @"C:\Program Files\Microsoft Compute Cluster Pack\Bin\mpiexec.exe";
     84        string args = @"-n 3 HeuristicLab.MPIAlgorithmRunner-3.3.exe " + algFile + " " + resultFile + " 5000";
     85
     86        System.Threading.Thread pollThread = new System.Threading.Thread(delegate(object state) {
     87          while (true) {
     88            System.Threading.Thread.Sleep(5000);
     89
     90            lock (resultFile) {
     91              object results = XmlParser.Deserialize(resultFile);
     92              ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection);
     93
     94              if (resultCollection != null) {
     95                if (!resultCollection.ContainsKey("MPIResults"))
     96                  resultCollection.Add(new Result("MPIResults", results as IItem));
     97
     98                resultCollection["MPIResults"].Value = results as IItem;
     99              }
     100
    73101            }
    74             try {
    75               Task.WaitAll(tasks);
    76             }
    77             catch (AggregateException ex) {
    78               OperationCollection remaining = new OperationCollection() { Parallel = true };
    79               for (int i = 0; i < stacks.Length; i++) {
    80                 if (stacks[i].Count == 1)
    81                   remaining.Add(stacks[i].Pop());
    82                 if (stacks[i].Count > 1) {
    83                   OperationCollection ops = new OperationCollection();
    84                   while (stacks[i].Count > 0)
    85                     ops.Add(stacks[i].Pop());
    86                   remaining.Add(ops);
    87                 }
    88               }
    89               if (remaining.Count > 0) executionStack.Push(remaining);
    90               throw ex;
    91             }
    92           } else {
    93             for (int i = coll.Count - 1; i >= 0; i--)
    94               if (coll[i] != null) executionStack.Push(coll[i]);
    95102          }
    96         } else if (next is IAtomicOperation) {
    97           operation = (IAtomicOperation)next;
    98           try {
    99             next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
    100           }
    101           catch (Exception ex) {
    102             executionStack.Push(operation);
    103             if (ex is OperationCanceledException) throw ex;
    104             else throw new OperatorExecutionException(operation.Operator, ex);
    105           }
    106           if (next != null) executionStack.Push(next);
     103        });
     104        pollThread.Start();
    107105
    108           if (operation.Operator.Breakpoint) {
    109             string message = string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName);
    110             Log.LogMessage(message);
    111             throw new OperationCanceledException(message);
    112           }
    113         }
     106        Process p = Process.Start(exec, args);
     107        p.WaitForExit();
     108
     109        pollThread.Abort();
     110        File.Delete(algFile);
     111        File.Delete(resultFile);
    114112      }
    115113    }
Note: See TracChangeset for help on using the changeset viewer.