Changeset 279 for branches/3.0/sources/HeuristicLab.DistributedEngine
- Timestamp:
- 05/30/08 13:41:27 (16 years ago)
- Location:
- branches/3.0/sources/HeuristicLab.DistributedEngine
- Files:
-
- 2 edited
- 1 copied
Legend:
- Unmodified
- Added
- Removed
-
branches/3.0/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r36 r279 30 30 using System.IO; 31 31 using System.IO.Compression; 32 using HeuristicLab.PluginInfrastructure; 33 using System.Windows.Forms; 34 using System.Diagnostics; 32 35 33 36 namespace HeuristicLab.DistributedEngine { 34 37 public class DistributedEngine : EngineBase, IEditable { 35 private IGridServer server; 36 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 37 private List<Guid> runningEngines = new List<Guid>(); 38 private JobManager jobManager; 39 private CompositeOperation waitingOperations; 38 40 private string serverAddress; 39 private bool cancelRequested;40 private CompositeOperation waitingOperations;41 41 public string ServerAddress { 42 42 get { return serverAddress; } … … 47 47 } 48 48 } 49 public override bool Terminated {50 get {51 return myExecutionStack.Count == 0 && runningEngines.Count == 0 && waitingOperations==null;52 }53 }54 49 public override object Clone(IDictionary<Guid, object> clonedObjects) { 55 50 DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects); … … 66 61 67 62 public override void Execute() { 68 NetTcpBinding binding = new NetTcpBinding(); 69 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 70 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 71 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 72 binding.Security.Mode = SecurityMode.None; 73 ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding); 74 server = factory.CreateChannel(new EndpointAddress(serverAddress)); 75 63 if(jobManager == null) this.jobManager = new JobManager(serverAddress); 64 jobManager.Reset(); 76 65 base.Execute(); 77 66 } … … 81 70 } 82 71 83 public override void Abort() {84 lock(runningEngines) {85 cancelRequested = true;86 foreach(Guid engineGuid in runningEngines) {87 server.AbortEngine(engineGuid);88 }89 }90 }91 public override void Reset() {92 base.Reset();93 engineOperations.Clear();94 runningEngines.Clear();95 cancelRequested = false;96 }97 98 72 protected override void ProcessNextOperation() { 99 lock(runningEngines) { 100 if(runningEngines.Count == 0 && cancelRequested) { 101 base.Abort(); 102 cancelRequested = false; 103 if(waitingOperations != null && waitingOperations.Operations.Count != 0) { 104 myExecutionStack.Push(waitingOperations); 105 waitingOperations = null; 73 IOperation operation = myExecutionStack.Pop(); 74 if(operation is AtomicOperation) { 75 AtomicOperation atomicOperation = (AtomicOperation)operation; 76 IOperation next = null; 77 try { 78 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 79 } catch(Exception ex) { 80 // push operation on stack again 81 myExecutionStack.Push(atomicOperation); 82 Abort(); 83 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 84 } 85 if(next != null) 86 myExecutionStack.Push(next); 87 OnOperationExecuted(atomicOperation); 88 if(atomicOperation.Operator.Breakpoint) Abort(); 89 } else if(operation is CompositeOperation) { 90 CompositeOperation compositeOperation = (CompositeOperation)operation; 91 if(compositeOperation.ExecuteInParallel) { 92 try { 93 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 94 int i = 0; 95 // HACK: assume that all atomicOperations have the same parent scope. 96 // 1) find that parent scope 97 // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation 98 // 3) keep the branches to 'repair' the scope-tree later 99 // 4) for each parallel job attach only the sub-scope that this operation uses 100 // 5) after starting all parallel jobs restore the whole scope-tree 101 IScope parentScope = FindParentScope(GlobalScope, compositeOperation); 102 List<IList<IScope>> prunedScopes = new List<IList<IScope>>(); 103 PruneToParentScope(GlobalScope, parentScope, prunedScopes); 104 List<IScope> subScopes = new List<IScope>(parentScope.SubScopes); 105 foreach(IScope scope in subScopes) { 106 parentScope.RemoveSubScope(scope); 107 } 108 // start all parallel jobs 109 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 110 parentScope.AddSubScope(parOperation.Scope); 111 waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation); 112 parentScope.RemoveSubScope(parOperation.Scope); 113 } 114 foreach(IScope scope in subScopes) { 115 parentScope.AddSubScope(scope); 116 } 117 prunedScopes.Reverse(); 118 RestoreFullTree(GlobalScope, prunedScopes); 119 120 // wait until all jobs are finished 121 // WaitAll works only with maximally 64 waithandles 122 if(waithandles.Length <= 64) { 123 WaitHandle.WaitAll(waithandles); 124 } else { 125 for(i = 0; i < waithandles.Length; i++) { 126 waithandles[i].WaitOne(); 127 waithandles[i].Close(); 128 } 129 } 130 // retrieve results and merge into scope-tree 131 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 132 IScope result = jobManager.EndExecuteOperation(parOperation); 133 MergeScope(parOperation.Scope, result); 134 } 135 } catch(Exception e) { 136 myExecutionStack.Push(compositeOperation); 137 Abort(); 138 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); }); 106 139 } 107 return; 108 } 109 if(runningEngines.Count != 0) { 110 Guid engineGuid = runningEngines[0]; 111 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 112 if(resultXml != null) { 113 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 114 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 115 IScope oldScope = engineOperations[engineGuid].Scope; 116 oldScope.Clear(); 117 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 118 oldScope.AddVariable(variable); 119 } 120 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 121 oldScope.AddSubScope(subScope); 122 } 123 OnOperationExecuted(engineOperations[engineGuid]); 124 125 if(cancelRequested & resultEngine.ExecutionStack.Count != 0) { 126 if(waitingOperations == null) { 127 waitingOperations = new CompositeOperation(); 128 waitingOperations.ExecuteInParallel = false; 129 } 130 CompositeOperation task = new CompositeOperation(); 131 while(resultEngine.ExecutionStack.Count > 0) { 132 AtomicOperation oldOperation = (AtomicOperation)resultEngine.ExecutionStack.Pop(); 133 if(oldOperation.Scope == resultEngine.InitialOperation.Scope) { 134 oldOperation = new AtomicOperation(oldOperation.Operator, oldScope); 135 } 136 task.AddOperation(oldOperation); 137 } 138 waitingOperations.AddOperation(task); 139 } 140 runningEngines.Remove(engineGuid); 141 engineOperations.Remove(engineGuid); 142 } 143 return; 144 } 145 IOperation operation = myExecutionStack.Pop(); 146 if(operation is AtomicOperation) { 147 AtomicOperation atomicOperation = (AtomicOperation)operation; 148 IOperation next = null; 149 try { 150 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 151 } catch(Exception ex) { 152 // push operation on stack again 153 myExecutionStack.Push(atomicOperation); 154 Abort(); 155 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); 156 } 157 if(next != null) 158 myExecutionStack.Push(next); 159 OnOperationExecuted(atomicOperation); 160 if(atomicOperation.Operator.Breakpoint) Abort(); 161 } else if(operation is CompositeOperation) { 162 CompositeOperation compositeOperation = (CompositeOperation)operation; 163 if(compositeOperation.ExecuteInParallel) { 164 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 165 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? 166 MemoryStream memStream = new MemoryStream(); 167 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 168 PersistenceManager.Save(engine, stream); 169 stream.Close(); 170 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 171 runningEngines.Add(currentEngineGuid); 172 engineOperations[currentEngineGuid] = parOperation; 173 } 174 } else { 175 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 176 myExecutionStack.Push(compositeOperation.Operations[i]); 177 } 178 } 140 } else { 141 for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) 142 myExecutionStack.Push(compositeOperation.Operations[i]); 143 } 144 } 145 } 146 147 private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) { 148 if(savedScopes.Count == 0) return; 149 IScope remainingBranch = currentScope.SubScopes[0]; 150 currentScope.RemoveSubScope(remainingBranch); 151 IList<IScope> savedScopesForCurrent = savedScopes[0]; 152 foreach(IScope savedScope in savedScopesForCurrent) { 153 currentScope.AddSubScope(savedScope); 154 } 155 savedScopes.RemoveAt(0); 156 RestoreFullTree(remainingBranch, savedScopes); 157 } 158 159 private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) { 160 if(currentScope == scope) return currentScope; 161 if(currentScope.SubScopes.Count == 0) return null; 162 IScope foundScope = null; 163 // try to find the searched scope in all my sub-scopes 164 foreach(IScope subScope in currentScope.SubScopes) { 165 foundScope = PruneToParentScope(subScope, scope, prunedScopes); 166 if(foundScope != null) break; // we can stop as soon as we find the scope in a branch 167 } 168 if(foundScope != null) { // when we found the scopes in my sub-scopes 169 List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes 170 prunedScopes.Add(subScopes); 171 // remove all my sub-scopes 172 foreach(IScope subScope in subScopes) { 173 currentScope.RemoveSubScope(subScope); 174 } 175 // add only the branch that leads to the scope that I search for 176 currentScope.AddSubScope(foundScope); 177 return currentScope; // return that this scope contains the branch that leads to the searched scopes 178 } else { 179 return null; // otherwise we didn't find the searched scope and we can return null 180 } 181 } 182 183 private IScope FindParentScope(IScope currentScope, CompositeOperation compositeOperation) { 184 AtomicOperation currentOperation = (AtomicOperation)compositeOperation.Operations[0]; 185 if(currentScope.SubScopes.Contains(currentOperation.Scope)) return currentScope; 186 foreach(IScope subScope in currentScope.SubScopes) { 187 IScope result = FindParentScope(subScope, compositeOperation); 188 if(result != null) return result; 189 } 190 return null; 191 } 192 193 private void MergeScope(IScope original, IScope result) { 194 // merge the results 195 original.Clear(); 196 foreach(IVariable variable in result.Variables) { 197 original.AddVariable(variable); 198 } 199 foreach(IScope subScope in result.SubScopes) { 200 original.AddSubScope(subScope); 201 } 202 foreach(KeyValuePair<string, string> alias in result.Aliases) { 203 original.AddAlias(alias.Key, alias.Value); 179 204 } 180 205 } -
branches/3.0/sources/HeuristicLab.DistributedEngine/HeuristicLab.DistributedEngine.csproj
r30 r279 50 50 <ItemGroup> 51 51 <Compile Include="HeuristicLabDistributedEnginePlugin.cs" /> 52 <Compile Include="JobManager.cs" /> 52 53 <Compile Include="Properties\AssemblyInfo.cs" /> 53 54 <Compile Include="DistributedEngine.cs" /> -
branches/3.0/sources/HeuristicLab.DistributedEngine/JobManager.cs
r228 r279 1 using System; 1 #region License Information 2 /* HeuristicLab 3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL) 4 * 5 * This file is part of HeuristicLab. 6 * 7 * HeuristicLab is free software: you can redistribute it and/or modify 8 * it under the terms of the GNU General Public License as published by 9 * the Free Software Foundation, either version 3 of the License, or 10 * (at your option) any later version. 11 * 12 * HeuristicLab is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 * GNU General Public License for more details. 16 * 17 * You should have received a copy of the GNU General Public License 18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>. 19 */ 20 #endregion 21 22 using System; 2 23 using System.Collections.Generic; 3 24 using System.Linq; … … 15 36 private IGridServer server; 16 37 private string address; 17 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 18 private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>(); 38 private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>(); 19 39 private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>(); 20 private object locker = new object(); 40 private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>(); 41 private object connectionLock = new object(); 42 private object dictionaryLock = new object(); 43 21 44 private const int MAX_RESTARTS = 5; 22 private Exception exception; 45 private const int MAX_CONNECTION_RETRIES = 10; 46 private const int RETRY_TIMEOUT_SEC = 10; 47 private const int CHECK_RESULTS_TIMEOUT = 10; 48 23 49 private ChannelFactory<IGridServer> factory; 24 public Exception Exception {25 get { return exception; }26 }27 50 28 51 public JobManager(string address) { … … 31 54 32 55 internal void Reset() { 33 lock(locker) {34 ResetConnection();56 ResetConnection(); 57 lock(dictionaryLock) { 35 58 foreach(WaitHandle wh in waithandles.Values) wh.Close(); 36 59 waithandles.Clear(); 37 engineOperations.Clear(); 38 runningEngines.Clear(); 39 exception = null; 60 engines.Clear(); 61 results.Clear(); 40 62 } 41 63 } 42 64 43 65 private void ResetConnection() { 44 // open a new channel 45 NetTcpBinding binding = new NetTcpBinding(); 46 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 47 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 48 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 49 binding.Security.Mode = SecurityMode.None; 50 factory = new ChannelFactory<IGridServer>(binding); 51 server = factory.CreateChannel(new EndpointAddress(address)); 52 } 53 54 public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) { 55 ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed? 66 lock(connectionLock) { 67 // open a new channel 68 NetTcpBinding binding = new NetTcpBinding(); 69 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 70 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 71 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 72 binding.Security.Mode = SecurityMode.None; 73 74 factory = new ChannelFactory<IGridServer>(binding); 75 server = factory.CreateChannel(new EndpointAddress(address)); 76 } 77 } 78 79 public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) { 80 ProcessingEngine engine = new ProcessingEngine(globalScope, operation); 81 byte[] zippedEngine = ZipEngine(engine); 82 Guid currentEngineGuid = Guid.Empty; 83 bool success = false; 84 int retryCount = 0; 85 do { 86 lock(connectionLock) { 87 try { 88 currentEngineGuid = server.BeginExecuteEngine(zippedEngine); 89 success = true; 90 } catch(TimeoutException timeoutException) { 91 if(retryCount < MAX_CONNECTION_RETRIES) { 92 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 93 retryCount++; 94 } else { 95 throw new ApplicationException("Max retries reached.", timeoutException); 96 } 97 } catch(CommunicationException communicationException) { 98 ResetConnection(); 99 // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution) 100 if(retryCount < MAX_CONNECTION_RETRIES) { 101 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 102 retryCount++; 103 } else { 104 throw new ApplicationException("Max retries reached.", communicationException); 105 } 106 } 107 } 108 } while(!success); 109 lock(dictionaryLock) { 110 engines[currentEngineGuid] = engine; 111 waithandles[currentEngineGuid] = new ManualResetEvent(false); 112 } 113 ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid); 114 return waithandles[currentEngineGuid]; 115 } 116 117 private byte[] ZipEngine(ProcessingEngine engine) { 56 118 MemoryStream memStream = new MemoryStream(); 57 119 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 58 120 PersistenceManager.Save(engine, stream); 59 121 stream.Close(); 60 if(factory.State != CommunicationState.Opened) 61 ResetConnection(); 62 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 63 lock(locker) { 64 runningEngines[currentEngineGuid] = memStream.ToArray(); 65 engineOperations[currentEngineGuid] = operation; 66 waithandles[currentEngineGuid] = new ManualResetEvent(false); 67 ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid); 68 } 69 return waithandles[currentEngineGuid]; 122 byte[] zippedEngine = memStream.ToArray(); 123 memStream.Close(); 124 return zippedEngine; 125 } 126 127 public IScope EndExecuteOperation(AtomicOperation operation) { 128 byte[] zippedResult = null; 129 lock(dictionaryLock) { 130 zippedResult = results[operation]; 131 results.Remove(operation); 132 } 133 // restore the engine 134 using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) { 135 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 136 return resultEngine.InitialOperation.Scope; 137 } 70 138 } 71 139 … … 74 142 int restartCounter = 0; 75 143 do { 76 try { 77 if(factory.State != CommunicationState.Opened) ResetConnection(); 78 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 79 if(resultXml != null) { 80 // restore the engine 81 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 82 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 83 84 // merge the results 85 IScope oldScope = engineOperations[engineGuid].Scope; 86 oldScope.Clear(); 87 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 88 oldScope.AddVariable(variable); 89 } 90 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 91 oldScope.AddSubScope(subScope); 92 } 93 foreach(KeyValuePair<string, string> alias in resultEngine.InitialOperation.Scope.Aliases) { 94 oldScope.AddAlias(alias.Key, alias.Value); 95 } 96 97 lock(locker) { 98 // signal the wait handle and clean up then return 99 waithandles[engineGuid].Set(); 100 engineOperations.Remove(engineGuid); 101 waithandles.Remove(engineGuid); 102 runningEngines.Remove(engineGuid); 103 } 104 return; 105 } else { 106 // check if the server is still working on the job 107 JobState jobState = server.JobState(engineGuid); 108 if(jobState == JobState.Unkown) { 109 // restart job 110 byte[] packedEngine; 111 lock(locker) { 112 packedEngine = runningEngines[engineGuid]; 144 Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT)); 145 byte[] zippedResult = null; 146 lock(connectionLock) { 147 bool success = false; 148 int retries = 0; 149 do { 150 try { 151 zippedResult = server.TryEndExecuteEngine(engineGuid, 100); 152 success = true; 153 } catch(TimeoutException timeoutException) { 154 success = false; 155 retries++; 156 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 157 } catch(CommunicationException communicationException) { 158 ResetConnection(); 159 success = false; 160 retries++; 161 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 162 } 163 164 } while(!success && retries < MAX_CONNECTION_RETRIES); 165 } 166 if(zippedResult != null) { 167 lock(dictionaryLock) { 168 // store result 169 results[engines[engineGuid].InitialOperation] = zippedResult; 170 171 // signal the wait handle and clean up then return 172 engines.Remove(engineGuid); 173 waithandles[engineGuid].Set(); 174 waithandles.Remove(engineGuid); 175 } 176 return; 177 } else { 178 // check if the server is still working on the job 179 bool success = false; 180 int retries = 0; 181 JobState jobState = JobState.Unkown; 182 do { 183 try { 184 lock(connectionLock) { 185 jobState = server.JobState(engineGuid); 113 186 } 114 server.BeginExecuteEngine(packedEngine); 115 restartCounter++; 116 } 187 success = true; 188 } catch(TimeoutException timeoutException) { 189 retries++; 190 success = false; 191 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 192 } catch(CommunicationException communicationException) { 193 ResetConnection(); 194 retries++; 195 success = false; 196 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 197 } 198 } while(!success && retries < MAX_CONNECTION_RETRIES); 199 if(jobState == JobState.Unkown) { 200 // restart job 201 ProcessingEngine engine; 202 lock(dictionaryLock) { 203 engine = engines[engineGuid]; 204 } 205 byte[] zippedEngine = ZipEngine(engine); 206 success = false; 207 retries = 0; 208 do { 209 try { 210 lock(connectionLock) { 211 server.BeginExecuteEngine(zippedEngine); 212 } 213 success = true; 214 } catch(TimeoutException timeoutException) { 215 success = false; 216 retries++; 217 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 218 } catch(CommunicationException communicationException) { 219 ResetConnection(); 220 success = false; 221 retries++; 222 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 223 } 224 } while(!success && retries < MAX_CONNECTION_RETRIES); 225 restartCounter++; 117 226 } 118 } catch(Exception e) {119 // catch all exceptions set an exception flag, signal the wait-handle and exit the routine120 this.exception = new Exception("There was a problem with parallel execution", e);121 waithandles[engineGuid].Set();122 return;123 227 } 124 228 125 229 // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem 126 230 if(restartCounter > MAX_RESTARTS) { 127 this.exception = new Exception("Maximal number of parallel job restarts reached"); 128 waithandles[engineGuid].Set(); 129 return; 130 } 131 132 Thread.Sleep(TimeSpan.FromSeconds(10.0)); 231 throw new ApplicationException("Maximum number of job restarts reached."); 232 } 133 233 } while(true); 134 234 }
Note: See TracChangeset
for help on using the changeset viewer.