Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/30/08 13:41:27 (16 years ago)
Author:
gkronber
Message:

merged changesets r219:r228, r240, r241:258, r263:265, r267,r268, r269 from trunk into the HL3 stable branch

File:
1 copied

Legend:

Unmodified
Added
Removed
  • branches/3.0/sources/HeuristicLab.DistributedEngine/JobManager.cs

    r228 r279  
    1 using System;
     1#region License Information
     2/* HeuristicLab
     3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
     4 *
     5 * This file is part of HeuristicLab.
     6 *
     7 * HeuristicLab is free software: you can redistribute it and/or modify
     8 * it under the terms of the GNU General Public License as published by
     9 * the Free Software Foundation, either version 3 of the License, or
     10 * (at your option) any later version.
     11 *
     12 * HeuristicLab is distributed in the hope that it will be useful,
     13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     15 * GNU General Public License for more details.
     16 *
     17 * You should have received a copy of the GNU General Public License
     18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
     19 */
     20#endregion
     21
     22using System;
    223using System.Collections.Generic;
    324using System.Linq;
     
    1536    private IGridServer server;
    1637    private string address;
    17     private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    18     private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
     38    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    1939    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    20     private object locker = new object();
     40    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
     41    private object connectionLock = new object();
     42    private object dictionaryLock = new object();
     43
    2144    private const int MAX_RESTARTS = 5;
    22     private Exception exception;
     45    private const int MAX_CONNECTION_RETRIES = 10;
     46    private const int RETRY_TIMEOUT_SEC = 10;
     47    private const int CHECK_RESULTS_TIMEOUT = 10;
     48
    2349    private ChannelFactory<IGridServer> factory;
    24     public Exception Exception {
    25       get { return exception; }
    26     }
    2750
    2851    public JobManager(string address) {
     
    3154
    3255    internal void Reset() {
    33       lock(locker) {
    34         ResetConnection();
     56      ResetConnection();
     57      lock(dictionaryLock) {
    3558        foreach(WaitHandle wh in waithandles.Values) wh.Close();
    3659        waithandles.Clear();
    37         engineOperations.Clear();
    38         runningEngines.Clear();
    39         exception = null;
     60        engines.Clear();
     61        results.Clear();
    4062      }
    4163    }
    4264
    4365    private void ResetConnection() {
    44       // open a new channel
    45       NetTcpBinding binding = new NetTcpBinding();
    46       binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    47       binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    48       binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    49       binding.Security.Mode = SecurityMode.None;
    50       factory = new ChannelFactory<IGridServer>(binding);
    51       server = factory.CreateChannel(new EndpointAddress(address));
    52     }
    53 
    54     public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
    55       ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
     66      lock(connectionLock) {
     67        // open a new channel
     68        NetTcpBinding binding = new NetTcpBinding();
     69        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
     70        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
     71        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
     72        binding.Security.Mode = SecurityMode.None;
     73
     74        factory = new ChannelFactory<IGridServer>(binding);
     75        server = factory.CreateChannel(new EndpointAddress(address));
     76      }
     77    }
     78
     79    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
     80      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
     81      byte[] zippedEngine = ZipEngine(engine);
     82      Guid currentEngineGuid = Guid.Empty;
     83      bool success = false;
     84      int retryCount = 0;
     85      do {
     86        lock(connectionLock) {
     87          try {
     88            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
     89            success = true;
     90          } catch(TimeoutException timeoutException) {
     91            if(retryCount < MAX_CONNECTION_RETRIES) {
     92              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     93              retryCount++;
     94            } else {
     95              throw new ApplicationException("Max retries reached.", timeoutException);
     96            }
     97          } catch(CommunicationException communicationException) {
     98            ResetConnection();
     99            // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution)
     100            if(retryCount < MAX_CONNECTION_RETRIES) {
     101              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     102              retryCount++;
     103            } else {
     104              throw new ApplicationException("Max retries reached.", communicationException);
     105            }
     106          }
     107        }
     108      } while(!success);
     109      lock(dictionaryLock) {
     110        engines[currentEngineGuid] = engine;
     111        waithandles[currentEngineGuid] = new ManualResetEvent(false);
     112      }
     113      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
     114      return waithandles[currentEngineGuid];
     115    }
     116
     117    private byte[] ZipEngine(ProcessingEngine engine) {
    56118      MemoryStream memStream = new MemoryStream();
    57119      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    58120      PersistenceManager.Save(engine, stream);
    59121      stream.Close();
    60       if(factory.State != CommunicationState.Opened)
    61         ResetConnection();
    62       Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    63       lock(locker) {
    64         runningEngines[currentEngineGuid] = memStream.ToArray();
    65         engineOperations[currentEngineGuid] = operation;
    66         waithandles[currentEngineGuid] = new ManualResetEvent(false);
    67         ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
    68       }
    69       return waithandles[currentEngineGuid];
     122      byte[] zippedEngine = memStream.ToArray();
     123      memStream.Close();
     124      return zippedEngine;
     125    }
     126
     127    public IScope EndExecuteOperation(AtomicOperation operation) {
     128      byte[] zippedResult = null;
     129      lock(dictionaryLock) {
     130        zippedResult = results[operation];
     131        results.Remove(operation);
     132      }
     133      // restore the engine
     134      using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
     135        ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
     136        return resultEngine.InitialOperation.Scope;
     137      }     
    70138    }
    71139
     
    74142      int restartCounter = 0;
    75143      do {
    76         try {
    77           if(factory.State != CommunicationState.Opened) ResetConnection();
    78           byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
    79           if(resultXml != null) {
    80             // restore the engine
    81             GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    82             ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    83 
    84             // merge the results
    85             IScope oldScope = engineOperations[engineGuid].Scope;
    86             oldScope.Clear();
    87             foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    88               oldScope.AddVariable(variable);
    89             }
    90             foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    91               oldScope.AddSubScope(subScope);
    92             }
    93             foreach(KeyValuePair<string, string> alias in resultEngine.InitialOperation.Scope.Aliases) {
    94               oldScope.AddAlias(alias.Key, alias.Value);
    95             }
    96 
    97             lock(locker) {
    98               // signal the wait handle and clean up then return
    99               waithandles[engineGuid].Set();
    100               engineOperations.Remove(engineGuid);
    101               waithandles.Remove(engineGuid);
    102               runningEngines.Remove(engineGuid);
    103             }
    104             return;
    105           } else {
    106             // check if the server is still working on the job
    107             JobState jobState = server.JobState(engineGuid);
    108             if(jobState == JobState.Unkown) {
    109               // restart job
    110               byte[] packedEngine;
    111               lock(locker) {
    112                 packedEngine = runningEngines[engineGuid];
     144        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
     145        byte[] zippedResult = null;
     146        lock(connectionLock) {
     147          bool success = false;
     148          int retries = 0;
     149          do {
     150            try {
     151              zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
     152              success = true;
     153            } catch(TimeoutException timeoutException) {
     154              success = false;
     155              retries++;
     156              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     157            } catch(CommunicationException communicationException) {
     158              ResetConnection();
     159              success = false;
     160              retries++;
     161              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     162            }
     163
     164          } while(!success && retries < MAX_CONNECTION_RETRIES);
     165        }
     166        if(zippedResult != null) {
     167          lock(dictionaryLock) {
     168            // store result
     169            results[engines[engineGuid].InitialOperation] = zippedResult;
     170
     171            // signal the wait handle and clean up then return
     172            engines.Remove(engineGuid);
     173            waithandles[engineGuid].Set();
     174            waithandles.Remove(engineGuid);
     175          }
     176          return;
     177        } else {
     178          // check if the server is still working on the job
     179          bool success = false;
     180          int retries = 0;
     181          JobState jobState = JobState.Unkown;
     182          do {
     183            try {
     184              lock(connectionLock) {
     185                jobState = server.JobState(engineGuid);
    113186              }
    114               server.BeginExecuteEngine(packedEngine);
    115               restartCounter++;
    116             }
     187              success = true;
     188            } catch(TimeoutException timeoutException) {
     189              retries++;
     190              success = false;
     191              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     192            } catch(CommunicationException communicationException) {
     193              ResetConnection();
     194              retries++;
     195              success = false;
     196              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     197            }
     198          } while(!success && retries < MAX_CONNECTION_RETRIES);
     199          if(jobState == JobState.Unkown) {
     200            // restart job
     201            ProcessingEngine engine;
     202            lock(dictionaryLock) {
     203              engine = engines[engineGuid];
     204            }
     205            byte[] zippedEngine = ZipEngine(engine);
     206            success = false;
     207            retries = 0;
     208            do {
     209              try {
     210                lock(connectionLock) {
     211                  server.BeginExecuteEngine(zippedEngine);
     212                }
     213                success = true;
     214              } catch(TimeoutException timeoutException) {
     215                success = false;
     216                retries++;
     217                Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     218              } catch(CommunicationException communicationException) {
     219                ResetConnection();
     220                success = false;
     221                retries++;
     222                Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     223              }
     224            } while(!success && retries < MAX_CONNECTION_RETRIES);
     225            restartCounter++;
    117226          }
    118         } catch(Exception e) {
    119           // catch all exceptions set an exception flag, signal the wait-handle and exit the routine
    120           this.exception = new Exception("There was a problem with parallel execution", e);
    121           waithandles[engineGuid].Set();
    122           return;
    123227        }
    124228
    125229        // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem
    126230        if(restartCounter > MAX_RESTARTS) {
    127           this.exception = new Exception("Maximal number of parallel job restarts reached");
    128           waithandles[engineGuid].Set();
    129           return;
    130         }
    131 
    132         Thread.Sleep(TimeSpan.FromSeconds(10.0));
     231          throw new ApplicationException("Maximum number of job restarts reached.");
     232        }
    133233      } while(true);
    134234    }
Note: See TracChangeset for help on using the changeset viewer.