Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/17/09 18:07:15 (15 years ago)
Author:
gkronber
Message:

Refactored JobManager and DistributedEngine to fix bugs in the GridExecuter (see #644).

File:
1 edited

Legend:

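Unmodified – row carries both the old (r1529) and new (r2055) line numbers, run together in this text view (e.g. "103104" = old line 103, new line 104)
Added – row carries only the new (r2055) line number
Removed – row carries only the old (r1529) line number
(Blank rows between groups of lines mark unchanged lines that the viewer skipped.)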
  • trunk/sources/HeuristicLab.DistributedEngine/3.2/DistributedEngine.cs

    r1529 r2055  
    4242      get { return serverAddress; }
    4343      set {
    44         if(value != serverAddress) {
     44        if (value != serverAddress) {
    4545          serverAddress = value;
    4646        }
     
    7272
    7373    public override void Execute() {
    74       if(jobManager == null) this.jobManager = new JobManager(serverAddress);
     74      if (jobManager == null) this.jobManager = new JobManager(serverAddress);
    7575      jobManager.Reset();
    7676      base.Execute();
     
    8282
    8383    protected override void ProcessNextOperation() {
    84       if(suspendedEngines.Count > 0) {
     84      if (suspendedEngines.Count > 0) {
    8585        ProcessSuspendedEngines();
    8686      } else {
     
    9191
    9292    private void ProcessSuspendedEngines() {
    93       WaitHandle[] waitHandles = new WaitHandle[suspendedEngines.Count];
     93      AsyncGridResult[] asyncResults = new AsyncGridResult[suspendedEngines.Count];
    9494      int i = 0;
    95       foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
    96         waitHandles[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);
    97       }
    98       WaitForAll(waitHandles);
     95      foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
     96        asyncResults[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);
     97      }
     98      WaitForAll(asyncResults);
    9999      // collect results
    100100      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
    101101      try {
    102         foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
     102        int resultIndex = 0;
     103        foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
    103104          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
    104               jobManager.EndExecuteOperation(suspendedPair.Value),
     105              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[resultIndex++]),
    105106              suspendedPair.Value);
    106107          results.Add(p);
    107108        }
    108       } catch(Exception e) {
     109      }
     110      catch (Exception e) {
    109111        // this exception means there was a problem with the underlying communication infrastructure
    110112        // -> show message dialog and abort engine
     
    118120
    119121    private void ProcessOperation(IOperation operation) {
    120       if(operation is AtomicOperation) {
     122      if (operation is AtomicOperation) {
    121123        AtomicOperation atomicOperation = (AtomicOperation)operation;
    122124        IOperation next = null;
    123125        try {
    124126          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    125         } catch(Exception ex) {
     127        }
     128        catch (Exception ex) {
    126129          // push operation on stack again
    127130          myExecutionStack.Push(atomicOperation);
     
    129132          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    130133        }
    131         if(next != null)
     134        if (next != null)
    132135          myExecutionStack.Push(next);
    133136        OnOperationExecuted(atomicOperation);
    134         if(atomicOperation.Operator.Breakpoint) Abort();
    135       } else if(operation is CompositeOperation) {
     137        if (atomicOperation.Operator.Breakpoint) Abort();
     138      } else if (operation is CompositeOperation) {
    136139        CompositeOperation compositeOperation = (CompositeOperation)operation;
    137         if(compositeOperation.ExecuteInParallel) {
     140        if (compositeOperation.ExecuteInParallel) {
    138141          ProcessParallelOperation(compositeOperation);
    139142          OnOperationExecuted(compositeOperation);
    140143        } else {
    141           for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
     144          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    142145            myExecutionStack.Push(compositeOperation.Operations[i]);
    143146        }
     
    147150    private void ProcessParallelOperation(CompositeOperation compositeOperation) {
    148151      // send operations to grid
    149       WaitHandle[] waithandles = BeginExecuteOperations(compositeOperation);
    150       WaitForAll(waithandles);
     152      AsyncGridResult[] asyncResults = BeginExecuteOperations(compositeOperation);
     153      WaitForAll(asyncResults);
    151154      // collect results
    152155      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
    153156      try {
    154         foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    155           KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
    156             jobManager.EndExecuteOperation(parOperation), parOperation);
    157           results.Add(p);
    158         }
    159       } catch(Exception e) {
     157        int i = 0;
     158        foreach (AtomicOperation parOperation in compositeOperation.Operations) {
     159          results.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(
     160              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[i++]), parOperation));
     161        }
     162      }
     163      catch (Exception e) {
    160164        // this exception means there was a problem with the underlying communication infrastructure
    161165        // -> show message dialog, abort engine, requeue the whole composite operation again and return
     
    169173    }
    170174
    171     private WaitHandle[] BeginExecuteOperations(CompositeOperation compositeOperation) {
    172       WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
     175    private AsyncGridResult[] BeginExecuteOperations(CompositeOperation compositeOperation) {
     176      AsyncGridResult[] asyncResults = new AsyncGridResult[compositeOperation.Operations.Count];
    173177      int i = 0;
    174178      // HACK: assume that all atomicOperations have the same parent scope.
     
    182186      PruneToParentScope(GlobalScope, parentScope, prunedScopes);
    183187      List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
    184       foreach(IScope scope in subScopes) {
     188      foreach (IScope scope in subScopes) {
    185189        parentScope.RemoveSubScope(scope);
    186190      }
    187191      // start all parallel jobs
    188       foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     192      foreach (AtomicOperation parOperation in compositeOperation.Operations) {
    189193        parentScope.AddSubScope(parOperation.Scope);
    190         waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
     194        asyncResults[i++] = jobManager.BeginExecuteEngine(new ProcessingEngine(GlobalScope, parOperation));
    191195        parentScope.RemoveSubScope(parOperation.Scope);
    192196      }
    193       foreach(IScope scope in subScopes) {
     197      foreach (IScope scope in subScopes) {
    194198        parentScope.AddSubScope(scope);
    195199      }
     
    197201      RestoreFullTree(GlobalScope, prunedScopes);
    198202
    199       return waithandles;
    200     }
    201 
    202     private void WaitForAll(WaitHandle[] waithandles) {
     203      return asyncResults;
     204    }
     205
     206    private void WaitForAll(AsyncGridResult[] asyncResults) {
    203207      // wait until all jobs are finished
    204208      // WaitAll works only with maximally 64 waithandles
    205       if(waithandles.Length <= 64) {
    206         WaitHandle.WaitAll(waithandles);
     209      if (asyncResults.Length <= 64) {
     210        WaitHandle[] waitHandles = new WaitHandle[asyncResults.Length];
     211        for (int i = 0; i < asyncResults.Length; i++) {
     212          waitHandles[i] = asyncResults[i].WaitHandle;
     213        }
     214        WaitHandle.WaitAll(waitHandles);
    207215      } else {
    208216        int i;
    209         for(i = 0; i < waithandles.Length; i++) {
    210           waithandles[i].WaitOne();
    211           waithandles[i].Close();
     217        for (i = 0; i < asyncResults.Length; i++) {
     218          asyncResults[i].WaitHandle.WaitOne();
    212219        }
    213220      }
     
    221228      suspendedEngines.Clear();
    222229      // retrieve results and merge into scope-tree
    223       foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
     230      foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
    224231        ProcessingEngine resultEngine = p.Key;
    225232        AtomicOperation parOperation = p.Value;
    226         if(resultEngine.Canceled && !resultEngine.Suspended) {
     233        if (resultEngine.Canceled && !resultEngine.Suspended) {
    227234          // when an engine was canceled but not suspended this means there was a problem
    228235          // show error message and queue the operation for restart (again parallel)
     
    230237          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); });
    231238          canceledOperations.AddOperation(parOperation);
    232         } else if(resultEngine.Suspended) {
     239        } else if (resultEngine.Suspended) {
    233240          // when the engine was suspended it means it was stopped because of a breakpoint
    234241          // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel)
     
    243250      }
    244251      // if there were exceptions -> abort
    245       if(canceledOperations.Operations.Count > 0) {
     252      if (canceledOperations.Operations.Count > 0) {
    246253        // requeue the aborted operations
    247254        myExecutionStack.Push(canceledOperations);
     
    249256      }
    250257      // if there were breakpoints -> abort
    251       if(suspendedEngines.Count > 0) {
     258      if (suspendedEngines.Count > 0) {
    252259        Abort();
    253260      }
     
    255262
    256263    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
    257       if(savedScopes.Count == 0) return;
     264      if (savedScopes.Count == 0) return;
    258265      IScope remainingBranch = currentScope.SubScopes[0];
    259266      currentScope.RemoveSubScope(remainingBranch);
    260267      IList<IScope> savedScopesForCurrent = savedScopes[0];
    261       foreach(IScope savedScope in savedScopesForCurrent) {
     268      foreach (IScope savedScope in savedScopesForCurrent) {
    262269        currentScope.AddSubScope(savedScope);
    263270      }
     
    267274
    268275    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
    269       if(currentScope == scope) return currentScope;
    270       if(currentScope.SubScopes.Count == 0) return null;
     276      if (currentScope == scope) return currentScope;
     277      if (currentScope.SubScopes.Count == 0) return null;
    271278      IScope foundScope = null;
    272279      // try to find the searched scope in all my sub-scopes
    273       foreach(IScope subScope in currentScope.SubScopes) {
     280      foreach (IScope subScope in currentScope.SubScopes) {
    274281        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
    275         if(foundScope != null) break; // we can stop as soon as we find the scope in a branch
    276       }
    277       if(foundScope != null) { // when we found the scopes in my sub-scopes
     282        if (foundScope != null) break; // we can stop as soon as we find the scope in a branch
     283      }
     284      if (foundScope != null) { // when we found the scopes in my sub-scopes
    278285        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
    279286        prunedScopes.Add(subScopes);
    280287        // remove all my sub-scopes
    281         foreach(IScope subScope in subScopes) {
     288        foreach (IScope subScope in subScopes) {
    282289          currentScope.RemoveSubScope(subScope);
    283290        }
     
    291298
    292299    private IScope FindParentScope(IScope currentScope, IScope childScope) {
    293       if(currentScope.SubScopes.Contains(childScope)) return currentScope;
    294       foreach(IScope subScope in currentScope.SubScopes) {
     300      if (currentScope.SubScopes.Contains(childScope)) return currentScope;
     301      foreach (IScope subScope in currentScope.SubScopes) {
    295302        IScope result = FindParentScope(subScope, childScope);
    296         if(result != null) return result;
     303        if (result != null) return result;
    297304      }
    298305      return null;
     
    302309      // merge the results
    303310      original.Clear();
    304       foreach(IVariable variable in result.Variables) {
     311      foreach (IVariable variable in result.Variables) {
    305312        original.AddVariable(variable);
    306313      }
    307       foreach(IScope subScope in result.SubScopes) {
     314      foreach (IScope subScope in result.SubScopes) {
    308315        original.AddSubScope(subScope);
    309316      }
    310       foreach(KeyValuePair<string, string> alias in result.Aliases) {
     317      foreach (KeyValuePair<string, string> alias in result.Aliases) {
    311318        original.AddAlias(alias.Key, alias.Value);
    312319      }
     
    319326      addressAttribute.Value = ServerAddress;
    320327      node.Attributes.Append(addressAttribute);
    321       if(suspendedEngines.Count > 0) {
     328      if (suspendedEngines.Count > 0) {
    322329        XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines");
    323         foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
     330        foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
    324331          XmlNode n = document.CreateElement("Entry");
    325332          n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects));
     
    335342      ServerAddress = node.Attributes["ServerAddress"].Value;
    336343      XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines");
    337       if(suspendedEnginesNode != null) {
    338         foreach(XmlNode n in suspendedEnginesNode.ChildNodes) {
     344      if (suspendedEnginesNode != null) {
     345        foreach (XmlNode n in suspendedEnginesNode.ChildNodes) {
    339346          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
    340347            (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects),
Note: See TracChangeset for help on using the changeset viewer.