
Timestamp: 07/31/08 13:08:41
Author: gkronber
Message: implemented #216 (ProcessingEngine should terminate on breakpoints)
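In other words, the DistributedEngine now remembers sub-engines that were stopped at a breakpoint and only reports termination once none of them remain suspended. A minimal, self-contained sketch of that idea (the SubEngine/BreakpointAwareEngine names are hypothetical, not the actual HeuristicLab types):

using System.Collections.Generic;

class SubEngine {
  public bool Suspended;                // true while stopped at a breakpoint
}

class BreakpointAwareEngine {
  private readonly List<SubEngine> suspendedEngines = new List<SubEngine>();
  private bool baseTerminated;

  // terminated only when the current pass finished AND nothing is suspended
  public bool Terminated {
    get { return baseTerminated && suspendedEngines.Count == 0; }
  }

  public void Reset() {
    suspendedEngines.Clear();
    baseTerminated = false;
  }

  // called when a distributed job comes back suspended at a breakpoint
  public void RegisterSuspended(SubEngine engine) {
    suspendedEngines.Add(engine);
    baseTerminated = true;              // this pass stops, but Terminated stays false
  }
}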

File: 1 edited

Legend: unchanged lines are unmarked; added lines are prefixed with +, removed lines with -.
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

r413 → r414

 namespace HeuristicLab.DistributedEngine {
   public class DistributedEngine : EngineBase, IEditable {
+    private List<KeyValuePair<ProcessingEngine, AtomicOperation>> suspendedEngines = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
     private JobManager jobManager;
     private string serverAddress;
…
     }

+    public override bool Terminated {
+      get {
+        return base.Terminated && suspendedEngines.Count == 0;
+      }
+    }
+
+    public override void Reset() {
+      suspendedEngines.Clear();
+      base.Reset();
+    }
+
     public override void Execute() {
       if(jobManager == null) this.jobManager = new JobManager(serverAddress);
…

     protected override void ProcessNextOperation() {
-      IOperation operation = myExecutionStack.Pop();
+      if(suspendedEngines.Count > 0) {
+        ProcessSuspendedEngines();
+      } else {
+        IOperation operation = myExecutionStack.Pop();
+        ProcessOperation(operation);
+      }
+    }
+
+    private void ProcessSuspendedEngines() {
+      WaitHandle[] waitHandles = new WaitHandle[suspendedEngines.Count];
+      int i = 0;
+      foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
+        waitHandles[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);
+      }
+      WaitForAll(waitHandles);
+      // collect results
+      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
+      try {
+        foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
+          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
+              jobManager.EndExecuteOperation(suspendedPair.Value),
+              suspendedPair.Value);
+          results.Add(p);
+        }
+      } catch(Exception e) {
+        // this exception means there was a problem with the underlying communication infrastructure
+        // -> show message dialog and abort engine
+        Abort();
+        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
+        return;
+      }
+      // got all result engines without an exception -> merge results
+      ProcessResults(results);
+    }
+
+    private void ProcessOperation(IOperation operation) {
       if(operation is AtomicOperation) {
         AtomicOperation atomicOperation = (AtomicOperation)operation;
…
         CompositeOperation compositeOperation = (CompositeOperation)operation;
         if(compositeOperation.ExecuteInParallel) {
-          try {
-            WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
-            int i = 0;
-            // HACK: assume that all atomicOperations have the same parent scope.
-            // 1) find that parent scope
-            // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
-            // 3) keep the branches to 'repair' the scope-tree later
-            // 4) for each parallel job attach only the sub-scope that this operation uses
-            // 5) after starting all parallel jobs restore the whole scope-tree
-            IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
-            List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
-            PruneToParentScope(GlobalScope, parentScope, prunedScopes);
-            List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
-            foreach(IScope scope in subScopes) {
-              parentScope.RemoveSubScope(scope);
-            }
-            // start all parallel jobs
-            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
-              parentScope.AddSubScope(parOperation.Scope);
-              waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
-              parentScope.RemoveSubScope(parOperation.Scope);
-            }
-            foreach(IScope scope in subScopes) {
-              parentScope.AddSubScope(scope);
-            }
-            prunedScopes.Reverse();
-            RestoreFullTree(GlobalScope, prunedScopes);
-
-            // wait until all jobs are finished
-            // WaitAll works only with maximally 64 waithandles
-            if(waithandles.Length <= 64) {
-              WaitHandle.WaitAll(waithandles);
-            } else {
-              for(i = 0; i < waithandles.Length; i++) {
-                waithandles[i].WaitOne();
-                waithandles[i].Close();
-              }
-            }
-
-            CompositeOperation canceledOperations = new CompositeOperation();
-            canceledOperations.ExecuteInParallel = true;
-            // retrieve results and merge into scope-tree
-            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
-              ProcessingEngine resultEngine = jobManager.EndExecuteOperation(parOperation);
-              if(resultEngine.Canceled) {
-                ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); });
-                canceledOperations.AddOperation(parOperation);
-              } else {
-                // if everything went fine we can merge the results into our local scope-tree
-                MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
-              }
-            }
-
-            if(canceledOperations.Operations.Count > 0) {
-              myExecutionStack.Push(canceledOperations);
-              Abort();
-            }
-          } catch(Exception e) {
-            myExecutionStack.Push(compositeOperation);
-            Abort();
-            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
-          }
+          ProcessParallelOperation(compositeOperation);
           OnOperationExecuted(compositeOperation);
         } else {
…
             myExecutionStack.Push(compositeOperation.Operations[i]);
         }
+      }
+    }
+
+    private void ProcessParallelOperation(CompositeOperation compositeOperation) {
+      // send operations to grid
+      WaitHandle[] waithandles = BeginExecuteOperations(compositeOperation);
+      WaitForAll(waithandles);
+      // collect results
+      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
+      try {
+        foreach(AtomicOperation parOperation in compositeOperation.Operations) {
+          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
+            jobManager.EndExecuteOperation(parOperation), parOperation);
+          results.Add(p);
+        }
+      } catch(Exception e) {
+        // this exception means there was a problem with the underlying communication infrastructure
+        // -> show message dialog, abort engine, requeue the whole composite operation again and return
+        myExecutionStack.Push(compositeOperation);
+        Abort();
+        ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
+        return;
+      }
+      // got all result engines without an exception -> merge results
+      ProcessResults(results);
+    }
+
+    private WaitHandle[] BeginExecuteOperations(CompositeOperation compositeOperation) {
+      WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
+      int i = 0;
+      // HACK: assume that all atomicOperations have the same parent scope.
+      // 1) find that parent scope
+      // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
+      // 3) keep the branches to 'repair' the scope-tree later
+      // 4) for each parallel job attach only the sub-scope that this operation uses
+      // 5) after starting all parallel jobs restore the whole scope-tree
+      IScope parentScope = FindParentScope(GlobalScope, ((AtomicOperation)compositeOperation.Operations[0]).Scope);
+      List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
+      PruneToParentScope(GlobalScope, parentScope, prunedScopes);
+      List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
+      foreach(IScope scope in subScopes) {
+        parentScope.RemoveSubScope(scope);
+      }
+      // start all parallel jobs
+      foreach(AtomicOperation parOperation in compositeOperation.Operations) {
+        parentScope.AddSubScope(parOperation.Scope);
+        waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
+        parentScope.RemoveSubScope(parOperation.Scope);
+      }
+      foreach(IScope scope in subScopes) {
+        parentScope.AddSubScope(scope);
+      }
+      prunedScopes.Reverse();
+      RestoreFullTree(GlobalScope, prunedScopes);
+
+      return waithandles;
+    }
+
+    private void WaitForAll(WaitHandle[] waithandles) {
+      // wait until all jobs are finished
+      // WaitAll works only with maximally 64 waithandles
+      if(waithandles.Length <= 64) {
+        WaitHandle.WaitAll(waithandles);
+      } else {
+        int i;
+        for(i = 0; i < waithandles.Length; i++) {
+          waithandles[i].WaitOne();
+          waithandles[i].Close();
+        }
+      }
+    }
+
+    private void ProcessResults(List<KeyValuePair<ProcessingEngine, AtomicOperation>> results) {
+      // create a new compositeOperation to hold canceled operations that should be restarted
+      CompositeOperation canceledOperations = new CompositeOperation();
+      canceledOperations.ExecuteInParallel = true;
+
+      suspendedEngines.Clear();
+      // retrieve results and merge into scope-tree
+      foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
+        ProcessingEngine resultEngine = p.Key;
+        AtomicOperation parOperation = p.Value;
+        if(resultEngine.Canceled && !resultEngine.Suspended) {
+          // when an engine was canceled but not suspended this means there was a problem
+          // show error message and queue the operation for restart (again parallel)
+          // but don't merge the results of the aborted engine
+          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); });
+          canceledOperations.AddOperation(parOperation);
+        } else if(resultEngine.Suspended) {
+          // when the engine was suspended it means it was stopped because of a breakpoint
+          // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel)
+          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
+          resultEngine.InitialOperation = parOperation;
+          suspendedEngines.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(resultEngine, parOperation));
+        } else {
+          // engine is finished ->
+          // simply merge the results into our local scope-tree
+          MergeScope(parOperation.Scope, resultEngine.InitialOperation.Scope);
+        }
+      }
+      // if there were exceptions -> abort
+      if(canceledOperations.Operations.Count > 0) {
+        // requeue the aborted operations
+        myExecutionStack.Push(canceledOperations);
+        Abort();
+      }
+      // if there were breakpoints -> abort
+      if(suspendedEngines.Count > 0) {
+        Abort();
+      }
     }
…
       XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress");
       addressAttribute.Value = ServerAddress;
-      node.Attributes.Append(addressAttribute);
+      if(suspendedEngines.Count > 0) {
+        node.Attributes.Append(addressAttribute);
+        XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines");
+        foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
+          XmlNode n = document.CreateElement("Entry");
+          n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects));
+          n.AppendChild(PersistenceManager.Persist(p.Value, document, persistedObjects));
+          suspendedEnginesNode.AppendChild(n);
+        }
+        node.AppendChild(suspendedEnginesNode);
+      }
       return node;
     }
…
       base.Populate(node, restoredObjects);
       ServerAddress = node.Attributes["ServerAddress"].Value;
+      XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines");
+      if(suspendedEnginesNode != null) {
+        foreach(XmlNode n in suspendedEnginesNode.ChildNodes) {
+          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
+            (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects),
+            (AtomicOperation)PersistenceManager.Restore(n.ChildNodes[1], restoredObjects));
+          suspendedEngines.Add(p);
+        }
+      }
     }
     #endregion
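
The HACK comment in BeginExecuteOperations describes a scope-tree trick: detach everything except the sub-scope a parallel job actually needs, dispatch the job with that minimal tree, then restore the original tree afterwards. A simplified, self-contained sketch of steps 2) to 5) of that comment, using a hypothetical Scope class rather than HeuristicLab's IScope (step 1, locating the parent scope, is omitted):

using System;
using System.Collections.Generic;

class Scope {
  public string Name;
  public List<Scope> SubScopes = new List<Scope>();
  public Scope(string name) { Name = name; }
}

class ScopePruningDemo {
  static void Main() {
    Scope parentScope = new Scope("Parent");
    parentScope.SubScopes.Add(new Scope("JobA"));
    parentScope.SubScopes.Add(new Scope("JobB"));

    // 2) + 3) detach all sub-scopes and remember them for later repair
    List<Scope> detached = new List<Scope>(parentScope.SubScopes);
    parentScope.SubScopes.Clear();

    // 4) attach only the sub-scope the current job uses while dispatching it
    foreach (Scope job in detached) {
      parentScope.SubScopes.Add(job);
      Console.WriteLine("dispatching job with single sub-scope: " + job.Name);
      parentScope.SubScopes.Remove(job);
    }

    // 5) restore the full tree after all jobs were started
    parentScope.SubScopes.AddRange(detached);
    Console.WriteLine("restored " + parentScope.SubScopes.Count + " sub-scopes");
  }
}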
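The WaitForAll helper introduced in this changeset works around the fact that WaitHandle.WaitAll accepts at most 64 wait handles and throws for larger arrays. A standalone sketch of the same pattern, assuming the caller owns the handles and no longer needs them after waiting:

using System.Threading;

static class WaitHelper {
  public static void WaitForAll(WaitHandle[] handles) {
    if (handles.Length <= 64) {
      // small batch: one blocking call for all handles
      // (handles are left open here, mirroring the changeset's helper)
      WaitHandle.WaitAll(handles);
    } else {
      // WaitAll is limited to 64 handles, so wait on each one sequentially
      foreach (WaitHandle h in handles) {
        h.WaitOne();
        h.Close();
      }
    }
  }
}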