Free cookie consent management tool by TermsFeed Policy Generator

Changeset 257 for trunk/sources


Ignore:
Timestamp:
05/14/08 16:37:39 (17 years ago)
Author:
gkronber
Message:

don't keep all zipped engines just in case we have to restart one of them. Instead keep the engine objects (we have to keep them anyway) and serialize - zip only if a restart is needed (#149)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs

    r256 r257  
    1515    private IGridServer server;
    1616    private string address;
    17     private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    18     private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
     17    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    1918    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    2019    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
     
    3837        foreach(WaitHandle wh in waithandles.Values) wh.Close();
    3938        waithandles.Clear();
    40         engineOperations.Clear();
    41         runningEngines.Clear();
     39        engines.Clear();
    4240        results.Clear();
    4341      }
     
    6058    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
    6159      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
    62       MemoryStream memStream = new MemoryStream();
    63       GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    64       PersistenceManager.Save(engine, stream);
    65       stream.Close();
    66       byte[] zippedEngine = memStream.ToArray();
    67       memStream.Close();
     60      byte[] zippedEngine = ZipEngine(engine);
    6861      Guid currentEngineGuid = Guid.Empty;
    6962      bool success = false;
     
    9487      } while(!success);
    9588      lock(dictionaryLock) {
    96         runningEngines[currentEngineGuid] = memStream.ToArray();
    97         engineOperations[currentEngineGuid] = operation;
     89        engines[currentEngineGuid] = engine;
    9890        waithandles[currentEngineGuid] = new ManualResetEvent(false);
    9991      }
    10092      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
    10193      return waithandles[currentEngineGuid];
     94    }
     95
     96    private byte[] ZipEngine(ProcessingEngine engine) {
     97      MemoryStream memStream = new MemoryStream();
     98      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
     99      PersistenceManager.Save(engine, stream);
     100      stream.Close();
     101      byte[] zippedEngine = memStream.ToArray();
     102      memStream.Close();
     103      return zippedEngine;
    102104    }
    103105
     
    143145          lock(dictionaryLock) {
    144146            // store result
    145             results[engineOperations[engineGuid]] = zippedResult;
     147            results[engines[engineGuid].InitialOperation] = zippedResult;
    146148
    147149            // signal the wait handle and clean up then return
    148             engineOperations.Remove(engineGuid);
    149             runningEngines.Remove(engineGuid);
     150            engines.Remove(engineGuid);
    150151            waithandles[engineGuid].Set();
    151152            waithandles.Remove(engineGuid);
     
    176177          if(jobState == JobState.Unkown) {
    177178            // restart job
    178             byte[] packedEngine;
     179            ProcessingEngine engine;
    179180            lock(dictionaryLock) {
    180               packedEngine = runningEngines[engineGuid];
    181             }
     181              engine = engines[engineGuid];
     182            }
     183            byte[] zippedEngine = ZipEngine(engine);
    182184            success = false;
    183185            retries = 0;
     
    185187              try {
    186188                lock(connectionLock) {
    187                   server.BeginExecuteEngine(packedEngine);
     189                  server.BeginExecuteEngine(zippedEngine);
    188190                }
    189191                success = true;
Note: See TracChangeset for help on using the changeset viewer.