Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/16/08 17:36:29 (17 years ago)
Author:
gkronber
Message:

improved code for distributed-engine job-manager. Also removed the throwing of an uncaught exception in a thread-pool thread.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs

    r281 r315  
    3939    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    4040    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
     41    private List<IOperation> erroredOperations = new List<IOperation>();
    4142    private object connectionLock = new object();
    4243    private object dictionaryLock = new object();
     
    4445    private const int MAX_RESTARTS = 5;
    4546    private const int MAX_CONNECTION_RETRIES = 10;
    46     private const int RETRY_TIMEOUT_SEC = 10;
     47    private const int RETRY_TIMEOUT_SEC = 60;
    4748    private const int CHECK_RESULTS_TIMEOUT = 10;
    4849
     
    6061        engines.Clear();
    6162        results.Clear();
     63        erroredOperations.Clear();
    6264      }
    6365    }
     
    8486      int retryCount = 0;
    8587      do {
    86         lock(connectionLock) {
    87           try {
     88        try {
     89          lock(connectionLock) {
    8890            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
    89             success = true;
    90           } catch(TimeoutException timeoutException) {
    91             if(retryCount < MAX_CONNECTION_RETRIES) {
    92               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    93               retryCount++;
    94             } else {
    95               throw new ApplicationException("Max retries reached.", timeoutException);
    96             }
    97           } catch(CommunicationException communicationException) {
    98             ResetConnection();
    99             // wait some time and try again (limit with maximal retries if retry count reached throw exception -> engine can decide to stop execution)
    100             if(retryCount < MAX_CONNECTION_RETRIES) {
    101               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    102               retryCount++;
    103             } else {
    104               throw new ApplicationException("Max retries reached.", communicationException);
    105             }
    106           }
     91          }
     92          success = true;
     93        } catch(TimeoutException timeoutException) {
     94          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
     95            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
     96          }
     97          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     98        } catch(CommunicationException communicationException) {
     99          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
     100            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
     101          }
     102          ResetConnection();
     103          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    107104        }
    108105      } while(!success);
     
    126123
    127124    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
    128       byte[] zippedResult = null;
    129       lock(dictionaryLock) {
    130         zippedResult = results[operation];
    131         results.Remove(operation);
    132       }
    133       // restore the engine
    134       using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
    135         return (ProcessingEngine)PersistenceManager.Load(stream);
    136       }     
     125      if(erroredOperations.Contains(operation)) {
     126        erroredOperations.Remove(operation);
     127        throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
     128      } else {
     129        byte[] zippedResult = null;
     130        lock(dictionaryLock) {
     131          zippedResult = results[operation];
     132          results.Remove(operation);
     133        }
     134        // restore the engine
     135        using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
     136          return (ProcessingEngine)PersistenceManager.Load(stream);
     137        }
     138      }
    137139    }
    138140
     
    142144      do {
    143145        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
    144         byte[] zippedResult = null;
    145         lock(connectionLock) {
    146           bool success = false;
    147           int retries = 0;
    148           do {
    149             try {
    150               zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
    151               success = true;
    152             } catch(TimeoutException timeoutException) {
    153               success = false;
    154               retries++;
    155               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    156             } catch(CommunicationException communicationException) {
    157               ResetConnection();
    158               success = false;
    159               retries++;
    160               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    161             }
    162 
    163           } while(!success && retries < MAX_CONNECTION_RETRIES);
    164         }
    165         if(zippedResult != null) {
     146        byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
     147        if(zippedResult != null) { // successful
    166148          lock(dictionaryLock) {
    167149            // store result
    168150            results[engines[engineGuid].InitialOperation] = zippedResult;
    169 
    170             // signal the wait handle and clean up then return
     151            // clean up and signal the wait handle then return
    171152            engines.Remove(engineGuid);
    172153            waithandles[engineGuid].Set();
     
    175156          return;
    176157        } else {
    177           // check if the server is still working on the job
    178           bool success = false;
    179           int retries = 0;
    180           JobState jobState = JobState.Unkown;
    181           do {
    182             try {
    183               lock(connectionLock) {
    184                 jobState = server.JobState(engineGuid);
    185               }
    186               success = true;
    187             } catch(TimeoutException timeoutException) {
    188               retries++;
    189               success = false;
    190               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    191             } catch(CommunicationException communicationException) {
    192               ResetConnection();
    193               retries++;
    194               success = false;
    195               Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    196             }
    197           } while(!success && retries < MAX_CONNECTION_RETRIES);
     158          // there was a problem -> check the state of the job and restart if necessary
     159          JobState jobState = TryGetJobState(server, engineGuid);
    198160          if(jobState == JobState.Unkown) {
    199             // restart job
    200             ProcessingEngine engine;
    201             lock(dictionaryLock) {
    202               engine = engines[engineGuid];
    203             }
    204             byte[] zippedEngine = ZipEngine(engine);
    205             success = false;
    206             retries = 0;
    207             do {
    208               try {
    209                 lock(connectionLock) {
    210                   server.BeginExecuteEngine(zippedEngine);
    211                 }
    212                 success = true;
    213               } catch(TimeoutException timeoutException) {
    214                 success = false;
    215                 retries++;
    216                 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    217               } catch(CommunicationException communicationException) {
    218                 ResetConnection();
    219                 success = false;
    220                 retries++;
    221                 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    222               }
    223             } while(!success && retries < MAX_CONNECTION_RETRIES);
     161            TryRestartJob(engineGuid);
    224162            restartCounter++;
    225163          }
    226164        }
    227 
    228         // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem
    229         if(restartCounter > MAX_RESTARTS) {
    230           throw new ApplicationException("Maximum number of job restarts reached.");
    231         }
    232       } while(true);
     165      } while(restartCounter < MAX_RESTARTS);
     166      lock(dictionaryLock) {
     167        // the job was never finished and restarting didn't help -> stop trying to execute the job and
     168        // save the faulted operation in a list to throw an exception when EndExecuteEngine is called.
     169        erroredOperations.Add(engines[engineGuid].InitialOperation);
     170        // clean up and signal the wait handle
     171        engines.Remove(engineGuid);
     172        waithandles[engineGuid].Set();
     173        waithandles.Remove(engineGuid);
     174      }
     175    }
     176
     177    private void TryRestartJob(Guid engineGuid) {
     178      // restart job
     179      ProcessingEngine engine;
     180      lock(dictionaryLock) {
     181        engine = engines[engineGuid];
     182      }
     183      byte[] zippedEngine = ZipEngine(engine);
     184      int retries = 0;
     185      do {
     186        try {
     187          lock(connectionLock) {
     188            server.BeginExecuteEngine(zippedEngine);
     189          }
     190          return;
     191        } catch(TimeoutException timeoutException) {
     192          retries++;
     193          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     194        } catch(CommunicationException communicationException) {
     195          ResetConnection();
     196          retries++;
     197          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     198        }
     199      } while(retries < MAX_CONNECTION_RETRIES);
     200    }
     201
     202    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
     203      int retries = 0;
     204      do {
     205        try {
     206          lock(connectionLock) {
     207            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
     208            return zippedResult;
     209          }
     210        } catch(TimeoutException timeoutException) {
     211          retries++;
     212          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     213        } catch(CommunicationException communicationException) {
     214          ResetConnection();
     215          retries++;
     216          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     217        }
     218      } while(retries < MAX_CONNECTION_RETRIES);
     219      return null;
     220    }
     221
     222    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
     223      // check if the server is still working on the job
     224      int retries = 0;
     225      do {
     226        try {
     227          lock(connectionLock) {
     228            JobState jobState = server.JobState(engineGuid);
     229            return jobState;
     230          }
     231        } catch(TimeoutException timeoutException) {
     232          retries++;
     233          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     234        } catch(CommunicationException communicationException) {
     235          ResetConnection();
     236          retries++;
     237          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     238        }
     239      } while(retries < MAX_CONNECTION_RETRIES);
     240      return JobState.Unkown;
     241     
    233242    }
    234243  }
Note: See TracChangeset for help on using the changeset viewer.