Changeset 2055
- Timestamp:
- 06/17/09 18:07:15 (16 years ago)
- Location:
- trunk/sources
- Files:
-
- 1 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.CEDMA.Server/3.3/GridExecuter.cs
r2053 r2055 43 43 public class GridExecuter : ExecuterBase { 44 44 private JobManager jobManager; 45 private Dictionary< WaitHandle, IAlgorithm> activeAlgorithms;45 private Dictionary<AsyncGridResult, IAlgorithm> activeAlgorithms; 46 46 47 47 private TimeSpan StartJobInterval { … … 56 56 : base(dispatcher, store) { 57 57 this.jobManager = new JobManager(gridUrl); 58 activeAlgorithms = new Dictionary< WaitHandle, IAlgorithm>();58 activeAlgorithms = new Dictionary<AsyncGridResult, IAlgorithm>(); 59 59 jobManager.Reset(); 60 60 } 61 61 62 62 protected override void StartJobs() { 63 List<WaitHandle> wh = new List<WaitHandle>(); 64 Dictionary<WaitHandle, AtomicOperation> activeOperations = new Dictionary<WaitHandle, AtomicOperation>(); 63 Dictionary<WaitHandle, AsyncGridResult> asyncResults = new Dictionary<WaitHandle,AsyncGridResult>(); 65 64 while (true) { 66 65 try { 67 66 // start new jobs as long as there are less than MaxActiveJobs 68 while ( wh.Count < MaxActiveJobs) {67 while (asyncResults.Count < MaxActiveJobs) { 69 68 Thread.Sleep(StartJobInterval); 70 69 // get an execution from the dispatcher and execute in grid via job-manager … … 72 71 if (algorithm != null) { 73 72 AtomicOperation op = new AtomicOperation(algorithm.Engine.OperatorGraph.InitialOperator, algorithm.Engine.GlobalScope); 74 WaitHandle opWh = jobManager.BeginExecuteOperation(algorithm.Engine.GlobalScope, op); 75 wh.Add(opWh); 76 activeOperations.Add(opWh, op); 73 AsyncGridResult asyncResult = jobManager.BeginExecuteEngine(new ProcessingEngine(algorithm.Engine.GlobalScope, op)); 74 asyncResults.Add(asyncResult.WaitHandle, asyncResult); 77 75 lock (activeAlgorithms) { 78 activeAlgorithms.Add( opWh, algorithm);76 activeAlgorithms.Add(asyncResult, algorithm); 79 77 } 80 78 } 81 79 } 82 80 // wait until any job is finished 83 WaitHandle[] whArr = wh.ToArray();81 WaitHandle[] whArr = asyncResults.Keys.ToArray(); 84 82 int readyHandleIndex = WaitHandle.WaitAny(whArr, WaitForFinishedJobsTimeout); 85 83 if (readyHandleIndex != WaitHandle.WaitTimeout) { 86 84 WaitHandle readyHandle = whArr[readyHandleIndex]; 87 AtomicOperation finishedOp = activeOperations[readyHandle];88 wh.Remove(readyHandle);89 85 IAlgorithm finishedAlgorithm = null; 86 AsyncGridResult finishedResult = null; 90 87 lock (activeAlgorithms) { 91 finishedAlgorithm = activeAlgorithms[readyHandle]; 92 activeAlgorithms.Remove(readyHandle); 88 finishedResult = asyncResults[readyHandle]; 89 finishedAlgorithm = activeAlgorithms[finishedResult]; 90 activeAlgorithms.Remove(finishedResult); 91 asyncResults.Remove(readyHandle); 93 92 } 94 activeOperations.Remove(readyHandle);95 readyHandle.Close();96 93 try { 97 ProcessingEngine finishedEngine = jobManager.EndExecuteOperation(finishedOp); 94 IEngine finishedEngine = jobManager.EndExecuteEngine(finishedResult); 95 SetResults(finishedEngine.GlobalScope, finishedAlgorithm.Engine.GlobalScope); 98 96 StoreResults(finishedAlgorithm); 99 97 } … … 106 104 Trace.WriteLine("CEDMA Executer: Exception in job-management thread. " + ex.Message); 107 105 } 106 } 107 } 108 109 private void SetResults(IScope src, IScope target) { 110 foreach (IVariable v in src.Variables) { 111 target.AddVariable(v); 112 } 113 foreach (IScope subScope in src.SubScopes) { 114 target.AddSubScope(subScope); 115 } 116 foreach (KeyValuePair<string, string> alias in src.Aliases) { 117 target.AddAlias(alias.Key, alias.Value); 108 118 } 109 119 } -
trunk/sources/HeuristicLab.DistributedEngine/3.2/DistributedEngine.cs
r1529 r2055 42 42 get { return serverAddress; } 43 43 set { 44 if (value != serverAddress) {44 if (value != serverAddress) { 45 45 serverAddress = value; 46 46 } … … 72 72 73 73 public override void Execute() { 74 if (jobManager == null) this.jobManager = new JobManager(serverAddress);74 if (jobManager == null) this.jobManager = new JobManager(serverAddress); 75 75 jobManager.Reset(); 76 76 base.Execute(); … … 82 82 83 83 protected override void ProcessNextOperation() { 84 if (suspendedEngines.Count > 0) {84 if (suspendedEngines.Count > 0) { 85 85 ProcessSuspendedEngines(); 86 86 } else { … … 91 91 92 92 private void ProcessSuspendedEngines() { 93 WaitHandle[] waitHandles = new WaitHandle[suspendedEngines.Count];93 AsyncGridResult[] asyncResults = new AsyncGridResult[suspendedEngines.Count]; 94 94 int i = 0; 95 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) {96 waitHandles[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key);97 } 98 WaitForAll( waitHandles);95 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 96 asyncResults[i++] = jobManager.BeginExecuteEngine(suspendedPair.Key); 97 } 98 WaitForAll(asyncResults); 99 99 // collect results 100 100 List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(); 101 101 try { 102 foreach(KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 102 int resultIndex = 0; 103 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> suspendedPair in suspendedEngines) { 103 104 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 104 jobManager.EndExecuteOperation(suspendedPair.Value),105 (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[resultIndex++]), 105 106 suspendedPair.Value); 106 107 results.Add(p); 107 108 } 108 } catch(Exception e) { 109 } 110 catch (Exception e) { 109 111 // this exception means there was a problem with the underlying communication infrastructure 110 112 // -> show message dialog and abort engine … … 118 120 119 121 private void ProcessOperation(IOperation operation) { 120 if (operation is AtomicOperation) {122 if (operation is AtomicOperation) { 121 123 AtomicOperation atomicOperation = (AtomicOperation)operation; 122 124 IOperation next = null; 123 125 try { 124 126 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 125 } catch(Exception ex) { 127 } 128 catch (Exception ex) { 126 129 // push operation on stack again 127 130 myExecutionStack.Push(atomicOperation); … … 129 132 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 130 133 } 131 if (next != null)134 if (next != null) 132 135 myExecutionStack.Push(next); 133 136 OnOperationExecuted(atomicOperation); 134 if (atomicOperation.Operator.Breakpoint) Abort();135 } else if (operation is CompositeOperation) {137 if (atomicOperation.Operator.Breakpoint) Abort(); 138 } else if (operation is CompositeOperation) { 136 139 CompositeOperation compositeOperation = (CompositeOperation)operation; 137 if (compositeOperation.ExecuteInParallel) {140 if (compositeOperation.ExecuteInParallel) { 138 141 ProcessParallelOperation(compositeOperation); 139 142 OnOperationExecuted(compositeOperation); 140 143 } else { 141 for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--)144 for (int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 142 145 myExecutionStack.Push(compositeOperation.Operations[i]); 143 146 } … … 147 150 private void ProcessParallelOperation(CompositeOperation compositeOperation) { 148 151 // send operations to grid 149 WaitHandle[] waithandles = BeginExecuteOperations(compositeOperation);150 WaitForAll( waithandles);152 AsyncGridResult[] asyncResults = BeginExecuteOperations(compositeOperation); 153 WaitForAll(asyncResults); 151 154 // collect results 152 155 List<KeyValuePair<ProcessingEngine, AtomicOperation>> results = new List<KeyValuePair<ProcessingEngine, AtomicOperation>>(); 153 156 try { 154 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 155 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 156 jobManager.EndExecuteOperation(parOperation), parOperation); 157 results.Add(p); 158 } 159 } catch(Exception e) { 157 int i = 0; 158 foreach (AtomicOperation parOperation in compositeOperation.Operations) { 159 results.Add(new KeyValuePair<ProcessingEngine, AtomicOperation>( 160 (ProcessingEngine)jobManager.EndExecuteEngine(asyncResults[i++]), parOperation)); 161 } 162 } 163 catch (Exception e) { 160 164 // this exception means there was a problem with the underlying communication infrastructure 161 165 // -> show message dialog, abort engine, requeue the whole composite operation again and return … … 169 173 } 170 174 171 private WaitHandle[] BeginExecuteOperations(CompositeOperation compositeOperation) {172 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];175 private AsyncGridResult[] BeginExecuteOperations(CompositeOperation compositeOperation) { 176 AsyncGridResult[] asyncResults = new AsyncGridResult[compositeOperation.Operations.Count]; 173 177 int i = 0; 174 178 // HACK: assume that all atomicOperations have the same parent scope. … … 182 186 PruneToParentScope(GlobalScope, parentScope, prunedScopes); 183 187 List<IScope> subScopes = new List<IScope>(parentScope.SubScopes); 184 foreach (IScope scope in subScopes) {188 foreach (IScope scope in subScopes) { 185 189 parentScope.RemoveSubScope(scope); 186 190 } 187 191 // start all parallel jobs 188 foreach (AtomicOperation parOperation in compositeOperation.Operations) {192 foreach (AtomicOperation parOperation in compositeOperation.Operations) { 189 193 parentScope.AddSubScope(parOperation.Scope); 190 waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);194 asyncResults[i++] = jobManager.BeginExecuteEngine(new ProcessingEngine(GlobalScope, parOperation)); 191 195 parentScope.RemoveSubScope(parOperation.Scope); 192 196 } 193 foreach (IScope scope in subScopes) {197 foreach (IScope scope in subScopes) { 194 198 parentScope.AddSubScope(scope); 195 199 } … … 197 201 RestoreFullTree(GlobalScope, prunedScopes); 198 202 199 return waithandles;200 } 201 202 private void WaitForAll( WaitHandle[] waithandles) {203 return asyncResults; 204 } 205 206 private void WaitForAll(AsyncGridResult[] asyncResults) { 203 207 // wait until all jobs are finished 204 208 // WaitAll works only with maximally 64 waithandles 205 if(waithandles.Length <= 64) { 206 WaitHandle.WaitAll(waithandles); 209 if (asyncResults.Length <= 64) { 210 WaitHandle[] waitHandles = new WaitHandle[asyncResults.Length]; 211 for (int i = 0; i < asyncResults.Length; i++) { 212 waitHandles[i] = asyncResults[i].WaitHandle; 213 } 214 WaitHandle.WaitAll(waitHandles); 207 215 } else { 208 216 int i; 209 for(i = 0; i < waithandles.Length; i++) { 210 waithandles[i].WaitOne(); 211 waithandles[i].Close(); 217 for (i = 0; i < asyncResults.Length; i++) { 218 asyncResults[i].WaitHandle.WaitOne(); 212 219 } 213 220 } … … 221 228 suspendedEngines.Clear(); 222 229 // retrieve results and merge into scope-tree 223 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) {230 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in results) { 224 231 ProcessingEngine resultEngine = p.Key; 225 232 AtomicOperation parOperation = p.Value; 226 if (resultEngine.Canceled && !resultEngine.Suspended) {233 if (resultEngine.Canceled && !resultEngine.Suspended) { 227 234 // when an engine was canceled but not suspended this means there was a problem 228 235 // show error message and queue the operation for restart (again parallel) … … 230 237 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(new JobExecutionException(resultEngine.ErrorMessage)); }); 231 238 canceledOperations.AddOperation(parOperation); 232 } else if (resultEngine.Suspended) {239 } else if (resultEngine.Suspended) { 233 240 // when the engine was suspended it means it was stopped because of a breakpoint 234 241 // -> merge the partial results and queue the engine (which has operations remaining in the execution stack) to be resumed (parallel) … … 243 250 } 244 251 // if there were exceptions -> abort 245 if (canceledOperations.Operations.Count > 0) {252 if (canceledOperations.Operations.Count > 0) { 246 253 // requeue the aborted operations 247 254 myExecutionStack.Push(canceledOperations); … … 249 256 } 250 257 // if there were breakpoints -> abort 251 if (suspendedEngines.Count > 0) {258 if (suspendedEngines.Count > 0) { 252 259 Abort(); 253 260 } … … 255 262 256 263 private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) { 257 if (savedScopes.Count == 0) return;264 if (savedScopes.Count == 0) return; 258 265 IScope remainingBranch = currentScope.SubScopes[0]; 259 266 currentScope.RemoveSubScope(remainingBranch); 260 267 IList<IScope> savedScopesForCurrent = savedScopes[0]; 261 foreach (IScope savedScope in savedScopesForCurrent) {268 foreach (IScope savedScope in savedScopesForCurrent) { 262 269 currentScope.AddSubScope(savedScope); 263 270 } … … 267 274 268 275 private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) { 269 if (currentScope == scope) return currentScope;270 if (currentScope.SubScopes.Count == 0) return null;276 if (currentScope == scope) return currentScope; 277 if (currentScope.SubScopes.Count == 0) return null; 271 278 IScope foundScope = null; 272 279 // try to find the searched scope in all my sub-scopes 273 foreach (IScope subScope in currentScope.SubScopes) {280 foreach (IScope subScope in currentScope.SubScopes) { 274 281 foundScope = PruneToParentScope(subScope, scope, prunedScopes); 275 if (foundScope != null) break; // we can stop as soon as we find the scope in a branch276 } 277 if (foundScope != null) { // when we found the scopes in my sub-scopes282 if (foundScope != null) break; // we can stop as soon as we find the scope in a branch 283 } 284 if (foundScope != null) { // when we found the scopes in my sub-scopes 278 285 List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes 279 286 prunedScopes.Add(subScopes); 280 287 // remove all my sub-scopes 281 foreach (IScope subScope in subScopes) {288 foreach (IScope subScope in subScopes) { 282 289 currentScope.RemoveSubScope(subScope); 283 290 } … … 291 298 292 299 private IScope FindParentScope(IScope currentScope, IScope childScope) { 293 if (currentScope.SubScopes.Contains(childScope)) return currentScope;294 foreach (IScope subScope in currentScope.SubScopes) {300 if (currentScope.SubScopes.Contains(childScope)) return currentScope; 301 foreach (IScope subScope in currentScope.SubScopes) { 295 302 IScope result = FindParentScope(subScope, childScope); 296 if (result != null) return result;303 if (result != null) return result; 297 304 } 298 305 return null; … … 302 309 // merge the results 303 310 original.Clear(); 304 foreach (IVariable variable in result.Variables) {311 foreach (IVariable variable in result.Variables) { 305 312 original.AddVariable(variable); 306 313 } 307 foreach (IScope subScope in result.SubScopes) {314 foreach (IScope subScope in result.SubScopes) { 308 315 original.AddSubScope(subScope); 309 316 } 310 foreach (KeyValuePair<string, string> alias in result.Aliases) {317 foreach (KeyValuePair<string, string> alias in result.Aliases) { 311 318 original.AddAlias(alias.Key, alias.Value); 312 319 } … … 319 326 addressAttribute.Value = ServerAddress; 320 327 node.Attributes.Append(addressAttribute); 321 if (suspendedEngines.Count > 0) {328 if (suspendedEngines.Count > 0) { 322 329 XmlNode suspendedEnginesNode = document.CreateElement("SuspendedEngines"); 323 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) {330 foreach (KeyValuePair<ProcessingEngine, AtomicOperation> p in suspendedEngines) { 324 331 XmlNode n = document.CreateElement("Entry"); 325 332 n.AppendChild(PersistenceManager.Persist(p.Key, document, persistedObjects)); … … 335 342 ServerAddress = node.Attributes["ServerAddress"].Value; 336 343 XmlNode suspendedEnginesNode = node.SelectSingleNode("SuspendedEngines"); 337 if (suspendedEnginesNode != null) {338 foreach (XmlNode n in suspendedEnginesNode.ChildNodes) {344 if (suspendedEnginesNode != null) { 345 foreach (XmlNode n in suspendedEnginesNode.ChildNodes) { 339 346 KeyValuePair<ProcessingEngine, AtomicOperation> p = new KeyValuePair<ProcessingEngine, AtomicOperation>( 340 347 (ProcessingEngine)PersistenceManager.Restore(n.ChildNodes[0], restoredObjects), -
trunk/sources/HeuristicLab.Grid/3.2/HeuristicLab.Grid-3.2.csproj
r1534 r2055 98 98 <DependentUpon>ClientForm.cs</DependentUpon> 99 99 </Compile> 100 <Compile Include="AsyncGridResult.cs" /> 100 101 <Compile Include="Database.cs" /> 101 102 <Compile Include="EngineRunner.cs" /> -
trunk/sources/HeuristicLab.Grid/3.2/IGridServer.cs
r1529 r2055 27 27 namespace HeuristicLab.Grid { 28 28 public enum JobState { 29 Unknown ,29 Unknown = 0, // default value 30 30 Waiting, 31 31 Busy, -
trunk/sources/HeuristicLab.Grid/3.2/JobManager.cs
r1529 r2055 43 43 private const int RESULT_POLLING_TIMEOUT = 5; 44 44 45 private class Job {46 public Guid guid;47 public ProcessingEngine engine;48 public ManualResetEvent waitHandle;49 public int restarts;50 }51 52 45 private IGridServer server; 53 46 private string address; 54 47 private object waitingQueueLock = new object(); 55 private Queue< Job> waitingJobs = new Queue<Job>();48 private Queue<AsyncGridResult> waitingJobs = new Queue<AsyncGridResult>(); 56 49 private object runningQueueLock = new object(); 57 private Queue<Job> runningJobs = new Queue<Job>(); 58 private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>(); 59 60 private List<IOperation> erroredOperations = new List<IOperation>(); 50 private Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>(); 61 51 private object connectionLock = new object(); 62 private object dictionaryLock = new object();63 52 64 53 private AutoResetEvent runningWaitHandle = new AutoResetEvent(false); … … 77 66 public void Reset() { 78 67 ResetConnection(); 79 lock (dictionaryLock) {80 foreach (Job jin waitingJobs) {81 j.waitHandle.Close();68 lock (waitingQueueLock) { 69 foreach (AsyncGridResult r in waitingJobs) { 70 r.WaitHandle.Close(); 82 71 } 83 72 waitingJobs.Clear(); 84 foreach(Job j in runningJobs) { 85 j.waitHandle.Close(); 73 } 74 lock (runningQueueLock) { 75 foreach (AsyncGridResult r in runningJobs) { 76 r.WaitHandle.Close(); 86 77 } 87 78 runningJobs.Clear(); 88 results.Clear();89 erroredOperations.Clear();90 79 } 91 80 } … … 93 82 private void ResetConnection() { 94 83 Trace.TraceInformation("Reset connection in JobManager"); 95 lock (connectionLock) {84 lock (connectionLock) { 96 85 // open a new channel 97 86 NetTcpBinding binding = new NetTcpBinding(); … … 107 96 public void StartEngines() { 108 97 try { 109 while (true) {110 Jobjob = null;111 lock (waitingQueueLock) {112 if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();113 } 114 if (job==null) waitingWaitHandle.WaitOne(); // no jobs waiting98 while (true) { 99 AsyncGridResult job = null; 100 lock (waitingQueueLock) { 101 if (waitingJobs.Count > 0) job = waitingJobs.Dequeue(); 102 } 103 if (job == null) waitingWaitHandle.WaitOne(); // no jobs waiting 115 104 else { 116 Guid currentEngineGuid = TryStartExecuteEngine(job. engine);117 if (currentEngineGuid == Guid.Empty) {105 Guid currentEngineGuid = TryStartExecuteEngine(job.Engine); 106 if (currentEngineGuid == Guid.Empty) { 118 107 // couldn't start the job -> requeue 119 if (job.restarts < MAX_RESTARTS) {120 job. restarts++;121 lock (waitingQueueLock) waitingJobs.Enqueue(job);108 if (job.Restarts < MAX_RESTARTS) { 109 job.Restarts++; 110 lock (waitingQueueLock) waitingJobs.Enqueue(job); 122 111 waitingWaitHandle.Set(); 123 112 } else { 124 113 // max restart count reached -> give up on this job and flag error 125 lock(dictionaryLock) { 126 erroredOperations.Add(job.engine.InitialOperation); 127 job.waitHandle.Set(); 128 } 114 job.Aborted = true; 115 job.SignalFinished(); 129 116 } 130 117 } else { 131 118 // job started successfully 132 job. guid = currentEngineGuid;133 lock (runningQueueLock) {119 job.Guid = currentEngineGuid; 120 lock (runningQueueLock) { 134 121 runningJobs.Enqueue(job); 135 122 runningWaitHandle.Set(); … … 138 125 } 139 126 } 140 } catch(Exception e) { 141 Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace); 127 } 128 catch (Exception e) { 129 Trace.TraceError("Exception " + e + " in JobManager.StartEngines() killed the start-engine thread\n" + e.StackTrace); 142 130 } 143 131 } … … 146 134 public void GetResults() { 147 135 try { 148 while (true) {149 Jobjob = null;150 lock (runningQueueLock) {151 if (runningJobs.Count > 0) job = runningJobs.Dequeue();152 } 153 if (job == null) runningWaitHandle.WaitOne(); // no jobs running136 while (true) { 137 AsyncGridResult job = null; 138 lock (runningQueueLock) { 139 if (runningJobs.Count > 0) job = runningJobs.Dequeue(); 140 } 141 if (job == null) runningWaitHandle.WaitOne(); // no jobs running 154 142 else { 155 byte[] zippedResult = TryEndExecuteEngine(server, job.guid); 156 if(zippedResult != null) { // successful 157 lock(dictionaryLock) { 158 // store result 159 results[job.engine.InitialOperation] = zippedResult; 160 // notify consumer that result is ready 161 job.waitHandle.Set(); 162 } 143 byte[] zippedResult = TryEndExecuteEngine(server, job.Guid); 144 if (zippedResult != null) { 145 // successful => store result 146 job.ZippedResult = zippedResult; 147 // notify consumer that result is ready 148 job.SignalFinished(); 163 149 } else { 164 150 // there was a problem -> check the state of the job and restart if necessary 165 JobState jobState = TryGetJobState(server, job. guid);166 if (jobState == JobState.Unknown) {167 job. restarts++;168 lock (waitingQueueLock) {151 JobState jobState = TryGetJobState(server, job.Guid); 152 if (jobState == JobState.Unknown) { 153 job.Restarts++; 154 lock (waitingQueueLock) { 169 155 waitingJobs.Enqueue(job); 170 156 waitingWaitHandle.Set(); … … 172 158 } else { 173 159 // job still active at the server 174 lock (runningQueueLock) {160 lock (runningQueueLock) { 175 161 runningJobs.Enqueue(job); 176 162 runningWaitHandle.Set(); … … 181 167 } 182 168 } 183 } catch(Exception e) { 184 Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace); 185 } 186 } 187 188 public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) { 189 return BeginExecuteEngine(new ProcessingEngine(globalScope, operation)); 190 } 191 192 public WaitHandle BeginExecuteEngine(ProcessingEngine engine) { 193 Job job = new Job(); 194 job.engine = engine; 195 job.waitHandle = new ManualResetEvent(false); 196 job.restarts = 0; 197 lock(waitingQueueLock) { 198 waitingJobs.Enqueue(job); 169 } 170 catch (Exception e) { 171 Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n" + e.StackTrace); 172 } 173 } 174 175 public AsyncGridResult BeginExecuteEngine(ProcessingEngine engine) { 176 AsyncGridResult asyncResult = new AsyncGridResult(engine); 177 asyncResult.Engine = engine; 178 lock (waitingQueueLock) { 179 waitingJobs.Enqueue(asyncResult); 199 180 } 200 181 waitingWaitHandle.Set(); 201 return job.waitHandle;202 } 203 204 private byte[] ZipEngine( ProcessingEngine engine) {182 return asyncResult; 183 } 184 185 private byte[] ZipEngine(IEngine engine) { 205 186 return PersistenceManager.SaveToGZip(engine); 206 187 } 207 188 208 public ProcessingEngine EndExecuteOperation(AtomicOperation operation) { 209 if(erroredOperations.Contains(operation)) { 210 erroredOperations.Remove(operation); 189 public IEngine EndExecuteEngine(AsyncGridResult asyncResult) { 190 if (asyncResult.Aborted) { 211 191 throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server."); 212 192 } else { 213 byte[] zippedResult = null;214 lock(dictionaryLock) {215 zippedResult = results[operation];216 results.Remove(operation);217 }218 193 // restore the engine 219 return ( ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);220 } 221 } 222 223 private Guid TryStartExecuteEngine( ProcessingEngine engine) {194 return (IEngine)PersistenceManager.RestoreFromGZip(asyncResult.ZippedResult); 195 } 196 } 197 198 private Guid TryStartExecuteEngine(IEngine engine) { 224 199 byte[] zippedEngine = ZipEngine(engine); 200 return SavelyExecute(() => server.BeginExecuteEngine(zippedEngine)); 201 } 202 203 private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) { 204 return SavelyExecute(() => { 205 byte[] zippedResult = server.TryEndExecuteEngine(engineGuid); 206 return zippedResult; 207 }); 208 } 209 210 private JobState TryGetJobState(IGridServer server, Guid engineGuid) { 211 return SavelyExecute(() => server.JobState(engineGuid)); 212 } 213 214 private TResult SavelyExecute<TResult>(Func<TResult> a) { 225 215 int retries = 0; 226 Guid guid = Guid.Empty;227 216 do { 228 217 try { 229 lock (connectionLock) {230 guid = server.BeginExecuteEngine(zippedEngine);231 } 232 return guid;233 } catch(TimeoutException) {218 lock (connectionLock) { 219 return a(); 220 } 221 } 222 catch (TimeoutException) { 234 223 retries++; 235 224 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 236 } catch(CommunicationException) { 225 } 226 catch (CommunicationException) { 237 227 ResetConnection(); 238 228 retries++; 239 229 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 240 230 } 241 } while(retries < MAX_CONNECTION_RETRIES); 242 Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine"); 243 return Guid.Empty; 244 } 245 246 private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) { 247 int retries = 0; 248 do { 249 try { 250 lock(connectionLock) { 251 byte[] zippedResult = server.TryEndExecuteEngine(engineGuid); 252 return zippedResult; 253 } 254 } catch(TimeoutException) { 255 retries++; 256 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 257 } catch(CommunicationException) { 258 ResetConnection(); 259 retries++; 260 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 261 } 262 } while(retries < MAX_CONNECTION_RETRIES); 263 Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine"); 264 return null; 265 } 266 267 private JobState TryGetJobState(IGridServer server, Guid engineGuid) { 268 // check if the server is still working on the job 269 int retries = 0; 270 do { 271 try { 272 lock(connectionLock) { 273 JobState jobState = server.JobState(engineGuid); 274 return jobState; 275 } 276 } catch(TimeoutException) { 277 retries++; 278 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 279 } catch(CommunicationException) { 280 ResetConnection(); 281 retries++; 282 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 283 } 284 } while(retries < MAX_CONNECTION_RETRIES); 285 Trace.TraceWarning("Reached max connection retries in TryGetJobState"); 286 return JobState.Unknown; 231 } while (retries < MAX_CONNECTION_RETRIES); 232 Trace.TraceWarning("Reached max connection retries"); 233 return default(TResult); 287 234 } 288 235 } -
trunk/sources/HeuristicLab.SupportVectorMachines/3.2/SupportVectorRegression.cs
r2051 r2055 361 361 model.TestVarianceAccountedFor = bestModelScope.GetVariableValue<DoubleData>("TestVAF", false).Data; 362 362 363 model.Data = bestModelScope.GetVariableValue<SVMModel>(" BestValidationModel", false);363 model.Data = bestModelScope.GetVariableValue<SVMModel>("Model", false); 364 364 HeuristicLab.DataAnalysis.Dataset ds = bestModelScope.GetVariableValue<Dataset>("Dataset", true); 365 365 model.Dataset = ds;
Note: See TracChangeset
for help on using the changeset viewer.