Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/17/09 18:07:15 (15 years ago)
Author:
gkronber
Message:

Refactored JobManager and DistributedEngine to fix bugs in the GridExecuter. #644.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.CEDMA.Server/3.3/GridExecuter.cs

    r2053 r2055  
    4343  public class GridExecuter : ExecuterBase {
    4444    private JobManager jobManager;
    45     private Dictionary<WaitHandle, IAlgorithm> activeAlgorithms;
     45    private Dictionary<AsyncGridResult, IAlgorithm> activeAlgorithms;
    4646
    4747    private TimeSpan StartJobInterval {
     
    5656      : base(dispatcher, store) {
    5757      this.jobManager = new JobManager(gridUrl);
    58       activeAlgorithms = new Dictionary<WaitHandle, IAlgorithm>();
     58      activeAlgorithms = new Dictionary<AsyncGridResult, IAlgorithm>();
    5959      jobManager.Reset();
    6060    }
    6161
    6262    protected override void StartJobs() {
    63       List<WaitHandle> wh = new List<WaitHandle>();
    64       Dictionary<WaitHandle, AtomicOperation> activeOperations = new Dictionary<WaitHandle, AtomicOperation>();
     63      Dictionary<WaitHandle, AsyncGridResult> asyncResults = new Dictionary<WaitHandle,AsyncGridResult>();
    6564      while (true) {
    6665        try {
    6766          // start new jobs as long as there are less than MaxActiveJobs
    68           while (wh.Count < MaxActiveJobs) {
     67          while (asyncResults.Count < MaxActiveJobs) {
    6968            Thread.Sleep(StartJobInterval);
    7069            // get an execution from the dispatcher and execute in grid via job-manager
     
    7271            if (algorithm != null) {
    7372              AtomicOperation op = new AtomicOperation(algorithm.Engine.OperatorGraph.InitialOperator, algorithm.Engine.GlobalScope);
    74               WaitHandle opWh = jobManager.BeginExecuteOperation(algorithm.Engine.GlobalScope, op);
    75               wh.Add(opWh);
    76               activeOperations.Add(opWh, op);
     73              AsyncGridResult asyncResult = jobManager.BeginExecuteEngine(new ProcessingEngine(algorithm.Engine.GlobalScope, op));
     74              asyncResults.Add(asyncResult.WaitHandle, asyncResult);
    7775              lock (activeAlgorithms) {
    78                 activeAlgorithms.Add(opWh, algorithm);
     76                activeAlgorithms.Add(asyncResult, algorithm);
    7977              }
    8078            }
    8179          }
    8280          // wait until any job is finished
    83           WaitHandle[] whArr = wh.ToArray();
     81          WaitHandle[] whArr = asyncResults.Keys.ToArray();
    8482          int readyHandleIndex = WaitHandle.WaitAny(whArr, WaitForFinishedJobsTimeout);
    8583          if (readyHandleIndex != WaitHandle.WaitTimeout) {
    8684            WaitHandle readyHandle = whArr[readyHandleIndex];
    87             AtomicOperation finishedOp = activeOperations[readyHandle];
    88             wh.Remove(readyHandle);
    8985            IAlgorithm finishedAlgorithm = null;
     86            AsyncGridResult finishedResult = null;
    9087            lock (activeAlgorithms) {
    91               finishedAlgorithm = activeAlgorithms[readyHandle];
    92               activeAlgorithms.Remove(readyHandle);
     88              finishedResult = asyncResults[readyHandle];
     89              finishedAlgorithm = activeAlgorithms[finishedResult];
     90              activeAlgorithms.Remove(finishedResult);
     91              asyncResults.Remove(readyHandle);
    9392            }
    94             activeOperations.Remove(readyHandle);
    95             readyHandle.Close();
    9693            try {
    97               ProcessingEngine finishedEngine = jobManager.EndExecuteOperation(finishedOp);
     94              IEngine finishedEngine = jobManager.EndExecuteEngine(finishedResult);
     95              SetResults(finishedEngine.GlobalScope, finishedAlgorithm.Engine.GlobalScope);
    9896              StoreResults(finishedAlgorithm);
    9997            }
     
    106104          Trace.WriteLine("CEDMA Executer: Exception in job-management thread. " + ex.Message);
    107105        }
     106      }
     107    }
     108
     109    private void SetResults(IScope src, IScope target) {
     110      foreach (IVariable v in src.Variables) {
     111        target.AddVariable(v);
     112      }
     113      foreach (IScope subScope in src.SubScopes) {
     114        target.AddSubScope(subScope);
     115      }
     116      foreach (KeyValuePair<string, string> alias in src.Aliases) {
     117        target.AddAlias(alias.Key, alias.Value);
    108118      }
    109119    }
Note: See TracChangeset for help on using the changeset viewer.