Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/30/08 13:41:27 (17 years ago)
Author:
gkronber
Message:

merged changesets r219:r228, r240, r241:258, r263:265, r267,r268, r269 from trunk into the HL3 stable branch

Location:
branches/3.0/sources/HeuristicLab.DistributedEngine
Files:
2 edited
1 copied

Legend:

Unmodified
Added
Removed
  • branches/3.0/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r36 r279  
    3030using System.IO;
    3131using System.IO.Compression;
     32using HeuristicLab.PluginInfrastructure;
     33using System.Windows.Forms;
     34using System.Diagnostics;
    3235
    3336namespace HeuristicLab.DistributedEngine {
    3437  public class DistributedEngine : EngineBase, IEditable {
    35     private IGridServer server;
    36     private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    37     private List<Guid> runningEngines = new List<Guid>();
     38    private JobManager jobManager;
     39    private CompositeOperation waitingOperations;
    3840    private string serverAddress;
    39     private bool cancelRequested;
    40     private CompositeOperation waitingOperations;
    4141    public string ServerAddress {
    4242      get { return serverAddress; }
     
    4747      }
    4848    }
    49     public override bool Terminated {
    50       get {
    51         return myExecutionStack.Count == 0 && runningEngines.Count == 0 && waitingOperations==null;
    52       }
    53     }
    5449    public override object Clone(IDictionary<Guid, object> clonedObjects) {
    5550      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
     
    6661
    6762    public override void Execute() {
    68       NetTcpBinding binding = new NetTcpBinding();
    69       binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    70       binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    71       binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    72       binding.Security.Mode = SecurityMode.None;
    73       ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding);
    74       server = factory.CreateChannel(new EndpointAddress(serverAddress));
    75 
     63      if(jobManager == null) this.jobManager = new JobManager(serverAddress);
     64      jobManager.Reset();
    7665      base.Execute();
    7766    }
     
    8170    }
    8271
    83     public override void Abort() {
    84       lock(runningEngines) {
    85         cancelRequested = true;
    86         foreach(Guid engineGuid in runningEngines) {
    87           server.AbortEngine(engineGuid);
    88         }
    89       }
    90     }
    91     public override void Reset() {
    92       base.Reset();
    93       engineOperations.Clear();
    94       runningEngines.Clear();
    95       cancelRequested = false;
    96     }
    97 
    9872    protected override void ProcessNextOperation() {
    99       lock(runningEngines) {
    100         if(runningEngines.Count == 0 && cancelRequested) {
    101           base.Abort();
    102           cancelRequested = false;
    103           if(waitingOperations != null && waitingOperations.Operations.Count != 0) {
    104             myExecutionStack.Push(waitingOperations);
    105             waitingOperations = null;
     73      IOperation operation = myExecutionStack.Pop();
     74      if(operation is AtomicOperation) {
     75        AtomicOperation atomicOperation = (AtomicOperation)operation;
     76        IOperation next = null;
     77        try {
     78          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
     79        } catch(Exception ex) {
     80          // push operation on stack again
     81          myExecutionStack.Push(atomicOperation);
     82          Abort();
     83          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
     84        }
     85        if(next != null)
     86          myExecutionStack.Push(next);
     87        OnOperationExecuted(atomicOperation);
     88        if(atomicOperation.Operator.Breakpoint) Abort();
     89      } else if(operation is CompositeOperation) {
     90        CompositeOperation compositeOperation = (CompositeOperation)operation;
     91        if(compositeOperation.ExecuteInParallel) {
     92          try {
     93            WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
     94            int i = 0;
     95            // HACK: assume that all atomicOperations have the same parent scope.
     96            // 1) find that parent scope
     97            // 2) remove all branches starting from the global scope that don't lead to the parentScope of the parallel operation
     98            // 3) keep the branches to 'repair' the scope-tree later
     99            // 4) for each parallel job attach only the sub-scope that this operation uses
     100            // 5) after starting all parallel jobs restore the whole scope-tree
     101            IScope parentScope = FindParentScope(GlobalScope, compositeOperation);
     102            List<IList<IScope>> prunedScopes = new List<IList<IScope>>();
     103            PruneToParentScope(GlobalScope, parentScope, prunedScopes);
     104            List<IScope> subScopes = new List<IScope>(parentScope.SubScopes);
     105            foreach(IScope scope in subScopes) {
     106              parentScope.RemoveSubScope(scope);
     107            }
     108            // start all parallel jobs
     109            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     110              parentScope.AddSubScope(parOperation.Scope);
     111              waithandles[i++] = jobManager.BeginExecuteOperation(GlobalScope, parOperation);
     112              parentScope.RemoveSubScope(parOperation.Scope);
     113            }
     114            foreach(IScope scope in subScopes) {
     115              parentScope.AddSubScope(scope);
     116            }
     117            prunedScopes.Reverse();
     118            RestoreFullTree(GlobalScope, prunedScopes);
     119
     120            // wait until all jobs are finished
     121            // WaitAll works only with maximally 64 waithandles
     122            if(waithandles.Length <= 64) {
     123              WaitHandle.WaitAll(waithandles);
     124            } else {
     125              for(i = 0; i < waithandles.Length; i++) {
     126                waithandles[i].WaitOne();
     127                waithandles[i].Close();
     128              }
     129            }
     130            // retrieve results and merge into scope-tree
     131            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     132              IScope result = jobManager.EndExecuteOperation(parOperation);
     133              MergeScope(parOperation.Scope, result);
     134            }
     135          } catch(Exception e) {
     136            myExecutionStack.Push(compositeOperation);
     137            Abort();
     138            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(e); });
    106139          }
    107           return;
    108         }
    109         if(runningEngines.Count != 0) {
    110           Guid engineGuid = runningEngines[0];
    111           byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
    112           if(resultXml != null) {
    113             GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    114             ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    115             IScope oldScope = engineOperations[engineGuid].Scope;
    116             oldScope.Clear();
    117             foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    118               oldScope.AddVariable(variable);
    119             }
    120             foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    121               oldScope.AddSubScope(subScope);
    122             }
    123             OnOperationExecuted(engineOperations[engineGuid]);
    124 
    125             if(cancelRequested & resultEngine.ExecutionStack.Count != 0) {
    126               if(waitingOperations == null) {
    127                 waitingOperations = new CompositeOperation();
    128                 waitingOperations.ExecuteInParallel = false;
    129               }
    130               CompositeOperation task = new CompositeOperation();
    131               while(resultEngine.ExecutionStack.Count > 0) {
    132                 AtomicOperation oldOperation = (AtomicOperation)resultEngine.ExecutionStack.Pop();
    133                 if(oldOperation.Scope == resultEngine.InitialOperation.Scope) {
    134                   oldOperation = new AtomicOperation(oldOperation.Operator, oldScope);
    135                 }
    136                 task.AddOperation(oldOperation);
    137               }
    138               waitingOperations.AddOperation(task);
    139             }
    140             runningEngines.Remove(engineGuid);
    141             engineOperations.Remove(engineGuid);
    142           }
    143           return;
    144         }
    145         IOperation operation = myExecutionStack.Pop();
    146         if(operation is AtomicOperation) {
    147           AtomicOperation atomicOperation = (AtomicOperation)operation;
    148           IOperation next = null;
    149           try {
    150             next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    151           } catch(Exception ex) {
    152             // push operation on stack again
    153             myExecutionStack.Push(atomicOperation);
    154             Abort();
    155             ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    156           }
    157           if(next != null)
    158             myExecutionStack.Push(next);
    159           OnOperationExecuted(atomicOperation);
    160           if(atomicOperation.Operator.Breakpoint) Abort();
    161         } else if(operation is CompositeOperation) {
    162           CompositeOperation compositeOperation = (CompositeOperation)operation;
    163           if(compositeOperation.ExecuteInParallel) {
    164             foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    165               ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
    166               MemoryStream memStream = new MemoryStream();
    167               GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    168               PersistenceManager.Save(engine, stream);
    169               stream.Close();
    170               Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    171               runningEngines.Add(currentEngineGuid);
    172               engineOperations[currentEngineGuid] = parOperation;
    173             }
    174           } else {
    175             for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    176               myExecutionStack.Push(compositeOperation.Operations[i]);
    177           }
    178         }
     140        } else {
     141          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
     142            myExecutionStack.Push(compositeOperation.Operations[i]);
     143        }
     144      }
     145    }
     146
     147    private void RestoreFullTree(IScope currentScope, IList<IList<IScope>> savedScopes) {
     148      if(savedScopes.Count == 0) return;
     149      IScope remainingBranch = currentScope.SubScopes[0];
     150      currentScope.RemoveSubScope(remainingBranch);
     151      IList<IScope> savedScopesForCurrent = savedScopes[0];
     152      foreach(IScope savedScope in savedScopesForCurrent) {
     153        currentScope.AddSubScope(savedScope);
     154      }
     155      savedScopes.RemoveAt(0);
     156      RestoreFullTree(remainingBranch, savedScopes);
     157    }
     158
     159    private IScope PruneToParentScope(IScope currentScope, IScope scope, IList<IList<IScope>> prunedScopes) {
     160      if(currentScope == scope) return currentScope;
     161      if(currentScope.SubScopes.Count == 0) return null;
     162      IScope foundScope = null;
     163      // try to find the searched scope in all my sub-scopes
     164      foreach(IScope subScope in currentScope.SubScopes) {
     165        foundScope = PruneToParentScope(subScope, scope, prunedScopes);
     166        if(foundScope != null) break; // we can stop as soon as we find the scope in a branch
     167      }
     168      if(foundScope != null) { // when we found the scopes in my sub-scopes
     169        List<IScope> subScopes = new List<IScope>(currentScope.SubScopes); // store the list of sub-scopes
     170        prunedScopes.Add(subScopes);
     171        // remove all my sub-scopes
     172        foreach(IScope subScope in subScopes) {
     173          currentScope.RemoveSubScope(subScope);
     174        }
     175        // add only the branch that leads to the scope that I search for
     176        currentScope.AddSubScope(foundScope);
     177        return currentScope; // return that this scope contains the branch that leads to the searched scopes
     178      } else {
     179        return null; // otherwise we didn't find the searched scope and we can return null
     180      }
     181    }
     182
     183    private IScope FindParentScope(IScope currentScope, CompositeOperation compositeOperation) {
     184      AtomicOperation currentOperation = (AtomicOperation)compositeOperation.Operations[0];
     185      if(currentScope.SubScopes.Contains(currentOperation.Scope)) return currentScope;
     186      foreach(IScope subScope in currentScope.SubScopes) {
     187        IScope result = FindParentScope(subScope, compositeOperation);
     188        if(result != null) return result;
     189      }
     190      return null;
     191    }
     192
     193    private void MergeScope(IScope original, IScope result) {
     194      // merge the results
     195      original.Clear();
     196      foreach(IVariable variable in result.Variables) {
     197        original.AddVariable(variable);
     198      }
     199      foreach(IScope subScope in result.SubScopes) {
     200        original.AddSubScope(subScope);
     201      }
     202      foreach(KeyValuePair<string, string> alias in result.Aliases) {
     203        original.AddAlias(alias.Key, alias.Value);
    179204      }
    180205    }
  • branches/3.0/sources/HeuristicLab.DistributedEngine/HeuristicLab.DistributedEngine.csproj

    r30 r279  
    5050  <ItemGroup>
    5151    <Compile Include="HeuristicLabDistributedEnginePlugin.cs" />
     52    <Compile Include="JobManager.cs" />
    5253    <Compile Include="Properties\AssemblyInfo.cs" />
    5354    <Compile Include="DistributedEngine.cs" />
  • branches/3.0/sources/HeuristicLab.DistributedEngine/JobManager.cs

    r228 r279  
    1 using System;
     1#region License Information
     2/* HeuristicLab
     3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
     4 *
     5 * This file is part of HeuristicLab.
     6 *
     7 * HeuristicLab is free software: you can redistribute it and/or modify
     8 * it under the terms of the GNU General Public License as published by
     9 * the Free Software Foundation, either version 3 of the License, or
     10 * (at your option) any later version.
     11 *
     12 * HeuristicLab is distributed in the hope that it will be useful,
     13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     15 * GNU General Public License for more details.
     16 *
     17 * You should have received a copy of the GNU General Public License
     18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
     19 */
     20#endregion
     21
     22using System;
    223using System.Collections.Generic;
    324using System.Linq;
     
    1536    private IGridServer server;
    1637    private string address;
    17     private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    18     private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
     38    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    1939    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    20     private object locker = new object();
     40    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
     41    private object connectionLock = new object();
     42    private object dictionaryLock = new object();
     43
    2144    private const int MAX_RESTARTS = 5;
    22     private Exception exception;
     45    private const int MAX_CONNECTION_RETRIES = 10;
     46    private const int RETRY_TIMEOUT_SEC = 10;
     47    private const int CHECK_RESULTS_TIMEOUT = 10;
     48
    2349    private ChannelFactory<IGridServer> factory;
    24     public Exception Exception {
    25       get { return exception; }
    26     }
    2750
    2851    public JobManager(string address) {
     
    3154
    3255    internal void Reset() {
    33       lock(locker) {
    34         ResetConnection();
     56      ResetConnection();
     57      lock(dictionaryLock) {
    3558        foreach(WaitHandle wh in waithandles.Values) wh.Close();
    3659        waithandles.Clear();
    37         engineOperations.Clear();
    38         runningEngines.Clear();
    39         exception = null;
     60        engines.Clear();
     61        results.Clear();
    4062      }
    4163    }
    4264
    4365    private void ResetConnection() {
    44       // open a new channel
    45       NetTcpBinding binding = new NetTcpBinding();
    46       binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    47       binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    48       binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    49       binding.Security.Mode = SecurityMode.None;
    50       factory = new ChannelFactory<IGridServer>(binding);
    51       server = factory.CreateChannel(new EndpointAddress(address));
    52     }
    53 
    54     public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
    55       ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
     66      lock(connectionLock) {
     67        // open a new channel
     68        NetTcpBinding binding = new NetTcpBinding();
     69        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
     70        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
     71        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
     72        binding.Security.Mode = SecurityMode.None;
     73
     74        factory = new ChannelFactory<IGridServer>(binding);
     75        server = factory.CreateChannel(new EndpointAddress(address));
     76      }
     77    }
     78
     79    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
     80      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
     81      byte[] zippedEngine = ZipEngine(engine);
     82      Guid currentEngineGuid = Guid.Empty;
     83      bool success = false;
     84      int retryCount = 0;
     85      do {
     86        lock(connectionLock) {
     87          try {
     88            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
     89            success = true;
     90          } catch(TimeoutException timeoutException) {
     91            if(retryCount < MAX_CONNECTION_RETRIES) {
     92              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     93              retryCount++;
     94            } else {
     95              throw new ApplicationException("Max retries reached.", timeoutException);
     96            }
     97          } catch(CommunicationException communicationException) {
     98            ResetConnection();
     99            // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution)
     100            if(retryCount < MAX_CONNECTION_RETRIES) {
     101              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     102              retryCount++;
     103            } else {
     104              throw new ApplicationException("Max retries reached.", communicationException);
     105            }
     106          }
     107        }
     108      } while(!success);
     109      lock(dictionaryLock) {
     110        engines[currentEngineGuid] = engine;
     111        waithandles[currentEngineGuid] = new ManualResetEvent(false);
     112      }
     113      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
     114      return waithandles[currentEngineGuid];
     115    }
     116
     117    private byte[] ZipEngine(ProcessingEngine engine) {
    56118      MemoryStream memStream = new MemoryStream();
    57119      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    58120      PersistenceManager.Save(engine, stream);
    59121      stream.Close();
    60       if(factory.State != CommunicationState.Opened)
    61         ResetConnection();
    62       Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    63       lock(locker) {
    64         runningEngines[currentEngineGuid] = memStream.ToArray();
    65         engineOperations[currentEngineGuid] = operation;
    66         waithandles[currentEngineGuid] = new ManualResetEvent(false);
    67         ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
    68       }
    69       return waithandles[currentEngineGuid];
     122      byte[] zippedEngine = memStream.ToArray();
     123      memStream.Close();
     124      return zippedEngine;
     125    }
     126
     127    public IScope EndExecuteOperation(AtomicOperation operation) {
     128      byte[] zippedResult = null;
     129      lock(dictionaryLock) {
     130        zippedResult = results[operation];
     131        results.Remove(operation);
     132      }
     133      // restore the engine
     134      using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
     135        ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
     136        return resultEngine.InitialOperation.Scope;
     137      }     
    70138    }
    71139
     
    74142      int restartCounter = 0;
    75143      do {
    76         try {
    77           if(factory.State != CommunicationState.Opened) ResetConnection();
    78           byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
    79           if(resultXml != null) {
    80             // restore the engine
    81             GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    82             ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    83 
    84             // merge the results
    85             IScope oldScope = engineOperations[engineGuid].Scope;
    86             oldScope.Clear();
    87             foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    88               oldScope.AddVariable(variable);
    89             }
    90             foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    91               oldScope.AddSubScope(subScope);
    92             }
    93             foreach(KeyValuePair<string, string> alias in resultEngine.InitialOperation.Scope.Aliases) {
    94               oldScope.AddAlias(alias.Key, alias.Value);
    95             }
    96 
    97             lock(locker) {
    98               // signal the wait handle and clean up then return
    99               waithandles[engineGuid].Set();
    100               engineOperations.Remove(engineGuid);
    101               waithandles.Remove(engineGuid);
    102               runningEngines.Remove(engineGuid);
    103             }
    104             return;
    105           } else {
    106             // check if the server is still working on the job
    107             JobState jobState = server.JobState(engineGuid);
    108             if(jobState == JobState.Unkown) {
    109               // restart job
    110               byte[] packedEngine;
    111               lock(locker) {
    112                 packedEngine = runningEngines[engineGuid];
     144        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
     145        byte[] zippedResult = null;
     146        lock(connectionLock) {
     147          bool success = false;
     148          int retries = 0;
     149          do {
     150            try {
     151              zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
     152              success = true;
     153            } catch(TimeoutException timeoutException) {
     154              success = false;
     155              retries++;
     156              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     157            } catch(CommunicationException communicationException) {
     158              ResetConnection();
     159              success = false;
     160              retries++;
     161              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     162            }
     163
     164          } while(!success && retries < MAX_CONNECTION_RETRIES);
     165        }
     166        if(zippedResult != null) {
     167          lock(dictionaryLock) {
     168            // store result
     169            results[engines[engineGuid].InitialOperation] = zippedResult;
     170
     171            // signal the wait handle and clean up then return
     172            engines.Remove(engineGuid);
     173            waithandles[engineGuid].Set();
     174            waithandles.Remove(engineGuid);
     175          }
     176          return;
     177        } else {
     178          // check if the server is still working on the job
     179          bool success = false;
     180          int retries = 0;
     181          JobState jobState = JobState.Unkown;
     182          do {
     183            try {
     184              lock(connectionLock) {
     185                jobState = server.JobState(engineGuid);
    113186              }
    114               server.BeginExecuteEngine(packedEngine);
    115               restartCounter++;
    116             }
     187              success = true;
     188            } catch(TimeoutException timeoutException) {
     189              retries++;
     190              success = false;
     191              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     192            } catch(CommunicationException communicationException) {
     193              ResetConnection();
     194              retries++;
     195              success = false;
     196              Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     197            }
     198          } while(!success && retries < MAX_CONNECTION_RETRIES);
     199          if(jobState == JobState.Unkown) {
     200            // restart job
     201            ProcessingEngine engine;
     202            lock(dictionaryLock) {
     203              engine = engines[engineGuid];
     204            }
     205            byte[] zippedEngine = ZipEngine(engine);
     206            success = false;
     207            retries = 0;
     208            do {
     209              try {
     210                lock(connectionLock) {
     211                  server.BeginExecuteEngine(zippedEngine);
     212                }
     213                success = true;
     214              } catch(TimeoutException timeoutException) {
     215                success = false;
     216                retries++;
     217                Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     218              } catch(CommunicationException communicationException) {
     219                ResetConnection();
     220                success = false;
     221                retries++;
     222                Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     223              }
     224            } while(!success && retries < MAX_CONNECTION_RETRIES);
     225            restartCounter++;
    117226          }
    118         } catch(Exception e) {
    119           // catch all exceptions set an exception flag, signal the wait-handle and exit the routine
    120           this.exception = new Exception("There was a problem with parallel execution", e);
    121           waithandles[engineGuid].Set();
    122           return;
    123227        }
    124228
    125229        // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem
    126230        if(restartCounter > MAX_RESTARTS) {
    127           this.exception = new Exception("Maximal number of parallel job restarts reached");
    128           waithandles[engineGuid].Set();
    129           return;
    130         }
    131 
    132         Thread.Sleep(TimeSpan.FromSeconds(10.0));
     231          throw new ApplicationException("Maximum number of job restarts reached.");
     232        }
    133233      } while(true);
    134234    }
Note: See TracChangeset for help on using the changeset viewer.