- Timestamp:
- 05/30/08 13:41:27 (16 years ago)
- File:
-
- 1 copied
Legend:
- Unmodified
- Added
- Removed
-
branches/3.0/sources/HeuristicLab.DistributedEngine/JobManager.cs
r228 r279 1 using System; 1 #region License Information 2 /* HeuristicLab 3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL) 4 * 5 * This file is part of HeuristicLab. 6 * 7 * HeuristicLab is free software: you can redistribute it and/or modify 8 * it under the terms of the GNU General Public License as published by 9 * the Free Software Foundation, either version 3 of the License, or 10 * (at your option) any later version. 11 * 12 * HeuristicLab is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 * GNU General Public License for more details. 16 * 17 * You should have received a copy of the GNU General Public License 18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>. 19 */ 20 #endregion 21 22 using System; 2 23 using System.Collections.Generic; 3 24 using System.Linq; … … 15 36 private IGridServer server; 16 37 private string address; 17 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 18 private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>(); 38 private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>(); 19 39 private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>(); 20 private object locker = new object(); 40 private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>(); 41 private object connectionLock = new object(); 42 private object dictionaryLock = new object(); 43 21 44 private const int MAX_RESTARTS = 5; 22 private Exception exception; 45 private const int MAX_CONNECTION_RETRIES = 10; 46 private const int RETRY_TIMEOUT_SEC = 10; 47 private const int CHECK_RESULTS_TIMEOUT = 10; 48 23 49 private ChannelFactory<IGridServer> factory; 24 public Exception Exception {25 get { return exception; }26 }27 50 28 51 public JobManager(string address) { … … 31 54 32 55 internal void Reset() { 33 lock(locker) {34 ResetConnection();56 ResetConnection(); 57 lock(dictionaryLock) { 35 58 foreach(WaitHandle wh in waithandles.Values) wh.Close(); 36 59 waithandles.Clear(); 37 engineOperations.Clear(); 38 runningEngines.Clear(); 39 exception = null; 60 engines.Clear(); 61 results.Clear(); 40 62 } 41 63 } 42 64 43 65 private void ResetConnection() { 44 // open a new channel 45 NetTcpBinding binding = new NetTcpBinding(); 46 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 47 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 48 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 49 binding.Security.Mode = SecurityMode.None; 50 factory = new ChannelFactory<IGridServer>(binding); 51 server = factory.CreateChannel(new EndpointAddress(address)); 52 } 53 54 public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) { 55 ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed? 66 lock(connectionLock) { 67 // open a new channel 68 NetTcpBinding binding = new NetTcpBinding(); 69 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 70 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 71 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 72 binding.Security.Mode = SecurityMode.None; 73 74 factory = new ChannelFactory<IGridServer>(binding); 75 server = factory.CreateChannel(new EndpointAddress(address)); 76 } 77 } 78 79 public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) { 80 ProcessingEngine engine = new ProcessingEngine(globalScope, operation); 81 byte[] zippedEngine = ZipEngine(engine); 82 Guid currentEngineGuid = Guid.Empty; 83 bool success = false; 84 int retryCount = 0; 85 do { 86 lock(connectionLock) { 87 try { 88 currentEngineGuid = server.BeginExecuteEngine(zippedEngine); 89 success = true; 90 } catch(TimeoutException timeoutException) { 91 if(retryCount < MAX_CONNECTION_RETRIES) { 92 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 93 retryCount++; 94 } else { 95 throw new ApplicationException("Max retries reached.", timeoutException); 96 } 97 } catch(CommunicationException communicationException) { 98 ResetConnection(); 99 // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution) 100 if(retryCount < MAX_CONNECTION_RETRIES) { 101 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 102 retryCount++; 103 } else { 104 throw new ApplicationException("Max retries reached.", communicationException); 105 } 106 } 107 } 108 } while(!success); 109 lock(dictionaryLock) { 110 engines[currentEngineGuid] = engine; 111 waithandles[currentEngineGuid] = new ManualResetEvent(false); 112 } 113 ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid); 114 return waithandles[currentEngineGuid]; 115 } 116 117 private byte[] ZipEngine(ProcessingEngine engine) { 56 118 MemoryStream memStream = new MemoryStream(); 57 119 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 58 120 PersistenceManager.Save(engine, stream); 59 121 stream.Close(); 60 if(factory.State != CommunicationState.Opened) 61 ResetConnection(); 62 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 63 lock(locker) { 64 runningEngines[currentEngineGuid] = memStream.ToArray(); 65 engineOperations[currentEngineGuid] = operation; 66 waithandles[currentEngineGuid] = new ManualResetEvent(false); 67 ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid); 68 } 69 return waithandles[currentEngineGuid]; 122 byte[] zippedEngine = memStream.ToArray(); 123 memStream.Close(); 124 return zippedEngine; 125 } 126 127 public IScope EndExecuteOperation(AtomicOperation operation) { 128 byte[] zippedResult = null; 129 lock(dictionaryLock) { 130 zippedResult = results[operation]; 131 results.Remove(operation); 132 } 133 // restore the engine 134 using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) { 135 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 136 return resultEngine.InitialOperation.Scope; 137 } 70 138 } 71 139 … … 74 142 int restartCounter = 0; 75 143 do { 76 try { 77 if(factory.State != CommunicationState.Opened) ResetConnection(); 78 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 79 if(resultXml != null) { 80 // restore the engine 81 GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); 82 ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); 83 84 // merge the results 85 IScope oldScope = engineOperations[engineGuid].Scope; 86 oldScope.Clear(); 87 foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { 88 oldScope.AddVariable(variable); 89 } 90 foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { 91 oldScope.AddSubScope(subScope); 92 } 93 foreach(KeyValuePair<string, string> alias in resultEngine.InitialOperation.Scope.Aliases) { 94 oldScope.AddAlias(alias.Key, alias.Value); 95 } 96 97 lock(locker) { 98 // signal the wait handle and clean up then return 99 waithandles[engineGuid].Set(); 100 engineOperations.Remove(engineGuid); 101 waithandles.Remove(engineGuid); 102 runningEngines.Remove(engineGuid); 103 } 104 return; 105 } else { 106 // check if the server is still working on the job 107 JobState jobState = server.JobState(engineGuid); 108 if(jobState == JobState.Unkown) { 109 // restart job 110 byte[] packedEngine; 111 lock(locker) { 112 packedEngine = runningEngines[engineGuid]; 144 Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT)); 145 byte[] zippedResult = null; 146 lock(connectionLock) { 147 bool success = false; 148 int retries = 0; 149 do { 150 try { 151 zippedResult = server.TryEndExecuteEngine(engineGuid, 100); 152 success = true; 153 } catch(TimeoutException timeoutException) { 154 success = false; 155 retries++; 156 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 157 } catch(CommunicationException communicationException) { 158 ResetConnection(); 159 success = false; 160 retries++; 161 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 162 } 163 164 } while(!success && retries < MAX_CONNECTION_RETRIES); 165 } 166 if(zippedResult != null) { 167 lock(dictionaryLock) { 168 // store result 169 results[engines[engineGuid].InitialOperation] = zippedResult; 170 171 // signal the wait handle and clean up then return 172 engines.Remove(engineGuid); 173 waithandles[engineGuid].Set(); 174 waithandles.Remove(engineGuid); 175 } 176 return; 177 } else { 178 // check if the server is still working on the job 179 bool success = false; 180 int retries = 0; 181 JobState jobState = JobState.Unkown; 182 do { 183 try { 184 lock(connectionLock) { 185 jobState = server.JobState(engineGuid); 113 186 } 114 server.BeginExecuteEngine(packedEngine); 115 restartCounter++; 116 } 187 success = true; 188 } catch(TimeoutException timeoutException) { 189 retries++; 190 success = false; 191 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 192 } catch(CommunicationException communicationException) { 193 ResetConnection(); 194 retries++; 195 success = false; 196 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 197 } 198 } while(!success && retries < MAX_CONNECTION_RETRIES); 199 if(jobState == JobState.Unkown) { 200 // restart job 201 ProcessingEngine engine; 202 lock(dictionaryLock) { 203 engine = engines[engineGuid]; 204 } 205 byte[] zippedEngine = ZipEngine(engine); 206 success = false; 207 retries = 0; 208 do { 209 try { 210 lock(connectionLock) { 211 server.BeginExecuteEngine(zippedEngine); 212 } 213 success = true; 214 } catch(TimeoutException timeoutException) { 215 success = false; 216 retries++; 217 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 218 } catch(CommunicationException communicationException) { 219 ResetConnection(); 220 success = false; 221 retries++; 222 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC)); 223 } 224 } while(!success && retries < MAX_CONNECTION_RETRIES); 225 restartCounter++; 117 226 } 118 } catch(Exception e) {119 // catch all exceptions set an exception flag, signal the wait-handle and exit the routine120 this.exception = new Exception("There was a problem with parallel execution", e);121 waithandles[engineGuid].Set();122 return;123 227 } 124 228 125 229 // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem 126 230 if(restartCounter > MAX_RESTARTS) { 127 this.exception = new Exception("Maximal number of parallel job restarts reached"); 128 waithandles[engineGuid].Set(); 129 return; 130 } 131 132 Thread.Sleep(TimeSpan.FromSeconds(10.0)); 231 throw new ApplicationException("Maximum number of job restarts reached."); 232 } 133 233 } while(true); 134 234 }
Note: See TracChangeset
for help on using the changeset viewer.