Free cookie consent management tool by TermsFeed Policy Generator

source: branches/MPI/HeuristicLab.Operators.MPISupport/3.3/MPIUniformSubscopesProcessor.cs @ 13508

Last change on this file since 13508 was 7566, checked in by svonolfe, 13 years ago

Adapted MPI operators (#1542)

File size: 4.8 KB
Line 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using HeuristicLab.Core;
using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
using HeuristicLab.Parameters;
using HeuristicLab.Data;
using HeuristicLab.Common;
using MPI;
using HeuristicLab.Operators.MPISupport.BinaryTransport;

namespace HeuristicLab.Operators.MPISupport {
  /// <summary>
  /// Applies the configured operator to every direct sub-scope of the current scope.
  /// </summary>
  /// <remarks>
  /// When an MPI world communicator with more than two processes is available, the
  /// sub-scopes are split into contiguous slices, one per rank in <c>1..Size-1</c>.
  /// Each rank executes the operator on its own slice immediately, sends the processed
  /// slice to every other rank in that range and overwrites the remaining slices with
  /// what it receives from them. Otherwise one operation per sub-scope is scheduled
  /// in front of the successor operation.
  /// NOTE(review): rank 0 is excluded from the slicing and from the exchange loops;
  /// presumably it acts as a coordinator and never runs this operator - confirm.
  /// </remarks>
  [Item("MPIUniformSubscopesProcessor", "An operator which applies a specified operator on all sub-scopes at the given depth of the current scope.")]
  [StorableClass]
  public class MPIUniformSubscopesProcessor : SingleSuccessorOperator {
    // Message tag used for both sending and receiving the processed slices.
    private const int ResultTag = 0;

    private OperatorParameter OperatorParameter {
      get { return (OperatorParameter)Parameters["Operator"]; }
    }
    public ValueLookupParameter<BoolValue> ParallelParameter {
      get { return (ValueLookupParameter<BoolValue>)Parameters["Parallel"]; }
    }

    /// <summary>The operator that is applied to each sub-scope.</summary>
    public IOperator Operator {
      get { return OperatorParameter.Value; }
      set { OperatorParameter.Value = value; }
    }
    /// <summary>
    /// Value of the "Parallel" parameter. It is only read on the non-MPI path, where
    /// it is copied to the <c>Parallel</c> flag of the scheduled operation collection.
    /// </summary>
    public BoolValue Parallel {
      get { return ParallelParameter.Value; }
      set { ParallelParameter.Value = value; }
    }

    [StorableConstructor]
    private MPIUniformSubscopesProcessor(bool deserializing) : base(deserializing) { }
    private MPIUniformSubscopesProcessor(MPIUniformSubscopesProcessor original, Cloner cloner)
      : base(original, cloner) {
    }
    public MPIUniformSubscopesProcessor()
      : base() {
      Parameters.Add(new OperatorParameter("Operator", "The operator which should be applied on all sub-scopes of the current scope."));
      Parameters.Add(new ValueLookupParameter<BoolValue>("Parallel", "True if the operator should be applied in parallel on all sub-scopes, otherwise false.", new BoolValue(false)));
    }

    public override IDeepCloneable Clone(Cloner cloner) {
      return new MPIUniformSubscopesProcessor(this, cloner);
    }

    /// <summary>
    /// Runs or schedules <see cref="Operator"/> for every sub-scope of the current scope.
    /// </summary>
    /// <returns>
    /// A collection that contains the result of <c>base.Apply()</c>. On the non-MPI path
    /// the per-sub-scope operations are inserted in front of it; on the MPI path they
    /// have already been executed when this method returns, so nothing is added.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// A message received from another rank did not contain a scope.
    /// </exception>
    public override IOperation Apply() {
      OperationCollection next = new OperationCollection(base.Apply());
      ScopeList scopes = ExecutionContext.Scope.SubScopes;

      if (Operator == null) return next;

      var world = MPI.Communicator.world;
      if (world != null && world.Size > 2) {
        ApplyDistributed(scopes);
      } else {
        next.Insert(0, CreateLocalOperations(scopes));
      }

      return next;
    }

    // Executes the operator on this rank's slice and exchanges slices with the other ranks.
    private void ApplyDistributed(ScopeList scopes) {
      var world = MPI.Communicator.world;
      int rank = world.Rank;
      int size = world.Size - 1; // ranks 1..size share the work; rank 0 gets no slice

      // NOTE(review): for rank 0 'start' becomes negative whenever count > 0, so this
      // code assumes it is only ever executed on ranks 1..size - confirm with the caller.
      int count = scopes.Count / size;
      int start = count * (rank - 1);
      // The last rank also takes the remainder of the integer division.
      int end = (rank == size ? scopes.Count : start + count);

      // Collect shallow copies of the processed sub-scopes under a temporary parent
      // so that the whole slice can be shipped as a single item.
      IScope parent = new Scope();
      for (int i = start; i < end; i++) {
        IAtomicOperation op = ExecutionContext.CreateOperation(Operator, scopes[i]);
        MPIHelper.Execute(op);
        parent.SubScopes.Add(MPIHelper.ShallowCopy(scopes[i]));
      }

      // Send the processed slice to every other rank without blocking; the requests
      // are completed at the end, after all incoming slices have been received.
      List<Request> requests = new List<Request>();
      for (int dest = 1; dest <= size; dest++) {
        if (dest != rank) {
          var result = new MPIBinaryTransportWrapper(parent);
          requests.Add(world.ImmediateSend<MPIBinaryTransportWrapper>(result, dest, ResultTag));
        }
      }

      IExecutionContext globalScope = GetRootContext();

      // Receive the slices of the other ranks and overwrite the local sub-scopes with them.
      for (int source = 1; source <= size; source++) {
        if (source != rank) {
          IScope result = world.Receive<MPIBinaryTransportWrapper>(source, ResultTag).GetInnerItem(globalScope) as IScope;
          if (result == null) {
            throw new InvalidOperationException("The message received from MPI rank " + source + " does not contain a scope.");
          }

          int offset = count * (source - 1);
          for (int scopeIndex = 0; scopeIndex < result.SubScopes.Count; scopeIndex++) {
            scopes[scopeIndex + offset] = result.SubScopes[scopeIndex];
          }
        }
      }

      foreach (Request request in requests) {
        request.Wait();
      }
    }

    // Walks up the parent chain of the current execution context and returns the topmost one.
    private IExecutionContext GetRootContext() {
      IExecutionContext context = ExecutionContext;
      while (context.Parent != null) {
        context = context.Parent;
      }
      return context;
    }

    // Builds one operation per sub-scope for the non-MPI path.
    private OperationCollection CreateLocalOperations(ScopeList scopes) {
      OperationCollection inner = new OperationCollection();
      inner.Parallel = Parallel != null && Parallel.Value;
      for (int i = 0; i < scopes.Count; i++) {
        inner.Add(ExecutionContext.CreateOperation(Operator, scopes[i]));
      }
      return inner;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.