Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
07/22/08 21:15:56 (16 years ago)
Author:
gkronber
Message:

improved JobManager code and fixed a typo in the JobState enum. (ticket #188)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Grid/JobManager.cs

    r387 r391  
    3434
    3535namespace HeuristicLab.Grid {
     36  public class JobExecutionException : ApplicationException {
     37    public JobExecutionException(string msg) : base(msg) { }
     38  }
     39
    3640  public class JobManager {
    3741    private const int MAX_RESTARTS = 5;
     
    4044    private const int CHECK_RESULTS_TIMEOUT = 3;
    4145
     46    private class Job {
     47      public Guid guid;
     48      public ProcessingEngine engine;
     49      public ManualResetEvent waitHandle;
     50      public int restarts;
     51    }
     52
    4253    private IGridServer server;
    4354    private string address;
    4455    private object waitingQueueLock = new object();
    45     private Queue<ProcessingEngine> waitingEngines = new Queue<ProcessingEngine>();
     56    private Queue<Job> waitingJobs = new Queue<Job>();
    4657    private object runningQueueLock = new object();
    47     private Queue<Guid> runningEngines = new Queue<Guid>();
    48 
    49     private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    50     private Dictionary<ProcessingEngine, ManualResetEvent> waithandles = new Dictionary<ProcessingEngine, ManualResetEvent>();
     58    private Queue<Job> runningJobs = new Queue<Job>();
    5159    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    52     private Dictionary<ProcessingEngine, int> restarts = new Dictionary<ProcessingEngine, int>();
    5360
    5461    private List<IOperation> erroredOperations = new List<IOperation>();
     
    7279      ResetConnection();
    7380      lock(dictionaryLock) {
    74         foreach(WaitHandle wh in waithandles.Values) wh.Close();
    75         waithandles.Clear();
    76         engines.Clear();
     81        foreach(Job j in waitingJobs) {
     82          j.waitHandle.Close();
     83        }
     84        waitingJobs.Clear();
     85        foreach(Job j in runningJobs) {
     86          j.waitHandle.Close();
     87        }
     88        runningJobs.Clear();
    7789        results.Clear();
    7890        erroredOperations.Clear();
    79         runningEngines.Clear();
    80         waitingEngines.Clear();
    81         restarts.Clear();
    8291      }
    8392    }
     
    100109      try {
    101110        while(true) {
    102           bool enginesWaiting = false;
     111          Job job = null;
    103112          lock(waitingQueueLock) {
    104             enginesWaiting = waitingEngines.Count > 0;
    105           }
    106           if(enginesWaiting) {
    107             ProcessingEngine engine;
    108             lock(waitingQueueLock) {
    109               engine = waitingEngines.Dequeue();
     113            if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
     114          }
     115          if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting
     116          else {
     117            Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
     118            if(currentEngineGuid == Guid.Empty) {
     119              // couldn't start the job -> requeue
     120              if(job.restarts < MAX_RESTARTS) {
     121                job.restarts++;
     122                lock(waitingQueueLock) waitingJobs.Enqueue(job);
     123                waitingWaitHandle.Set();
     124              } else {
     125                // max restart count reached -> give up on this job and flag error
     126                lock(dictionaryLock) {
     127                  erroredOperations.Add(job.engine.InitialOperation);
     128                  job.waitHandle.Set();
     129                }
     130              }
     131            } else {
     132              // job started successfully
     133              job.guid = currentEngineGuid;
     134              lock(runningQueueLock) {
     135                runningJobs.Enqueue(job);
     136                runningWaitHandle.Set();
     137              }
    110138            }
    111             int nRestarts = 0;
    112             lock(dictionaryLock) {
    113               if(restarts.ContainsKey(engine)) {
    114                 nRestarts = restarts[engine];
    115                 restarts[engine] = nRestarts + 1;
    116               } else {
    117                 restarts[engine] = 0;
    118               }
    119             }
    120             if(nRestarts < MAX_RESTARTS) {
    121               byte[] zippedEngine = ZipEngine(engine);
    122               Guid currentEngineGuid = Guid.Empty;
    123               bool success = false;
    124               int retryCount = 0;
    125               do {
    126                 try {
    127                   lock(connectionLock) {
    128                     currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
    129                   }
    130                   lock(dictionaryLock) {
    131                     engines[currentEngineGuid] = engine;
    132                   }
    133                   lock(runningQueueLock) {
    134                     runningEngines.Enqueue(currentEngineGuid);
    135                   }
    136 
    137                   success = true;
    138                 } catch(TimeoutException timeoutException) {
    139                   if(retryCount++ >= MAX_CONNECTION_RETRIES) {
    140                     //                  throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
    141                     lock(waitingQueueLock) {
    142                       waitingEngines.Enqueue(engine);
    143                     }
    144                     success = true;
    145                   }
    146                   Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    147                 } catch(CommunicationException communicationException) {
    148                   if(retryCount++ >= MAX_CONNECTION_RETRIES) {
    149                     //                  throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
    150                     lock(waitingQueueLock) {
    151                       waitingEngines.Enqueue(engine);
    152                     }
    153                     success = true;
    154                   }
    155                   ResetConnection();
    156                   Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    157                 }
    158               } while(!success); // connection attempts
    159             } // restarts
    160             else {
    161               lock(dictionaryLock) {
    162                 erroredOperations.Add(engine.InitialOperation);
    163                 restarts.Remove(engine);
    164                 Debug.Assert(!engines.ContainsValue(engine));
    165                 //// clean up and signal the wait handle then return
    166                 waithandles[engine].Set();
    167                 waithandles.Remove(engine);
    168               }
    169             }
    170           } else {
    171             // no engines are waiting
    172             waitingWaitHandle.WaitOne();
    173139          }
    174140        }
     
    177143      }
    178144    }
     145
    179146
    180147    public void GetResults() {
    181148      try {
    182149        while(true) {
    183           Guid engineGuid = Guid.Empty;
     150          Job job = null;
    184151          lock(runningQueueLock) {
    185             if(runningEngines.Count > 0) engineGuid = runningEngines.Dequeue();
    186           }
    187 
    188           if(engineGuid != Guid.Empty) {
    189             Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
    190             byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
     152            if(runningJobs.Count > 0) job = runningJobs.Dequeue();
     153          }
     154          if(job == null) runningWaitHandle.WaitOne(); // no jobs running
     155          else {
     156            byte[] zippedResult = TryEndExecuteEngine(server, job.guid);
    191157            if(zippedResult != null) { // successful
    192158              lock(dictionaryLock) {
    193                 ProcessingEngine engine = engines[engineGuid];
    194                 engines.Remove(engineGuid);
    195                 restarts.Remove(engine);
    196159                // store result
    197                 results[engine.InitialOperation] = zippedResult;
    198                 // clean up and signal the wait handle then return
    199                 waithandles[engine].Set();
    200                 waithandles.Remove(engine);
     160                results[job.engine.InitialOperation] = zippedResult;
     161                // notify consumer that result is ready
     162                job.waitHandle.Set();
    201163              }
    202164            } else {
    203165              // there was a problem -> check the state of the job and restart if necessary
    204               JobState jobState = TryGetJobState(server, engineGuid);
    205               if(jobState == JobState.Unkown) {
     166              JobState jobState = TryGetJobState(server, job.guid);
     167              if(jobState == JobState.Unknown) {
     168                job.restarts++;
    206169                lock(waitingQueueLock) {
    207                   ProcessingEngine engine = engines[engineGuid];
    208                   engines.Remove(engineGuid);
    209                   waitingEngines.Enqueue(engine);
     170                  waitingJobs.Enqueue(job);
    210171                  waitingWaitHandle.Set();
    211172                }
     
    213174                // job still active at the server
    214175                lock(runningQueueLock) {
    215                   runningEngines.Enqueue(engineGuid);
     176                  runningJobs.Enqueue(job);
     177                  runningWaitHandle.Set();
    216178                }
    217179              }
    218180            }
    219           } else {
    220             // no running engines
    221             runningWaitHandle.WaitOne();
    222181          }
    223182        }
     
    228187
    229188    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
    230       ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
    231       waithandles[engine] = new ManualResetEvent(false);
     189      Job job = new Job();
     190      job.engine = new ProcessingEngine(globalScope, operation);
     191      job.waitHandle = new ManualResetEvent(false);
     192      job.restarts = 0;
    232193      lock(waitingQueueLock) {
    233         waitingEngines.Enqueue(engine);
     194        waitingJobs.Enqueue(job);
    234195      }
    235196      waitingWaitHandle.Set();
    236       return waithandles[engine];
     197      return job.waitHandle;
    237198    }
    238199
     
    250211      if(erroredOperations.Contains(operation)) {
    251212        erroredOperations.Remove(operation);
    252         throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
     213        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
    253214      } else {
    254215        byte[] zippedResult = null;
     
    262223        }
    263224      }
     225    }
     226
     227    private Guid TryStartExecuteEngine(ProcessingEngine engine) {
     228      byte[] zippedEngine = ZipEngine(engine);
     229      int retries = 0;
     230      Guid guid = Guid.Empty;
     231      do {
     232        try {
     233          lock(connectionLock) {
     234            guid = server.BeginExecuteEngine(zippedEngine);
     235          }
     236          return guid;
     237        } catch(TimeoutException) {
     238          retries++;
     239          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     240        } catch(CommunicationException) {
     241          ResetConnection();
     242          retries++;
     243          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
     244        }
     245      } while(retries < MAX_CONNECTION_RETRIES);
     246      return Guid.Empty;
    264247    }
    265248
     
    302285        }
    303286      } while(retries < MAX_CONNECTION_RETRIES);
    304       return JobState.Unkown;
     287      return JobState.Unknown;
    305288    }
    306289  }
Note: See TracChangeset for help on using the changeset viewer.