Free cookie consent management tool by TermsFeed Policy Generator

Changeset 386


Ignore:
Timestamp:
07/21/08 16:40:49 (16 years ago)
Author:
gkronber
Message:

worked on #199

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Grid/JobManager.cs

    r383 r386  
    3131using System.IO.Compression;
    3232using System.Windows.Forms;
     33using System.Diagnostics;
    3334
    3435namespace HeuristicLab.Grid {
    3536  public class JobManager {
     37    private const int MAX_RESTARTS = 5;
     38    private const int MAX_CONNECTION_RETRIES = 10;
     39    private const int RETRY_TIMEOUT_SEC = 60;
     40    private const int CHECK_RESULTS_TIMEOUT = 3;
     41
    3642    private IGridServer server;
    3743    private string address;
     44    private object waitingQueueLock = new object();
     45    private Queue<ProcessingEngine> waitingEngines = new Queue<ProcessingEngine>();
     46    private object runningQueueLock = new object();
     47    private Queue<Guid> runningEngines = new Queue<Guid>();
     48
    3849    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    39     private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
     50    private Dictionary<ProcessingEngine, ManualResetEvent> waithandles = new Dictionary<ProcessingEngine, ManualResetEvent>();
    4051    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
     52    private Dictionary<ProcessingEngine, int> restarts = new Dictionary<ProcessingEngine, int>();
     53
    4154    private List<IOperation> erroredOperations = new List<IOperation>();
    4255    private object connectionLock = new object();
    4356    private object dictionaryLock = new object();
    4457
    45     private const int MAX_RESTARTS = 5;
    46     private const int MAX_CONNECTION_RETRIES = 10;
    47     private const int RETRY_TIMEOUT_SEC = 60;
    48     private const int CHECK_RESULTS_TIMEOUT = 10;
     58    private ManualResetEvent runningWaitHandle = new ManualResetEvent(false);
     59    private ManualResetEvent waitingWaitHandle = new ManualResetEvent(false);
    4960
    5061    private ChannelFactory<IGridServer> factory;
     
    5263    public JobManager(string address) {
    5364      this.address = address;
     65      Thread starterThread = new Thread(StartEngines);
     66      Thread resultsGatheringThread = new Thread(GetResults);
     67      starterThread.Start();
     68      resultsGatheringThread.Start();
    5469    }
    5570
     
    6277        results.Clear();
    6378        erroredOperations.Clear();
     79        runningEngines.Clear();
     80        waitingEngines.Clear();
     81        restarts.Clear();
    6482      }
    6583    }
     
    7997    }
    8098
     99    public void StartEngines() {
     100      try {
     101        while(true) {
     102          bool enginesWaiting = false;
     103          lock(waitingQueueLock) {
     104            enginesWaiting = waitingEngines.Count > 0;
     105          }
     106          if(enginesWaiting) {
     107            ProcessingEngine engine;
     108            lock(waitingQueueLock) {
     109              engine = waitingEngines.Dequeue();
     110            }
     111            int nRestarts = 0;
     112            lock(dictionaryLock) {
     113              if(restarts.ContainsKey(engine)) {
     114                nRestarts = restarts[engine];
     115                restarts[engine] = nRestarts + 1;
     116              } else {
     117                restarts[engine] = 0;
     118              }
     119            }
     120            if(nRestarts < MAX_RESTARTS) {
     121              byte[] zippedEngine = ZipEngine(engine);
     122              Guid currentEngineGuid = Guid.Empty;
     123              bool success = false;
     124              int retryCount = 0;
     125              do {
     126                try {
     127                  lock(connectionLock) {
     128                    currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
     129                  }
     130                  lock(dictionaryLock) {
     131                    engines[currentEngineGuid] = engine;
     132                  }
     133                  lock(runningQueueLock) {
     134                    runningEngines.Enqueue(currentEngineGuid);
     135                  }
     136
     137                  success = true;
     138                } catch(TimeoutException timeoutException) {
     139                  if(retryCount++ >= MAX_CONNECTION_RETRIES) {
     140                    //                  throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
     141                    lock(waitingQueueLock) {
     142                      waitingEngines.Enqueue(engine);
     143                    }
     144                    success = true;
     145                  }
     146                  Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     147                } catch(CommunicationException communicationException) {
     148                  if(retryCount++ >= MAX_CONNECTION_RETRIES) {
     149                    //                  throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
     150                    lock(waitingQueueLock) {
     151                      waitingEngines.Enqueue(engine);
     152                    }
     153                    success = true;
     154                  }
     155                  ResetConnection();
     156                  Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     157                }
     158              } while(!success); // connection attempts
     159            } // restarts
     160            else {
     161              lock(dictionaryLock) {
     162                erroredOperations.Add(engine.InitialOperation);
     163                restarts.Remove(engine);
     164                Debug.Assert(!engines.ContainsValue(engine));
     165                //// clean up and signal the wait handle then return
     166                waithandles[engine].Set();
     167                waithandles.Remove(engine);
     168              }
     169            }
     170          } else {
     171            // no engines are waiting
     172            waitingWaitHandle.WaitOne();
     173            waitingWaitHandle.Reset();
     174          }
     175        }
     176      } finally {
     177        Debug.Assert(false);  // make sure that we are notified when this thread is stopped in debugging
     178      }
     179    }
     180
     181    public void GetResults() {
     182      try {
     183        while(true) {
     184          Guid engineGuid = Guid.Empty;
     185          lock(runningQueueLock) {
     186            if(runningEngines.Count > 0) engineGuid = runningEngines.Dequeue();
     187          }
     188
     189          if(engineGuid != Guid.Empty) {
     190            Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
     191            byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
     192            if(zippedResult != null) { // successful
     193              lock(dictionaryLock) {
     194                ProcessingEngine engine = engines[engineGuid];
     195                engines.Remove(engineGuid);
     196                restarts.Remove(engine);
     197                // store result
     198                results[engine.InitialOperation] = zippedResult;
     199                // clean up and signal the wait handle then return
     200                waithandles[engine].Set();
     201                waithandles.Remove(engine);
     202              }
     203            } else {
     204              // there was a problem -> check the state of the job and restart if necessary
     205              JobState jobState = TryGetJobState(server, engineGuid);
     206              if(jobState == JobState.Unkown) {
     207                lock(waitingQueueLock) {
     208                  ProcessingEngine engine = engines[engineGuid];
     209                  engines.Remove(engineGuid);
     210                  waitingEngines.Enqueue(engine);
     211                  waitingWaitHandle.Set();
     212                }
     213              } else {
     214                // job still active at the server
     215                lock(runningQueueLock) {
     216                  runningEngines.Enqueue(engineGuid);
     217                }
     218              }
     219            }
     220          } else {
     221            // no running engines
     222            runningWaitHandle.WaitOne();
     223            runningWaitHandle.Reset();
     224          }
     225        }
     226      } finally {
     227        Debug.Assert(false); // just to make sure that I get notified when debugging whenever this thread is killed somehow
     228      }
     229    }
     230
    81231    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
    82232      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
    83       byte[] zippedEngine = ZipEngine(engine);
    84       Guid currentEngineGuid = Guid.Empty;
    85       bool success = false;
    86       int retryCount = 0;
    87       do {
    88         try {
    89           lock(connectionLock) {
    90             currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
    91           }
    92           success = true;
    93         } catch(TimeoutException timeoutException) {
    94           if(retryCount++ >= MAX_CONNECTION_RETRIES) {
    95             throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
    96           }
    97           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    98         } catch(CommunicationException communicationException) {
    99           if(retryCount++ >= MAX_CONNECTION_RETRIES) {
    100             throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
    101           }
    102           ResetConnection();
    103           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    104         }
    105       } while(!success);
    106       lock(dictionaryLock) {
    107         engines[currentEngineGuid] = engine;
    108         waithandles[currentEngineGuid] = new ManualResetEvent(false);
    109       }
    110       ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
    111       return waithandles[currentEngineGuid];
     233      waithandles[engine] = new ManualResetEvent(false);
     234      lock(waitingQueueLock) {
     235        waitingEngines.Enqueue(engine);
     236      }
     237      waitingWaitHandle.Set();
     238      return waithandles[engine];
    112239    }
    113240
     
    139266    }
    140267
    141     private void TryGetResult(object state) {
    142       Guid engineGuid = (Guid)state;
    143       int restartCounter = 0;
    144       do {
    145         Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
    146         byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
    147         if(zippedResult != null) { // successful
    148           lock(dictionaryLock) {
    149             // store result
    150             results[engines[engineGuid].InitialOperation] = zippedResult;
    151             // clean up and signal the wait handle then return
    152             engines.Remove(engineGuid);
    153             waithandles[engineGuid].Set();
    154             waithandles.Remove(engineGuid);
    155           }
    156           return;
    157         } else {
    158           // there was a problem -> check the state of the job and restart if necessary
    159           JobState jobState = TryGetJobState(server, engineGuid);
    160           if(jobState == JobState.Unkown) {
    161             TryRestartJob(engineGuid);
    162             restartCounter++;
    163           }
    164         }
    165       } while(restartCounter < MAX_RESTARTS);
    166       lock(dictionaryLock) {
    167         // the job was never finished and restarting didn't help -> stop trying to execute the job and
    168         // save the faulted operation in a list to throw an exception when EndExecuteEngine is called.
    169         erroredOperations.Add(engines[engineGuid].InitialOperation);
    170         // clean up and signal the wait handle
    171         engines.Remove(engineGuid);
    172         waithandles[engineGuid].Set();
    173         waithandles.Remove(engineGuid);
    174       }
    175     }
    176 
    177     private void TryRestartJob(Guid engineGuid) {
    178       // restart job
    179       ProcessingEngine engine;
    180       lock(dictionaryLock) {
    181         engine = engines[engineGuid];
    182       }
    183       byte[] zippedEngine = ZipEngine(engine);
    184       int retries = 0;
    185       do {
    186         try {
    187           lock(connectionLock) {
    188             server.BeginExecuteEngine(zippedEngine);
    189           }
    190           return;
    191         } catch(TimeoutException) {
    192           retries++;
    193           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    194         } catch(CommunicationException) {
    195           ResetConnection();
    196           retries++;
    197           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    198         }
    199       } while(retries < MAX_CONNECTION_RETRIES);
    200     }
    201 
    202268    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
    203269      int retries = 0;
Note: See TracChangeset for help on using the changeset viewer.