Changeset 6354 for branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs
- Timestamp:
- 06/01/11 17:37:05 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/MPI/HeuristicLab.MPIEngine/3.3/MPIEngine.cs
r6349 r6354 22 22 using System; 23 23 using System.Collections.Generic; 24 using System.Threading;25 using System.Threading.Tasks;26 24 using HeuristicLab.Common; 27 25 using HeuristicLab.Core; 28 26 using HeuristicLab.Persistence.Default.CompositeSerializers.Storable; 27 using System.Reflection; 28 using System.IO; 29 using HeuristicLab.Persistence.Default.Xml; 30 using System.Diagnostics; 31 using HeuristicLab.Optimization; 32 using System.Linq; 29 33 30 34 namespace HeuristicLab.MPIEngine { … … 36 40 [Item("MPI Engine", "Engine for parallel execution of algorithms using multiple processes (suitable for distributed memory systems with multiple cores).")] 37 41 public class MPIEngine : Engine { 38 private CancellationToken cancellationToken;39 40 42 [StorableConstructor] 41 43 protected MPIEngine(bool deserializing) : base(deserializing) { } … … 47 49 } 48 50 49 protected override void Run(CancellationToken cancellationToken) { 50 this.cancellationToken = cancellationToken; 51 Run(ExecutionStack); 51 private string algFile; 52 53 public override void Start() { 54 if (ExecutionStack.Count == 1) { 55 ExecutionContext context = ExecutionStack.First() as ExecutionContext; 56 57 ExecutionContext algorithmContext = context.Parent as ExecutionContext; 58 59 EngineAlgorithm alg = typeof(ExecutionContext).InvokeMember("parameterizedItem", 60 BindingFlags.GetField | BindingFlags.NonPublic | 61 BindingFlags.Instance, null, algorithmContext, null) as EngineAlgorithm; 62 63 alg = alg.Clone() as EngineAlgorithm; 64 alg.Engine = new SequentialEngine.SequentialEngine(); 65 66 algFile = Path.GetTempFileName(); 67 XmlGenerator.Serialize(alg, algFile); 68 } 69 70 base.Start(); 52 71 } 53 72 54 private void Run(object state) { 55 Stack<IOperation> executionStack = (Stack<IOperation>)state; 56 IOperation next; 57 OperationCollection coll; 58 IAtomicOperation operation; 73 protected override void Run(System.Threading.CancellationToken cancellationToken) { 74 if (ExecutionStack.Count == 1) { 75 ExecutionContext context = ExecutionStack.Pop() as ExecutionContext; 59 76 60 while (executionStack.Count > 0) { 61 cancellationToken.ThrowIfCancellationRequested(); 77 IScope globalScope = context.Scope; 62 78 63 next = executionStack.Pop(); 64 if (next is OperationCollection) { 65 coll = (OperationCollection)next; 66 if (coll.Parallel) { 67 Task[] tasks = new Task[coll.Count]; 68 Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count]; 69 for (int i = 0; i < coll.Count; i++) { 70 stacks[i] = new Stack<IOperation>(); 71 stacks[i].Push(coll[i]); 72 tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken); 79 string resultFile = Path.GetTempFileName(); 80 ItemList<ResultCollection> empty = new ItemList<ResultCollection>(); 81 XmlGenerator.Serialize(empty, resultFile); 82 83 string exec = @"C:\Program Files\Microsoft Compute Cluster Pack\Bin\mpiexec.exe"; 84 string args = @"-n 3 HeuristicLab.MPIAlgorithmRunner-3.3.exe " + algFile + " " + resultFile + " 5000"; 85 86 System.Threading.Thread pollThread = new System.Threading.Thread(delegate(object state) { 87 while (true) { 88 System.Threading.Thread.Sleep(5000); 89 90 lock (resultFile) { 91 object results = XmlParser.Deserialize(resultFile); 92 ResultCollection resultCollection = (globalScope.Variables["Results"].Value as ResultCollection); 93 94 if (resultCollection != null) { 95 if (!resultCollection.ContainsKey("MPIResults")) 96 resultCollection.Add(new Result("MPIResults", results as IItem)); 97 98 resultCollection["MPIResults"].Value = results as IItem; 99 } 100 73 101 } 74 try {75 Task.WaitAll(tasks);76 }77 catch (AggregateException ex) {78 OperationCollection remaining = new OperationCollection() { Parallel = true };79 for (int i = 0; i < stacks.Length; i++) {80 if (stacks[i].Count == 1)81 remaining.Add(stacks[i].Pop());82 if (stacks[i].Count > 1) {83 OperationCollection ops = new OperationCollection();84 while (stacks[i].Count > 0)85 ops.Add(stacks[i].Pop());86 remaining.Add(ops);87 }88 }89 if (remaining.Count > 0) executionStack.Push(remaining);90 throw ex;91 }92 } else {93 for (int i = coll.Count - 1; i >= 0; i--)94 if (coll[i] != null) executionStack.Push(coll[i]);95 102 } 96 } else if (next is IAtomicOperation) { 97 operation = (IAtomicOperation)next; 98 try { 99 next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken); 100 } 101 catch (Exception ex) { 102 executionStack.Push(operation); 103 if (ex is OperationCanceledException) throw ex; 104 else throw new OperatorExecutionException(operation.Operator, ex); 105 } 106 if (next != null) executionStack.Push(next); 103 }); 104 pollThread.Start(); 107 105 108 if (operation.Operator.Breakpoint) {109 string message = string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName);110 Log.LogMessage(message); 111 throw new OperationCanceledException(message);112 }113 }106 Process p = Process.Start(exec, args); 107 p.WaitForExit(); 108 109 pollThread.Abort(); 110 File.Delete(algFile); 111 File.Delete(resultFile); 114 112 } 115 113 }
Note: See TracChangeset
for help on using the changeset viewer.