Free cookie consent management tool by TermsFeed Policy Generator

Changeset 248 for trunk/sources


Ignore:
Timestamp:
05/13/08 23:06:12 (17 years ago)
Author:
gkronber
Message:

improved stability of distributed engine (#149)

Location:
trunk/sources/HeuristicLab.DistributedEngine
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r228 r248  
    9292            WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
    9393            int i = 0;
     94            // start all parallel jobs
    9495            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    9596              waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation);
    9697            }
     98
     99            // wait until all jobs are finished
    97100            // WaitAll works only with maximally 64 waithandles
    98101            if(waithandles.Length <= 64) {
     
    103106              }
    104107            }
    105             if(jobManager.Exception != null) {
    106               myExecutionStack.Push(compositeOperation);
    107               Abort();
    108               ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); });
     108            // retrieve results and merge into scope-tree
     109            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     110              IScope result = jobManager.EndExecuteOperation(parOperation);
     111              MergeScope(parOperation.Scope, result);
    109112            }
    110113          } catch(Exception e) {
    111114            myExecutionStack.Push(compositeOperation);
    112115            Abort();
    113             ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); });
     116            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
    114117          }
    115118        } else {
     
    117120            myExecutionStack.Push(compositeOperation.Operations[i]);
    118121        }
     122      }
     123    }
     124
     125    private void MergeScope(IScope original, IScope result) {
     126      // merge the results
     127      original.Clear();
     128      foreach(IVariable variable in result.Variables) {
     129        original.AddVariable(variable);
     130      }
     131      foreach(IScope subScope in result.SubScopes) {
     132        original.AddSubScope(subScope);
     133      }
     134      foreach(KeyValuePair<string, string> alias in result.Aliases) {
     135        original.AddAlias(alias.Key, alias.Value);
    119136      }
    120137    }
  • trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs

    r228 r248  
    1818    private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
    1919    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    20     private object locker = new object();
     20    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
     21    private object connectionLock = new object();
     22    private object dictionaryLock = new object();
     23
    2124    private const int MAX_RESTARTS = 5;
    22     private Exception exception;
     25    private const int MAX_CONNECTION_RETRIES = 10;
     26    private const int RETRY_TIMEOUT_SEC = 10;
     27    private const int CHECK_RESULTS_TIMEOUT = 10;
     28
    2329    private ChannelFactory<IGridServer> factory;
    24     public Exception Exception {
    25       get { return exception; }
    26     }
    2730
    2831    public JobManager(string address) {
     
    3134
    3235    internal void Reset() {
    33       lock(locker) {
    34         ResetConnection();
     36      ResetConnection();
     37      lock(dictionaryLock) {
    3538        foreach(WaitHandle wh in waithandles.Values) wh.Close();
    3639        waithandles.Clear();
    3740        engineOperations.Clear();
    3841        runningEngines.Clear();
    39         exception = null;
     42        results.Clear();
    4043      }
    4144    }
    4245
    4346    private void ResetConnection() {
    44       // open a new channel
    45       NetTcpBinding binding = new NetTcpBinding();
    46       binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    47       binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    48       binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    49       binding.Security.Mode = SecurityMode.None;
    50       factory = new ChannelFactory<IGridServer>(binding);
    51       server = factory.CreateChannel(new EndpointAddress(address));
     47      lock(connectionLock) {
     48        // open a new channel
     49        NetTcpBinding binding = new NetTcpBinding();
     50        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
     51        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
     52        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
     53        binding.Security.Mode = SecurityMode.None;
     54
     55        factory = new ChannelFactory<IGridServer>(binding);
     56        server = factory.CreateChannel(new EndpointAddress(address));
     57      }
    5258    }
    5359
     
    5864      PersistenceManager.Save(engine, stream);
    5965      stream.Close();
    60       if(factory.State != CommunicationState.Opened)
    61         ResetConnection();
    62       Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    63       lock(locker) {
     66      byte[] zippedEngine = memStream.ToArray();
     67      Guid currentEngineGuid = Guid.Empty;
     68      bool success = false;
     69      int retryCount = 0;
     70      do {
     71        lock(connectionLock) {
     72          if(factory.State != CommunicationState.Opened)
     73            ResetConnection();
     74          try {
     75            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
     76            success = true;
     77          } catch(TimeoutException timeoutException) {
     78            if(retryCount < MAX_CONNECTION_RETRIES) {
     79              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     80              retryCount++;
     81            } else {
     82              throw new ApplicationException("Max retries reached.", timeoutException);
     83            }
     84          } catch(CommunicationException communicationException) {
     85            // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution)
     86            if(retryCount < MAX_CONNECTION_RETRIES) {
     87              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     88              retryCount++;
     89            } else {
     90              throw new ApplicationException("Max retries reached.", communicationException);
     91            }
     92          }
     93        }
     94      } while(!success);
     95      lock(dictionaryLock) {
    6496        runningEngines[currentEngineGuid] = memStream.ToArray();
    6597        engineOperations[currentEngineGuid] = operation;
    6698        waithandles[currentEngineGuid] = new ManualResetEvent(false);
    67         ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
    6899      }
     100      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
    69101      return waithandles[currentEngineGuid];
     102    }
     103
     104    public IScope EndExecuteOperation(AtomicOperation operation) {
     105      byte[] zippedResult = results[operation];
     106      // restore the engine
     107      GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress);
     108      ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
     109
     110      return resultEngine.InitialOperation.Scope;
    70111    }
    71112
     
    74115      int restartCounter = 0;
    75116      do {
    76         try {
    77           if(factory.State != CommunicationState.Opened) ResetConnection();
    78           byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
    79           if(resultXml != null) {
    80             // restore the engine
    81             GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    82             ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    83 
    84             // merge the results
    85             IScope oldScope = engineOperations[engineGuid].Scope;
    86             oldScope.Clear();
    87             foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    88               oldScope.AddVariable(variable);
    89             }
    90             foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    91               oldScope.AddSubScope(subScope);
    92             }
    93             foreach(KeyValuePair<string, string> alias in resultEngine.InitialOperation.Scope.Aliases) {
    94               oldScope.AddAlias(alias.Key, alias.Value);
    95             }
    96 
    97             lock(locker) {
    98               // signal the wait handle and clean up then return
    99               waithandles[engineGuid].Set();
    100               engineOperations.Remove(engineGuid);
    101               waithandles.Remove(engineGuid);
    102               runningEngines.Remove(engineGuid);
    103             }
    104             return;
    105           } else {
    106             // check if the server is still working on the job
    107             JobState jobState = server.JobState(engineGuid);
    108             if(jobState == JobState.Unkown) {
    109               // restart job
    110               byte[] packedEngine;
    111               lock(locker) {
    112                 packedEngine = runningEngines[engineGuid];
     117        byte[] zippedResult = null;
     118        lock(connectionLock) {
     119          bool success = false;
     120          int retries = 0;
     121          do {
     122            if(factory.State != CommunicationState.Opened) ResetConnection();
     123            try {
     124              zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
     125              success = true;
     126            } catch(TimeoutException timeoutException) {
     127              success = false;
     128              retries++;
     129              Thread.Sleep(RETRY_TIMEOUT_SEC);
     130            } catch(CommunicationException communicationException) {
     131              success = false;
     132              retries++;
     133              Thread.Sleep(RETRY_TIMEOUT_SEC);
     134            }
     135
     136          } while(!success && retries < MAX_CONNECTION_RETRIES);
     137        }
     138        if(zippedResult != null) {
     139          lock(dictionaryLock) {
     140            // store result
     141            results[engineOperations[engineGuid]] = zippedResult;
     142
     143            // signal the wait handle and clean up then return
     144            engineOperations.Remove(engineGuid);
     145            runningEngines.Remove(engineGuid);
     146            waithandles[engineGuid].Set();
     147            waithandles.Remove(engineGuid);
     148          }
     149          return;
     150        } else {
     151          // check if the server is still working on the job
     152          bool success = false;
     153          int retries = 0;
     154          JobState jobState = JobState.Unkown;
     155          do {
     156            try {
     157              lock(connectionLock) {
     158                if(factory.State != CommunicationState.Opened) ResetConnection();
     159                jobState = server.JobState(engineGuid);
    113160              }
    114               server.BeginExecuteEngine(packedEngine);
    115               restartCounter++;
    116             }
     161              success = true;
     162            } catch(TimeoutException timeoutException) {
     163              retries++;
     164              success = false;
     165              Thread.Sleep(RETRY_TIMEOUT_SEC);
     166            } catch(CommunicationException communicationException) {
     167              retries++;
     168              success = false;
     169              Thread.Sleep(RETRY_TIMEOUT_SEC);
     170            }
     171          } while(!success && retries < MAX_CONNECTION_RETRIES);
     172          if(jobState == JobState.Unkown) {
     173            // restart job
     174            byte[] packedEngine;
     175            lock(dictionaryLock) {
     176              packedEngine = runningEngines[engineGuid];
     177            }
     178            success = false;
     179            retries = 0;
     180            do {
     181              try {
     182                lock(connectionLock) {
     183                  if(factory.State != CommunicationState.Opened) ResetConnection();
     184                  server.BeginExecuteEngine(packedEngine);
     185                }
     186                success = true;
     187              } catch(TimeoutException timeoutException) {
     188                success = false;
     189                retries++;
     190                Thread.Sleep(RETRY_TIMEOUT_SEC);
     191              } catch(CommunicationException communicationException) {
     192                success = false;
     193                retries++;
     194                Thread.Sleep(RETRY_TIMEOUT_SEC);
     195              }
     196            } while(!success && retries < MAX_CONNECTION_RETRIES);
     197            restartCounter++;
    117198          }
    118         } catch(Exception e) {
    119           // catch all exceptions set an exception flag, signal the wait-handle and exit the routine
    120           this.exception = new Exception("There was a problem with parallel execution", e);
    121           waithandles[engineGuid].Set();
    122           return;
    123199        }
    124200
    125201        // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem
    126202        if(restartCounter > MAX_RESTARTS) {
    127           this.exception = new Exception("Maximal number of parallel job restarts reached");
    128           waithandles[engineGuid].Set();
    129           return;
    130         }
    131 
    132         Thread.Sleep(TimeSpan.FromSeconds(10.0));
     203          throw new ApplicationException("Maximum number of job restarts reached.");
     204        }
     205
     206        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
    133207      } while(true);
    134208    }
Note: See TracChangeset for help on using the changeset viewer.