- Timestamp:
- 02/12/09 14:05:33 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/CEDMA-Refactoring-Ticket419/HeuristicLab.CEDMA.Server/Executer.cs
r1156 r1216 41 41 namespace HeuristicLab.CEDMA.Server { 42 42 public class Executer { 43 private static int MaxActiveJobs {44 get { return 100; }45 }46 43 private Dispatcher dispatcher; 47 44 private JobManager jobManager; 48 45 private IStore store; 46 private Dictionary<WaitHandle, Execution> activeExecutions; 47 48 private TimeSpan StartJobInterval { 49 get { return TimeSpan.FromMilliseconds(500); } 50 } 51 52 private TimeSpan WaitForFinishedJobsTimeout { 53 get { return TimeSpan.FromMilliseconds(100); } 54 } 55 56 private int maxActiveJobs; 57 public int MaxActiveJobs { 58 get { return maxActiveJobs; } 59 set { 60 if (value < 0) throw new ArgumentException("Only positive values are allowed for MaxActiveJobs"); 61 maxActiveJobs = value; 62 } 63 } 49 64 50 65 public Executer(Dispatcher dispatcher, IStore store, string gridUrl) { 66 activeExecutions = new Dictionary<WaitHandle, Execution>(); 67 maxActiveJobs = 10; 51 68 this.dispatcher = dispatcher; 52 69 this.store = store; … … 61 78 private void StartJobs() { 62 79 List<WaitHandle> wh = new List<WaitHandle>(); 63 Dictionary<WaitHandle, AtomicOperation> activeOperations = new Dictionary<WaitHandle,AtomicOperation>(); 64 Dictionary<WaitHandle, Execution> activeExecutions = new Dictionary<WaitHandle,Execution>(); 80 Dictionary<WaitHandle, AtomicOperation> activeOperations = new Dictionary<WaitHandle, AtomicOperation>(); 65 81 while (true) { 66 82 try { 67 83 // start new jobs as long as there are less than MaxActiveJobs 68 84 while (wh.Count < MaxActiveJobs) { 85 Thread.Sleep(StartJobInterval); 69 86 // get an execution from the dispatcher and execute in grid via job-manager 70 87 Execution execution = dispatcher.GetNextJob(); … … 74 91 wh.Add(opWh); 75 92 activeOperations.Add(opWh, op); 76 activeExecutions.Add(opWh, execution); 93 lock (activeExecutions) { 94 activeExecutions.Add(opWh, execution); 95 } 77 96 } 78 97 } 79 98 // wait until any job is finished 80 99 WaitHandle[] whArr = wh.ToArray(); 81 int readyHandleIndex = WaitHandle.WaitAny(whArr); 82 WaitHandle readyHandle = whArr[readyHandleIndex]; 83 AtomicOperation finishedOp = activeOperations[readyHandle]; 84 Execution finishedExecution = activeExecutions[readyHandle]; 85 wh.Remove(readyHandle); 86 activeExecutions.Remove(readyHandle); 87 activeOperations.Remove(readyHandle); 88 ProcessingEngine finishedEngine = null; 89 try { 90 finishedEngine = jobManager.EndExecuteOperation(finishedOp); 91 } catch (Exception badEx) { 92 Trace.WriteLine("CEDMA Executer: Exception in job execution thread. " + badEx.Message); 93 } 94 if (finishedEngine != null) { 95 StoreResults(finishedExecution, finishedEngine); 100 int readyHandleIndex = WaitHandle.WaitAny(whArr, WaitForFinishedJobsTimeout); 101 if (readyHandleIndex != WaitHandle.WaitTimeout) { 102 WaitHandle readyHandle = whArr[readyHandleIndex]; 103 AtomicOperation finishedOp = activeOperations[readyHandle]; 104 wh.Remove(readyHandle); 105 Execution finishedExecution = null; 106 lock (activeExecutions) { 107 finishedExecution = activeExecutions[readyHandle]; 108 activeExecutions.Remove(readyHandle); 109 } 110 activeOperations.Remove(readyHandle); 111 ProcessingEngine finishedEngine = null; 112 try { 113 finishedEngine = jobManager.EndExecuteOperation(finishedOp); 114 } 115 catch (Exception badEx) { 116 Trace.WriteLine("CEDMA Executer: Exception in job execution thread. " + badEx.Message); 117 } 118 if (finishedEngine != null) { 119 StoreResults(finishedExecution, finishedEngine); 120 } 96 121 } 97 122 } … … 115 140 StoreModelAttribute(model, Ontology.TreeHeight, bestModelScope.GetVariableValue<IntData>("TreeHeight", false).Data); 116 141 StoreModelAttribute(model, Ontology.EvaluatedSolutions, bestModelScope.GetVariableValue<IntData>("EvaluatedSolutions", false).Data); 117 142 118 143 byte[] serializedModel = PersistenceManager.SaveToGZip(bestModelScope.GetVariableValue("FunctionTree", false)); 119 144 store.Add(new Statement(model, Ontology.PredicateSerializedData, new Literal(Convert.ToBase64String(serializedModel)))); 120 }145 } 121 146 122 147 private void StoreModelAttribute(Entity model, Entity predicate, object value) { 123 148 store.Add(new Statement(model, predicate, new Literal(value))); 124 149 } 150 151 internal string[] GetJobs() { 152 lock (activeExecutions) { 153 string[] retVal = new string[activeExecutions.Count]; 154 int i = 0; 155 foreach (Execution e in activeExecutions.Values) { 156 retVal[i++] = "Target-Variable: "+e.TargetVariable+" "+e.Description; 157 } 158 return retVal; 159 } 160 } 125 161 } 126 162 }
Note: See TracChangeset
for help on using the changeset viewer.