- Timestamp:
- 06/17/09 18:07:15 (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.CEDMA.Server/3.3/GridExecuter.cs
r2053 r2055 43 43 public class GridExecuter : ExecuterBase { 44 44 private JobManager jobManager; 45 private Dictionary< WaitHandle, IAlgorithm> activeAlgorithms;45 private Dictionary<AsyncGridResult, IAlgorithm> activeAlgorithms; 46 46 47 47 private TimeSpan StartJobInterval { … … 56 56 : base(dispatcher, store) { 57 57 this.jobManager = new JobManager(gridUrl); 58 activeAlgorithms = new Dictionary< WaitHandle, IAlgorithm>();58 activeAlgorithms = new Dictionary<AsyncGridResult, IAlgorithm>(); 59 59 jobManager.Reset(); 60 60 } 61 61 62 62 protected override void StartJobs() { 63 List<WaitHandle> wh = new List<WaitHandle>(); 64 Dictionary<WaitHandle, AtomicOperation> activeOperations = new Dictionary<WaitHandle, AtomicOperation>(); 63 Dictionary<WaitHandle, AsyncGridResult> asyncResults = new Dictionary<WaitHandle,AsyncGridResult>(); 65 64 while (true) { 66 65 try { 67 66 // start new jobs as long as there are less than MaxActiveJobs 68 while ( wh.Count < MaxActiveJobs) {67 while (asyncResults.Count < MaxActiveJobs) { 69 68 Thread.Sleep(StartJobInterval); 70 69 // get an execution from the dispatcher and execute in grid via job-manager … … 72 71 if (algorithm != null) { 73 72 AtomicOperation op = new AtomicOperation(algorithm.Engine.OperatorGraph.InitialOperator, algorithm.Engine.GlobalScope); 74 WaitHandle opWh = jobManager.BeginExecuteOperation(algorithm.Engine.GlobalScope, op); 75 wh.Add(opWh); 76 activeOperations.Add(opWh, op); 73 AsyncGridResult asyncResult = jobManager.BeginExecuteEngine(new ProcessingEngine(algorithm.Engine.GlobalScope, op)); 74 asyncResults.Add(asyncResult.WaitHandle, asyncResult); 77 75 lock (activeAlgorithms) { 78 activeAlgorithms.Add( opWh, algorithm);76 activeAlgorithms.Add(asyncResult, algorithm); 79 77 } 80 78 } 81 79 } 82 80 // wait until any job is finished 83 WaitHandle[] whArr = wh.ToArray();81 WaitHandle[] whArr = asyncResults.Keys.ToArray(); 84 82 int readyHandleIndex = WaitHandle.WaitAny(whArr, WaitForFinishedJobsTimeout); 85 83 if (readyHandleIndex != WaitHandle.WaitTimeout) { 86 84 WaitHandle readyHandle = whArr[readyHandleIndex]; 87 AtomicOperation finishedOp = activeOperations[readyHandle];88 wh.Remove(readyHandle);89 85 IAlgorithm finishedAlgorithm = null; 86 AsyncGridResult finishedResult = null; 90 87 lock (activeAlgorithms) { 91 finishedAlgorithm = activeAlgorithms[readyHandle]; 92 activeAlgorithms.Remove(readyHandle); 88 finishedResult = asyncResults[readyHandle]; 89 finishedAlgorithm = activeAlgorithms[finishedResult]; 90 activeAlgorithms.Remove(finishedResult); 91 asyncResults.Remove(readyHandle); 93 92 } 94 activeOperations.Remove(readyHandle);95 readyHandle.Close();96 93 try { 97 ProcessingEngine finishedEngine = jobManager.EndExecuteOperation(finishedOp); 94 IEngine finishedEngine = jobManager.EndExecuteEngine(finishedResult); 95 SetResults(finishedEngine.GlobalScope, finishedAlgorithm.Engine.GlobalScope); 98 96 StoreResults(finishedAlgorithm); 99 97 } … … 106 104 Trace.WriteLine("CEDMA Executer: Exception in job-management thread. " + ex.Message); 107 105 } 106 } 107 } 108 109 private void SetResults(IScope src, IScope target) { 110 foreach (IVariable v in src.Variables) { 111 target.AddVariable(v); 112 } 113 foreach (IScope subScope in src.SubScopes) { 114 target.AddSubScope(subScope); 115 } 116 foreach (KeyValuePair<string, string> alias in src.Aliases) { 117 target.AddAlias(alias.Key, alias.Value); 108 118 } 109 119 }
Note: See TracChangeset
for help on using the changeset viewer.