Changeset 257
- Timestamp:
- 05/14/08 16:37:39 (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs
r256 r257 15 15 private IGridServer server; 16 16 private string address; 17 private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>(); 18 private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>(); 17 private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>(); 19 18 private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>(); 20 19 private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>(); … … 38 37 foreach(WaitHandle wh in waithandles.Values) wh.Close(); 39 38 waithandles.Clear(); 40 engineOperations.Clear(); 41 runningEngines.Clear(); 39 engines.Clear(); 42 40 results.Clear(); 43 41 } … … 60 58 public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) { 61 59 ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed? 62 MemoryStream memStream = new MemoryStream(); 63 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 64 PersistenceManager.Save(engine, stream); 65 stream.Close(); 66 byte[] zippedEngine = memStream.ToArray(); 67 memStream.Close(); 60 byte[] zippedEngine = ZipEngine(engine); 68 61 Guid currentEngineGuid = Guid.Empty; 69 62 bool success = false; … … 94 87 } while(!success); 95 88 lock(dictionaryLock) { 96 runningEngines[currentEngineGuid] = memStream.ToArray(); 97 engineOperations[currentEngineGuid] = operation; 89 engines[currentEngineGuid] = engine; 98 90 waithandles[currentEngineGuid] = new ManualResetEvent(false); 99 91 } 100 92 ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid); 101 93 return waithandles[currentEngineGuid]; 94 } 95 96 private byte[] ZipEngine(ProcessingEngine engine) { 97 MemoryStream memStream = new MemoryStream(); 98 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 99 PersistenceManager.Save(engine, stream); 100 stream.Close(); 101 byte[] zippedEngine = memStream.ToArray(); 102 memStream.Close(); 103 return zippedEngine; 102 104 } 103 105 … … 143 145 lock(dictionaryLock) { 144 146 // store result 145 results[engine Operations[engineGuid]] = zippedResult;147 results[engines[engineGuid].InitialOperation] = zippedResult; 146 148 147 149 // signal the wait handle and clean up then return 148 engineOperations.Remove(engineGuid); 149 runningEngines.Remove(engineGuid); 150 engines.Remove(engineGuid); 150 151 waithandles[engineGuid].Set(); 151 152 waithandles.Remove(engineGuid); … … 176 177 if(jobState == JobState.Unkown) { 177 178 // restart job 178 byte[] packedEngine;179 ProcessingEngine engine; 179 180 lock(dictionaryLock) { 180 packedEngine = runningEngines[engineGuid]; 181 } 181 engine = engines[engineGuid]; 182 } 183 byte[] zippedEngine = ZipEngine(engine); 182 184 success = false; 183 185 retries = 0; … … 185 187 try { 186 188 lock(connectionLock) { 187 server.BeginExecuteEngine( packedEngine);189 server.BeginExecuteEngine(zippedEngine); 188 190 } 189 191 success = true;
Note: See TracChangeset
for help on using the changeset viewer.