Changeset 2055


Ignore:
Timestamp:
06/17/09 18:07:15 (12 years ago)
Author:
gkronber
Message:

Refactored JobManager and DistributedEngine to fix bugs in the GridExecuter. #644.

Location:
trunk/sources
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.CEDMA.Server/3.3/GridExecuter.cs

    r2053 r2055  
    4343  public class GridExecuter : ExecuterBase {
    4444    private JobManager jobManager;
    45     private Dictionary<WaitHandle, IAlgorithm> activeAlgorithms;
     45    private Dictionary<AsyncGridResult, IAlgorithm> activeAlgorithms;
    4646
    4747    private TimeSpan StartJobInterval {
     
    5656      : base(dispatcher, store) {
    5757      this.jobManager = new JobManager(gridUrl);
    58       activeAlgorithms = new Dictionary<WaitHandle, IAlgorithm>();
     58      activeAlgorithms = new Dictionary<AsyncGridResult, IAlgorithm>();
    5959      jobManager.Reset();
    6060    }
    6161
    6262    protected override void StartJobs() {
    63       List<WaitHandle> wh = new List<WaitHandle>();
    64       Dictionary<WaitHandle, AtomicOperation> activeOperations = new Dictionary<WaitHandle, AtomicOperation>();
     63      Dictionary<WaitHandle, AsyncGridResult> asyncResults = new Dictionary<WaitHandle,AsyncGridResult>();
    6564      while (true) {
    6665        try {
    6766          // start new jobs as long as there are less than MaxActiveJobs
    68           while (wh.Count < MaxActiveJobs) {
     67          while (asyncResults.Count < MaxActiveJobs) {
    6968            Thread.Sleep(StartJobInterval);
    7069            // get an execution from the dispatcher and execute in grid via job-manager
     
    7271            if (algorithm != null) {
    7372              AtomicOperation op = new AtomicOperation(algorithm.Engine.OperatorGraph.InitialOperator, algorithm.Engine.GlobalScope);
    74               WaitHandle opWh = jobManager.BeginExecuteOperation(algorithm.Engine.GlobalScope, op);
    75               wh.Add(opWh);
    76               activeOperations.Add(opWh, op);
     73              AsyncGridResult asyncResult = jobManager.BeginExecuteEngine(new ProcessingEngine(algorithm.Engine.GlobalScope, op));
     74              asyncResults.Add(asyncResult.WaitHandle, asyncResult);
    7775              lock (activeAlgorithms) {
    78                 activeAlgorithms.Add(opWh, algorithm);
     76                activeAlgorithms.Add(asyncResult, algorithm);
    7977              }
    8078            }
    8179          }
    8280          // wait until any job is finished
    83           WaitHandle[] whArr = wh.ToArray();
     81          WaitHandle[] whArr = asyncResults.Keys.ToArray();
    8482          int readyHandleIndex = WaitHandle.WaitAny(whArr, WaitForFinishedJobsTimeout);
    8583          if (readyHandleIndex != WaitHandle.WaitTimeout) {
    8684            WaitHandle readyHandle = whArr[readyHandleIndex];
    87             AtomicOperation finishedOp = activeOperations[readyHandle];
    88             wh.Remove(readyHandle);
    8985            IAlgorithm finishedAlgorithm = null;
     86            AsyncGridResult finishedResult = null;
    9087            lock (activeAlgorithms) {
    91               finishedAlgorithm = activeAlgorithms[readyHandle];
    92               activeAlgorithms.Remove(readyHandle);
     88              finishedResult = asyncResults[readyHandle];
     89              finishedAlgorithm = activeAlgorithms[finishedResult];
     90              activeAlgorithms.Remove(finishedResult);
     91              asyncResults.Remove(readyHandle);
    9392            }
    94             activeOperations.Remove(readyHandle);
    95             readyHandle.Close();
    9693            try {
    97               ProcessingEngine finishedEngine = jobManager.EndExecuteOperation(finishedOp);
     94              IEngine finishedEngine = jobManager.EndExecuteEngine(finishedResult);
     95              SetResults(finishedEngine.GlobalScope, finishedAlgorithm.Engine.GlobalScope);
    9896              StoreResults(finishedAlgorithm);
    9997            }
     
    106104          Trace.WriteLine("CEDMA Executer: Exception in job-management thread. " + ex.Message);
    107105        }
     106      }
     107    }
     108
     109    private void SetResults(IScope src, IScope target) {
     110      foreach (IVariable v in src.Variables) {
     111        target.AddVariable(v);
     112      }
     113      foreach (IScope subScope in src.SubScopes) {
     114        target.AddSubScope(subScope);
     115      }
     116      foreach (KeyValuePair<string, string> alias in src.Aliases) {
     117        target.AddAlias(alias.Key, alias.Value);
    108118      }
    109119    }
  • trunk/sources/HeuristicLab.DistributedEngine/3.2/DistributedEngine.cs

    r1529 r2055  
    4242      get { return serverAddress; }
    4343      set {
    44         if(value != serverAddress) {
     44        if (value != serverAddress) {
    4545          serverAddress = value;
    4646        }
     
    7272
    7373    public override void Execute() {
    74       if(jobManager == null) this.jobManager = new JobManager(serverAddress);
     74      if (jobManager == null) this.jobManager = new JobManager(serverAddress);
    7575      jobManager.Reset();
    7676      base.Execute();
     
    8282
    8383    protected override void ProcessNextOperation() {
    84       if(suspendedEngines.Count > 0) {
     84      if (suspendedEngines.Count > 0) {
    8585        ProcessSuspendedEngines();
    8686      } else {
     
    9191
    9292    private void ProcessSuspendedEngines() {
    93       WaitHandle[] waitHandles = new WaitHandle[suspendedEngines.Count];
     93      AsyncGridResult[] asyncResults = new AsyncGridResult[suspendedEngines.Count];
    9494      int i = 0;
    95       foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
    96         waitHandles[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);
    97       }
    98       WaitForAll(waitHandles);
     95      foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
     96        asyncResults[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);
     97      }
     98      WaitForAll(asyncResults);
    9999      // collect results
    100100      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
    101101      try {
    102         foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
     102        int resultIndex = 0;
     103        foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {
    103104          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
    104               jobManager.EndExecuteOperation(suspendedPair.Value),
     105              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[resultIndex++]),
    105106              suspendedPair.Value);
    106107          results.Add(p);
    107108        }
    108       } catch(Exception e) {
     109      }
     110      catch (Exception e) {
    109111        // this exception means there was a problem with the underlying communication infrastructure
    110112        // -> show message dialog and abort engine
     
    118120
    119121    private void ProcessOperation(IOperation operation) {
    120       if(operation is AtomicOperation) {
     122      if (operation is AtomicOperation) {
    121123        AtomicOperation atomicOperation = (AtomicOperation)operation;
    122124        IOperation next = null;
    123125        try {
    124126          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    125         } catch(Exception ex) {
     127        }
     128        catch (Exception ex) {
    126129          // push operation on stack again
    127130          myExecutionStack.Push(atomicOperation);
     
    129132          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    130133        }
    131         if(next != null)
     134        if (next != null)
    132135          myExecutionStack.Push(next);
    133136        OnOperationExecuted(atomicOperation);
    134         if(atomicOperation.Operator.Breakpoint) Abort();
    135       } else if(operation is CompositeOperation) {
     137        if (atomicOperation.Operator.Breakpoint) Abort();
     138      } else if (operation is CompositeOperation) {
    136139        CompositeOperation compositeOperation = (CompositeOperation)operation;
    137         if(compositeOperation.ExecuteInParallel) {
     140        if (compositeOperation.ExecuteInParallel) {
    138141          ProcessParallelOperation(compositeOperation);
    139142          OnOperationExecuted(compositeOperation);
    140143        } else {
    141           for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
     144          for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    142145            myExecutionStack.Push(compositeOperation.Operations[i]);
    143146        }
     
    147150    private void ProcessParallelOperation(CompositeOperation compositeOperation) {
    148151      // send operations to grid
    149       WaitHandle[] waithandles = BeginExecuteOperations(compositeOperation);
    150       WaitForAll(waithandles);
     152      AsyncGridResult[] asyncResults = BeginExecuteOperations(compositeOperation);
     153      WaitForAll(asyncResults);
    151154      // collect results
    152155      List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>();
    153156      try {
    154         foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    155           KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
    156             jobManager.EndExecuteOperation(parOperation), parOperation);
    157           results.Add(p);
    158         }
    159       } catch(Exception e) {
     157        int i = 0;
     158        foreach (AtomicOperation parOperation in compositeOperation.Operations) {
     159          results.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>(
     160              (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[i++]), parOperation));
     161        }
     162      }
     163      catch (Exception e) {
    160164        // this exception means there was a problem with the underlying communication infrastructure
    161165        // -> show message dialog, abort engine, requeue the whole composite operation again and return
     
    169173    }
    170174
    171     private WaitHandle[] BeginExecuteOperations(CompositeOperation compositeOperation) {
    172       WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
     175    private AsyncGridResult[] BeginExecuteOperations(CompositeOperation compositeOperation) {
     176      AsyncGridResult[] asyncResults = new AsyncGridResult[compositeOperation.Operations.Count];
    173177      int i = 0;
    174178      // HACK: assume that all atomicOperations have the same parent scope.
     
    182186      PruneToParentScope(GlobalScope, parentScope, prunedScopes);
    183187      List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
    184       foreach(IScope scope in subScopes) {
     188      foreach (IScope scope in subScopes) {
    185189        parentScope.RemoveSubScope(scope);
    186190      }
    187191      // start all parallel jobs
    188       foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     192      foreach (AtomicOperation parOperation in compositeOperation.Operations) {
    189193        parentScope.AddSubScope(parOperation.Scope);
    190         waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
     194        asyncResults[i++] = jobManager.BeginExecuteEngine(new ProcessingEngine(GlobalScope, parOperation));
    191195        parentScope.RemoveSubScope(parOperation.Scope);
    192196      }
    193       foreach(IScope scope in subScopes) {
     197      foreach (IScope scope in subScopes) {
    194198        parentScope.AddSubScope(scope);
    195199      }
     
    197201      RestoreFullTree(GlobalScope, prunedScopes);
    198202
    199       return waithandles;
    200     }
    201 
    202     private void WaitForAll(WaitHandle[] waithandles) {
     203      return asyncResults;
     204    }
     205
     206    private void WaitForAll(AsyncGridResult[] asyncResults) {
    203207      // wait until all jobs are finished
    204208      // WaitAll works only with maximally 64 waithandles
    205       if(waithandles.Length <= 64) {
    206         WaitHandle.WaitAll(waithandles);
     209      if (asyncResults.Length <= 64) {
     210        WaitHandle[] waitHandles = new WaitHandle[asyncResults.Length];
     211        for (int i = 0; i < asyncResults.Length; i++) {
     212          waitHandles[i] = asyncResults[i].WaitHandle;
     213        }
     214        WaitHandle.WaitAll(waitHandles);
    207215      } else {
    208216        int i;
    209         for(i = 0; i < waithandles.Length; i++) {
    210           waithandles[i].WaitOne();
    211           waithandles[i].Close();
     217        for (i = 0; i < asyncResults.Length; i++) {
     218          asyncResults[i].WaitHandle.WaitOne();
    212219        }
    213220      }
     
    221228      suspendedEngines.Clear();
    222229      // retrieve results and merge into scope-tree
    223       foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
     230      foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {
    224231        ProcessingEngine resultEngine = p.Key;
    225232        AtomicOperation parOperation = p.Value;
    226         if(resultEngine.Canceled && !resultEngine.Suspended) {
     233        if (resultEngine.Canceled && !resultEngine.Suspended) {
    227234          // when an engine was canceled but not suspended this means there was a problem
    228235          // show error message and queue the operation for restart (again parallel)
     
    230237          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); });
    231238          canceledOperations.AddOperation(parOperation);
    232         } else if(resultEngine.Suspended) {
     239        } else if (resultEngine.Suspended) {
    233240          // when the engine was suspended it means it was stopped because of a breakpoint
    234241          // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel)
     
    243250      }
    244251      // if there were exceptions -> abort
    245       if(canceledOperations.Operations.Count > 0) {
     252      if (canceledOperations.Operations.Count > 0) {
    246253        // requeue the aborted operations
    247254        myExecutionStack.Push(canceledOperations);
     
    249256      }
    250257      // if there were breakpoints -> abort
    251       if(suspendedEngines.Count > 0) {
     258      if (suspendedEngines.Count > 0) {
    252259        Abort();
    253260      }
     
    255262
    256263    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
    257       if(savedScopes.Count == 0) return;
     264      if (savedScopes.Count == 0) return;
    258265      IScope remainingBranch = currentScope.SubScopes[0];
    259266      currentScope.RemoveSubScope(remainingBranch);
    260267      IList<IScope> savedScopesForCurrent = savedScopes[0];
    261       foreach(IScope savedScope in savedScopesForCurrent) {
     268      foreach (IScope savedScope in savedScopesForCurrent) {
    262269        currentScope.AddSubScope(savedScope);
    263270      }
     
    267274
    268275    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
    269       if(currentScope == scope) return currentScope;
    270       if(currentScope.SubScopes.Count == 0) return null;
     276      if (currentScope == scope) return currentScope;
     277      if (currentScope.SubScopes.Count == 0) return null;
    271278      IScope foundScope = null;
    272279      // try to find the searched scope in all my sub-scopes
    273       foreach(IScope subScope in currentScope.SubScopes) {
     280      foreach (IScope subScope in currentScope.SubScopes) {
    274281        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
    275         if(foundScope != null) break; // we can stop as soon as we find the scope in a branch
    276       }
    277       if(foundScope != null) { // when we found the scopes in my sub-scopes
     282        if (foundScope != null) break; // we can stop as soon as we find the scope in a branch
     283      }
     284      if (foundScope != null) { // when we found the scopes in my sub-scopes
    278285        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
    279286        prunedScopes.Add(subScopes);
    280287        // remove all my sub-scopes
    281         foreach(IScope subScope in subScopes) {
     288        foreach (IScope subScope in subScopes) {
    282289          currentScope.RemoveSubScope(subScope);
    283290        }
     
    291298
    292299    private IScope FindParentScope(IScope currentScope, IScope childScope) {
    293       if(currentScope.SubScopes.Contains(childScope)) return currentScope;
    294       foreach(IScope subScope in currentScope.SubScopes) {
     300      if (currentScope.SubScopes.Contains(childScope)) return currentScope;
     301      foreach (IScope subScope in currentScope.SubScopes) {
    295302        IScope result = FindParentScope(subScope, childScope);
    296         if(result != null) return result;
     303        if (result != null) return result;
    297304      }
    298305      return null;
     
    302309      // merge the results
    303310      original.Clear();
    304       foreach(IVariable variable in result.Variables) {
     311      foreach (IVariable variable in result.Variables) {
    305312        original.AddVariable(variable);
    306313      }
    307       foreach(IScope subScope in result.SubScopes) {
     314      foreach (IScope subScope in result.SubScopes) {
    308315        original.AddSubScope(subScope);
    309316      }
    310       foreach(KeyValuePair<string, string> alias in result.Aliases) {
     317      foreach (KeyValuePair<string, string> alias in result.Aliases) {
    311318        original.AddAlias(alias.Key, alias.Value);
    312319      }
     
    319326      addressAttribute.Value = ServerAddress;
    320327      node.Attributes.Append(addressAttribute);
    321       if(suspendedEngines.Count > 0) {
     328      if (suspendedEngines.Count > 0) {
    322329        XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines");
    323         foreach(KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
     330        foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {
    324331          XmlNode n = document.CreateElement("Entry");
    325332          n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects));
     
    335342      ServerAddress = node.Attributes["ServerAddress"].Value;
    336343      XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines");
    337       if(suspendedEnginesNode != null) {
    338         foreach(XmlNode n in suspendedEnginesNode.ChildNodes) {
     344      if (suspendedEnginesNode != null) {
     345        foreach (XmlNode n in suspendedEnginesNode.ChildNodes) {
    339346          KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>(
    340347            (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects),
  • trunk/sources/HeuristicLab.Grid/3.2/HeuristicLab.Grid-3.2.csproj

    r1534 r2055  
    9898      <DependentUpon>ClientForm.cs</DependentUpon>
    9999    </Compile>
     100    <Compile Include="AsyncGridResult.cs" />
    100101    <Compile Include="Database.cs" />
    101102    <Compile Include="EngineRunner.cs" />
  • trunk/sources/HeuristicLab.Grid/3.2/IGridServer.cs

    r1529 r2055  
    2727namespace HeuristicLab.Grid {
    2828  public enum JobState {
    29     Unknown,
     29    Unknown = 0, // default value
    3030    Waiting,
    3131    Busy,
  • trunk/sources/HeuristicLab.Grid/3.2/JobManager.cs

    r1529 r2055  
    4343    private const int RESULT_POLLING_TIMEOUT = 5;
    4444
    45     private class Job {
    46       public Guid guid;
    47       public ProcessingEngine engine;
    48       public ManualResetEvent waitHandle;
    49       public int restarts;
    50     }
    51 
    5245    private IGridServer server;
    5346    private string address;
    5447    private object waitingQueueLock = new object();
    55     private Queue<Job> waitingJobs = new Queue<Job>();
     48    private Queue<AsyncGridResult> waitingJobs = new Queue<AsyncGridResult>();
    5649    private object runningQueueLock = new object();
    57     private Queue<Job> runningJobs = new Queue<Job>();
    58     private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    59 
    60     private List<IOperation> erroredOperations = new List<IOperation>();
     50    private Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>();
    6151    private object connectionLock = new object();
    62     private object dictionaryLock = new object();
    6352
    6453    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
     
    7766    public void Reset() {
    7867      ResetConnection();
    79       lock(dictionaryLock) {
    80         foreach(Job j in waitingJobs) {
    81           j.waitHandle.Close();
     68      lock (waitingQueueLock) {
     69        foreach (AsyncGridResult r in waitingJobs) {
     70          r.WaitHandle.Close();
    8271        }
    8372        waitingJobs.Clear();
    84         foreach(Job j in runningJobs) {
    85           j.waitHandle.Close();
     73      }
     74      lock (runningQueueLock) {
     75        foreach (AsyncGridResult r in runningJobs) {
     76          r.WaitHandle.Close();
    8677        }
    8778        runningJobs.Clear();
    88         results.Clear();
    89         erroredOperations.Clear();
    9079      }
    9180    }
     
    9382    private void ResetConnection() {
    9483      Trace.TraceInformation("Reset connection in JobManager");
    95       lock(connectionLock) {
     84      lock (connectionLock) {
    9685        // open a new channel
    9786        NetTcpBinding binding = new NetTcpBinding();
     
    10796    public void StartEngines() {
    10897      try {
    109         while(true) {
    110           Job job = null;
    111           lock(waitingQueueLock) {
    112             if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
    113           }
    114           if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting
     98        while (true) {
     99          AsyncGridResult job = null;
     100          lock (waitingQueueLock) {
     101            if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();
     102          }
     103          if (job == null) waitingWaitHandle.WaitOne(); // no jobs waiting
    115104          else {
    116             Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
    117             if(currentEngineGuid == Guid.Empty) {
     105            Guid currentEngineGuid = TryStartExecuteEngine(job.Engine);
     106            if (currentEngineGuid == Guid.Empty) {
    118107              // couldn't start the job -> requeue
    119               if(job.restarts < MAX_RESTARTS) {
    120                 job.restarts++;
    121                 lock(waitingQueueLock) waitingJobs.Enqueue(job);
     108              if (job.Restarts < MAX_RESTARTS) {
     109                job.Restarts++;
     110                lock (waitingQueueLock) waitingJobs.Enqueue(job);
    122111                waitingWaitHandle.Set();
    123112              } else {
    124113                // max restart count reached -> give up on this job and flag error
    125                 lock(dictionaryLock) {
    126                   erroredOperations.Add(job.engine.InitialOperation);
    127                   job.waitHandle.Set();
    128                 }
     114                job.Aborted = true;
     115                job.SignalFinished();
    129116              }
    130117            } else {
    131118              // job started successfully
    132               job.guid = currentEngineGuid;
    133               lock(runningQueueLock) {
     119              job.Guid = currentEngineGuid;
     120              lock (runningQueueLock) {
    134121                runningJobs.Enqueue(job);
    135122                runningWaitHandle.Set();
     
    138125          }
    139126        }
    140       } catch(Exception e) {
    141         Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
     127      }
     128      catch (Exception e) {
     129        Trace.TraceError("Exception " + e + " in JobManager.StartEngines() killed the start-engine thread\n" + e.StackTrace);
    142130      }
    143131    }
     
    146134    public void GetResults() {
    147135      try {
    148         while(true) {
    149           Job job = null;
    150           lock(runningQueueLock) {
    151             if(runningJobs.Count > 0) job = runningJobs.Dequeue();
    152           }
    153           if(job == null) runningWaitHandle.WaitOne(); // no jobs running
     136        while (true) {
     137          AsyncGridResult job = null;
     138          lock (runningQueueLock) {
     139            if (runningJobs.Count > 0) job = runningJobs.Dequeue();
     140          }
     141          if (job == null) runningWaitHandle.WaitOne(); // no jobs running
    154142          else {
    155             byte[] zippedResult = TryEndExecuteEngine(server, job.guid);
    156             if(zippedResult != null) { // successful
    157               lock(dictionaryLock) {
    158                 // store result
    159                 results[job.engine.InitialOperation] = zippedResult;
    160                 // notify consumer that result is ready
    161                 job.waitHandle.Set();
    162               }
     143            byte[] zippedResult = TryEndExecuteEngine(server, job.Guid);
     144            if (zippedResult != null) {
     145              // successful => store result
     146              job.ZippedResult = zippedResult;
     147              // notify consumer that result is ready
     148              job.SignalFinished();
    163149            } else {
    164150              // there was a problem -> check the state of the job and restart if necessary
    165               JobState jobState = TryGetJobState(server, job.guid);
    166               if(jobState == JobState.Unknown) {
    167                 job.restarts++;
    168                 lock(waitingQueueLock) {
     151              JobState jobState = TryGetJobState(server, job.Guid);
     152              if (jobState == JobState.Unknown) {
     153                job.Restarts++;
     154                lock (waitingQueueLock) {
    169155                  waitingJobs.Enqueue(job);
    170156                  waitingWaitHandle.Set();
     
    172158              } else {
    173159                // job still active at the server
    174                 lock(runningQueueLock) {
     160                lock (runningQueueLock) {
    175161                  runningJobs.Enqueue(job);
    176162                  runningWaitHandle.Set();
     
    181167          }
    182168        }
    183       } catch(Exception e) {
    184         Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
    185       }
    186     }
    187 
    188     public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
    189       return BeginExecuteEngine(new ProcessingEngine(globalScope, operation));
    190     }
    191 
    192     public WaitHandle BeginExecuteEngine(ProcessingEngine engine) {
    193       Job job = new Job();
    194       job.engine = engine;
    195       job.waitHandle = new ManualResetEvent(false);
    196       job.restarts = 0;
    197       lock(waitingQueueLock) {
    198         waitingJobs.Enqueue(job);
     169      }
     170      catch (Exception e) {
     171        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n" + e.StackTrace);
     172      }
     173    }
     174
     175    public AsyncGridResult BeginExecuteEngine(ProcessingEngine engine) {
     176      AsyncGridResult asyncResult = new AsyncGridResult(engine);
     177      asyncResult.Engine = engine;
     178      lock (waitingQueueLock) {
     179        waitingJobs.Enqueue(asyncResult);
    199180      }
    200181      waitingWaitHandle.Set();
    201       return job.waitHandle;
    202     }
    203 
    204     private byte[] ZipEngine(ProcessingEngine engine) {
     182      return asyncResult;
     183    }
     184
     185    private byte[] ZipEngine(IEngine engine) {
    205186      return PersistenceManager.SaveToGZip(engine);
    206187    }
    207188
    208     public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
    209       if(erroredOperations.Contains(operation)) {
    210         erroredOperations.Remove(operation);
     189    public IEngine EndExecuteEngine(AsyncGridResult asyncResult) {
     190      if (asyncResult.Aborted) {
    211191        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
    212192      } else {
    213         byte[] zippedResult = null;
    214         lock(dictionaryLock) {
    215           zippedResult = results[operation];
    216           results.Remove(operation);
    217         }
    218193        // restore the engine
    219         return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
    220       }
    221     }
    222 
    223     private Guid TryStartExecuteEngine(ProcessingEngine engine) {
     194        return (IEngine)PersistenceManager.RestoreFromGZip(asyncResult.ZippedResult);
     195      }
     196    }
     197
     198    private Guid TryStartExecuteEngine(IEngine engine) {
    224199      byte[] zippedEngine = ZipEngine(engine);
     200      return SavelyExecute(() => server.BeginExecuteEngine(zippedEngine));
     201    }
     202
     203    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
     204      return SavelyExecute(() => {
     205        byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
     206        return zippedResult;
     207      });
     208    }
     209
     210    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
     211      return SavelyExecute(() => server.JobState(engineGuid));
     212    }
     213
     214    private TResult SavelyExecute<TResult>(Func<TResult> a) {
    225215      int retries = 0;
    226       Guid guid = Guid.Empty;
    227216      do {
    228217        try {
    229           lock(connectionLock) {
    230             guid = server.BeginExecuteEngine(zippedEngine);
    231           }
    232           return guid;
    233         } catch(TimeoutException) {
     218          lock (connectionLock) {
     219            return a();
     220          }
     221        }
     222        catch (TimeoutException) {
    234223          retries++;
    235224          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    236         } catch(CommunicationException) {
     225        }
     226        catch (CommunicationException) {
    237227          ResetConnection();
    238228          retries++;
    239229          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    240230        }
    241       } while(retries < MAX_CONNECTION_RETRIES);
    242       Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine");
    243       return Guid.Empty;
    244     }
    245 
    246     private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
    247       int retries = 0;
    248       do {
    249         try {
    250           lock(connectionLock) {
    251             byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
    252             return zippedResult;
    253           }
    254         } catch(TimeoutException) {
    255           retries++;
    256           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    257         } catch(CommunicationException) {
    258           ResetConnection();
    259           retries++;
    260           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    261         }
    262       } while(retries < MAX_CONNECTION_RETRIES);
    263       Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine");
    264       return null;
    265     }
    266 
    267     private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
    268       // check if the server is still working on the job
    269       int retries = 0;
    270       do {
    271         try {
    272           lock(connectionLock) {
    273             JobState jobState = server.JobState(engineGuid);
    274             return jobState;
    275           }
    276         } catch(TimeoutException) {
    277           retries++;
    278           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    279         } catch(CommunicationException) {
    280           ResetConnection();
    281           retries++;
    282           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    283         }
    284       } while(retries < MAX_CONNECTION_RETRIES);
    285       Trace.TraceWarning("Reached max connection retries in TryGetJobState");
    286       return JobState.Unknown;
     231      } while (retries < MAX_CONNECTION_RETRIES);
     232      Trace.TraceWarning("Reached max connection retries");
     233      return default(TResult);
    287234    }
    288235  }
  • trunk/sources/HeuristicLab.SupportVectorMachines/3.2/SupportVectorRegression.cs

    r2051 r2055  
    361361      model.TestVarianceAccountedFor = bestModelScope.GetVariableValue<DoubleData>("TestVAF", false).Data;
    362362
    363       model.Data = bestModelScope.GetVariableValue<SVMModel>("BestValidationModel", false);
     363      model.Data = bestModelScope.GetVariableValue<SVMModel>("Model", false);
    364364      HeuristicLab.DataAnalysis.Dataset ds = bestModelScope.GetVariableValue<Dataset>("Dataset", true);
    365365      model.Dataset = ds;
Note: See TracChangeset for help on using the changeset viewer.