1 | using System;
2 | using System.Collections.Generic;
3 | using System.Linq;
4 | using System.Text;
5 | using HeuristicLab.Core;
6 | using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
7 | using HeuristicLab.Parameters;
8 | using HeuristicLab.Data;
9 | using HeuristicLab.Common;
10 | using MPI;
11 | using HeuristicLab.Operators.MPISupport.BinaryTransport;
12 |
13 | namespace HeuristicLab.Operators.MPISupport {
/// <summary>
/// Applies the configured <see cref="Operator"/> to every sub-scope of the current scope.
/// </summary>
/// <remarks>
/// Two execution strategies exist:
/// <list type="bullet">
/// <item><description>
/// Distributed: taken when <c>MPI.Communicator.world</c> is available and has more than two
/// processes. The sub-scopes are split into contiguous chunks over ranks 1..Size-1, each rank
/// executes the operator on its own chunk immediately and then exchanges the processed
/// sub-scopes with all other ranks in that range.
/// </description></item>
/// <item><description>
/// Local: otherwise one operation per sub-scope is scheduled in front of the successor
/// operation, optionally flagged as parallel according to the "Parallel" parameter.
/// </description></item>
/// </list>
/// </remarks>
[Item("MPIUniformSubscopesProcessor", "An operator which applies a specified operator on all sub-scopes at the given depth of the current scope.")]
[StorableClass]
public class MPIUniformSubscopesProcessor : SingleSuccessorOperator {
  // Message tag shared by the send and the receive side of the result exchange.
  private const int ResultTag = 0;

  private OperatorParameter OperatorParameter {
    get { return (OperatorParameter)Parameters["Operator"]; }
  }

  /// <summary>Parameter that decides whether locally scheduled operations are marked as parallel.</summary>
  public ValueLookupParameter<BoolValue> ParallelParameter {
    get { return (ValueLookupParameter<BoolValue>)Parameters["Parallel"]; }
  }

  /// <summary>The operator that is applied on each sub-scope; nothing is applied when it is null.</summary>
  public IOperator Operator {
    get { return OperatorParameter.Value; }
    set { OperatorParameter.Value = value; }
  }

  /// <summary>Value of the "Parallel" parameter; a null value is treated as false by <see cref="Apply"/>.</summary>
  public BoolValue Parallel {
    get { return ParallelParameter.Value; }
    set { ParallelParameter.Value = value; }
  }

  [StorableConstructor]
  private MPIUniformSubscopesProcessor(bool deserializing) : base(deserializing) { }

  private MPIUniformSubscopesProcessor(MPIUniformSubscopesProcessor original, Cloner cloner)
    : base(original, cloner) {
  }

  /// <summary>Creates the operator with an empty "Operator" parameter and "Parallel" set to false.</summary>
  public MPIUniformSubscopesProcessor()
    : base() {
    Parameters.Add(new OperatorParameter("Operator", "The operator which should be applied on all sub-scopes of the current scope."));
    Parameters.Add(new ValueLookupParameter<BoolValue>("Parallel", "True if the operator should be applied in parallel on all sub-scopes, otherwise false.", new BoolValue(false)));
  }

  public override IDeepCloneable Clone(Cloner cloner) {
    return new MPIUniformSubscopesProcessor(this, cloner);
  }

  /// <summary>
  /// Processes all sub-scopes of the current scope with <see cref="Operator"/>.
  /// </summary>
  /// <returns>
  /// A collection that contains the operation returned by the base implementation. In the local
  /// strategy a collection with one operation per sub-scope is inserted in front of it; in the
  /// distributed strategy the sub-scopes have already been processed when this method returns,
  /// so nothing is inserted.
  /// </returns>
  /// <exception cref="InvalidOperationException">
  /// A message received from another rank did not contain an <see cref="IScope"/>.
  /// </exception>
  public override IOperation Apply() {
    OperationCollection next = new OperationCollection(base.Apply());
    ScopeList scopes = ExecutionContext.Scope.SubScopes;

    if (Operator == null) {
      return next;
    }

    if (MPI.Communicator.world != null && MPI.Communicator.world.Size > 2) {
      ApplyDistributed(scopes);
    } else {
      next.Insert(0, CreateLocalOperations(scopes));
    }

    return next;
  }

  // Builds one operation per sub-scope for execution by the engine on this process.
  private OperationCollection CreateLocalOperations(ScopeList scopes) {
    OperationCollection inner = new OperationCollection();
    inner.Parallel = Parallel != null && Parallel.Value;
    for (int i = 0; i < scopes.Count; i++) {
      inner.Add(ExecutionContext.CreateOperation(Operator, scopes[i]));
    }
    return inner;
  }

  // Executes the operator on this rank's chunk of sub-scopes and swaps results with the other ranks.
  private void ApplyDistributed(ScopeList scopes) {
    var world = MPI.Communicator.world;
    int rank = world.Rank;

    // Only ranks 1..Size-1 take part in the partitioning and in the exchange below.
    // NOTE(review): presumably rank 0 is a coordinator that never executes this operator;
    // for rank 0 the computed start index would be negative - TODO confirm against the caller.
    int workers = world.Size - 1;

    // Contiguous chunks of equal size; the last rank additionally takes the remainder.
    int chunk = scopes.Count / workers;
    int start = chunk * (rank - 1);
    int end = rank == workers ? scopes.Count : start + chunk;

    // Process the local chunk right away and collect copies of the results under one scope
    // so that they can be shipped as a single message.
    IScope parent = new Scope();
    for (int i = start; i < end; i++) {
      IAtomicOperation op = ExecutionContext.CreateOperation(Operator, scopes[i]);
      MPIHelper.Execute(op);
      parent.SubScopes.Add(MPIHelper.ShallowCopy(scopes[i]));
    }

    // Non-blocking sends: every rank sends before it receives, so blocking sends
    // could leave all ranks waiting on each other.
    List<Request> requests = new List<Request>();
    for (int dest = 1; dest <= workers; dest++) {
      if (dest != rank) {
        var message = new MPIBinaryTransportWrapper(parent);
        requests.Add(world.ImmediateSend<MPIBinaryTransportWrapper>(message, dest, ResultTag));
      }
    }

    // The outermost execution context is handed to the transport wrapper when unpacking.
    IExecutionContext rootContext = ExecutionContext;
    while (rootContext.Parent != null) {
      rootContext = rootContext.Parent;
    }

    for (int source = 1; source <= workers; source++) {
      if (source == rank) {
        continue;
      }

      var wrapper = world.Receive<MPIBinaryTransportWrapper>(source, ResultTag);
      IScope result = wrapper.GetInnerItem(rootContext) as IScope;
      if (result == null) {
        throw new InvalidOperationException("Received a result from MPI rank " + source + " that is not a scope.");
      }

      // The sender's chunk starts at the same offset it computed as its own start index;
      // the number of received sub-scopes tells how long that chunk is.
      int offset = chunk * (source - 1);
      for (int scopeIndex = 0; scopeIndex < result.SubScopes.Count; scopeIndex++) {
        scopes[scopeIndex + offset] = result.SubScopes[scopeIndex];
      }
    }

    // Make sure all outgoing messages have completed before leaving the operator.
    foreach (Request request in requests) {
      request.Wait();
    }
  }
}
123 | }