
Timestamp:
06/17/09 18:07:15 (15 years ago)
Author:
gkronber
Message:

Refactored JobManager and DistributedEngine to fix bugs in the GridExecuter. #644.
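The new AsyncGridResult.cs file is listed as added, but its contents are not part of this changeset view. Judging from how the refactored JobManager uses it below (constructor taking the engine; Engine, Guid, Restarts, Aborted, and ZippedResult members; a WaitHandle; and SignalFinished()), the class presumably looks roughly like the following sketch. The exact member types, accessors, and namespace are assumptions, not the committed code.

    using System;
    using System.Threading;
    using HeuristicLab.Core;

    namespace HeuristicLab.Grid {
      // hypothetical reconstruction of AsyncGridResult, inferred from its usage in JobManager
      public class AsyncGridResult {
        private ManualResetEvent waitHandle = new ManualResetEvent(false);

        public AsyncGridResult(IEngine engine) {
          Engine = engine;
        }

        public IEngine Engine { get; set; }       // engine to execute on the grid
        public Guid Guid { get; set; }            // server-side job id assigned on start
        public int Restarts { get; set; }         // restart counter maintained by JobManager
        public bool Aborted { get; set; }         // set when MAX_RESTARTS is exceeded
        public byte[] ZippedResult { get; set; }  // gzipped engine returned by the server

        public WaitHandle WaitHandle {
          get { return waitHandle; }
        }

        // signals consumers waiting on WaitHandle that the job finished (or was aborted)
        public void SignalFinished() {
          waitHandle.Set();
        }
      }
    }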

Location:
trunk/sources/HeuristicLab.Grid/3.2
Files:
1 added
3 edited

Legend:

  ' ' unchanged
  '+' added
  '-' removed
  • trunk/sources/HeuristicLab.Grid/3.2/HeuristicLab.Grid-3.2.csproj

    r1534 → r2055

       <DependentUpon>ClientForm.cs</DependentUpon>
     </Compile>
+    <Compile Include="AsyncGridResult.cs" />
     <Compile Include="Database.cs" />
     <Compile Include="EngineRunner.cs" />
  • trunk/sources/HeuristicLab.Grid/3.2/IGridServer.cs

    r1529 → r2055

 namespace HeuristicLab.Grid {
   public enum JobState {
-    Unknown,
+    Unknown = 0, // default value
     Waiting,
     Busy,
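The explicit Unknown = 0 is relevant to the JobManager refactoring below: all server calls now go through a generic SavelyExecute helper that returns default(TResult) once MAX_CONNECTION_RETRIES is reached, and for TResult = JobState that default is the enum member with underlying value 0. A dead connection therefore surfaces as JobState.Unknown and triggers a job restart. A minimal illustration (hypothetical snippet, not part of the changeset):

    // default(JobState) is the member with underlying value 0, i.e. Unknown
    JobState fallback = default(JobState);
    System.Diagnostics.Debug.Assert(fallback == JobState.Unknown);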
  • trunk/sources/HeuristicLab.Grid/3.2/JobManager.cs

    r1529 → r2055

     private const int RESULT_POLLING_TIMEOUT = 5;

-    private class Job {
-      public Guid guid;
-      public ProcessingEngine engine;
-      public ManualResetEvent waitHandle;
-      public int restarts;
-    }
-
     private IGridServer server;
     private string address;
     private object waitingQueueLock = new object();
-    private Queue<Job> waitingJobs = new Queue<Job>();
+    private Queue<AsyncGridResult> waitingJobs = new Queue<AsyncGridResult>();
     private object runningQueueLock = new object();
-    private Queue<Job> runningJobs = new Queue<Job>();
-    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
-
-    private List<IOperation> erroredOperations = new List<IOperation>();
+    private Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>();
     private object connectionLock = new object();
-    private object dictionaryLock = new object();

     private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
…
     public void Reset() {
       ResetConnection();
-      lock(dictionaryLock) {
-        foreach(Job j in waitingJobs) {
-          j.waitHandle.Close();
+      lock (waitingQueueLock) {
+        foreach (AsyncGridResult r in waitingJobs) {
+          r.WaitHandle.Close();
         }
         waitingJobs.Clear();
-        foreach(Job j in runningJobs) {
-          j.waitHandle.Close();
+      }
+      lock (runningQueueLock) {
+        foreach (AsyncGridResult r in runningJobs) {
+          r.WaitHandle.Close();
         }
         runningJobs.Clear();
-        results.Clear();
-        erroredOperations.Clear();
       }
     }
…
     private void ResetConnection() {
       Trace.TraceInformation("Reset connection in JobManager");
-      lock(connectionLock) {
+      lock (connectionLock) {
         // open a new channel
         NetTcpBinding binding = new NetTcpBinding();
…
     public void StartEngines() {
       try {
-        while(true) {
-          Job job = null;
-          lock(waitingQueueLock) {
-            if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
-          }
-          if(job==null) waitingWaitHandle.WaitOne(); // no jobs waiting
+        while (true) {
+          AsyncGridResult job = null;
+          lock (waitingQueueLock) {
+            if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();
+          }
+          if (job == null) waitingWaitHandle.WaitOne(); // no jobs waiting
           else {
-            Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
-            if(currentEngineGuid == Guid.Empty) {
+            Guid currentEngineGuid = TryStartExecuteEngine(job.Engine);
+            if (currentEngineGuid == Guid.Empty) {
               // couldn't start the job -> requeue
-              if(job.restarts < MAX_RESTARTS) {
-                job.restarts++;
-                lock(waitingQueueLock) waitingJobs.Enqueue(job);
+              if (job.Restarts < MAX_RESTARTS) {
+                job.Restarts++;
+                lock (waitingQueueLock) waitingJobs.Enqueue(job);
                 waitingWaitHandle.Set();
               } else {
                 // max restart count reached -> give up on this job and flag error
-                lock(dictionaryLock) {
-                  erroredOperations.Add(job.engine.InitialOperation);
-                  job.waitHandle.Set();
-                }
+                job.Aborted = true;
+                job.SignalFinished();
               }
             } else {
               // job started successfully
-              job.guid = currentEngineGuid;
-              lock(runningQueueLock) {
+              job.Guid = currentEngineGuid;
+              lock (runningQueueLock) {
                 runningJobs.Enqueue(job);
                 runningWaitHandle.Set();
…
           }
         }
-      } catch(Exception e) {
-        Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
+      }
+      catch (Exception e) {
+        Trace.TraceError("Exception " + e + " in JobManager.StartEngines() killed the start-engine thread\n" + e.StackTrace);
       }
     }
…
     public void GetResults() {
       try {
-        while(true) {
-          Job job = null;
-          lock(runningQueueLock) {
-            if(runningJobs.Count > 0) job = runningJobs.Dequeue();
-          }
-          if(job == null) runningWaitHandle.WaitOne(); // no jobs running
+        while (true) {
+          AsyncGridResult job = null;
+          lock (runningQueueLock) {
+            if (runningJobs.Count > 0) job = runningJobs.Dequeue();
+          }
+          if (job == null) runningWaitHandle.WaitOne(); // no jobs running
           else {
-            byte[] zippedResult = TryEndExecuteEngine(server, job.guid);
-            if(zippedResult != null) { // successful
-              lock(dictionaryLock) {
-                // store result
-                results[job.engine.InitialOperation] = zippedResult;
-                // notify consumer that result is ready
-                job.waitHandle.Set();
-              }
+            byte[] zippedResult = TryEndExecuteEngine(server, job.Guid);
+            if (zippedResult != null) {
+              // successful => store result
+              job.ZippedResult = zippedResult;
+              // notify consumer that result is ready
+              job.SignalFinished();
             } else {
               // there was a problem -> check the state of the job and restart if necessary
-              JobState jobState = TryGetJobState(server, job.guid);
-              if(jobState == JobState.Unknown) {
-                job.restarts++;
-                lock(waitingQueueLock) {
+              JobState jobState = TryGetJobState(server, job.Guid);
+              if (jobState == JobState.Unknown) {
+                job.Restarts++;
+                lock (waitingQueueLock) {
                   waitingJobs.Enqueue(job);
                   waitingWaitHandle.Set();
…
               } else {
                 // job still active at the server
-                lock(runningQueueLock) {
+                lock (runningQueueLock) {
                   runningJobs.Enqueue(job);
                   runningWaitHandle.Set();
…
           }
         }
-      } catch(Exception e) {
-        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
-      }
-    }
-
-    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
-      return BeginExecuteEngine(new ProcessingEngine(globalScope, operation));
-    }
-
-    public WaitHandle BeginExecuteEngine(ProcessingEngine engine) {
-      Job job = new Job();
-      job.engine = engine;
-      job.waitHandle = new ManualResetEvent(false);
-      job.restarts = 0;
-      lock(waitingQueueLock) {
-        waitingJobs.Enqueue(job);
+      }
+      catch (Exception e) {
+        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n" + e.StackTrace);
+      }
+    }
+
+    public AsyncGridResult BeginExecuteEngine(ProcessingEngine engine) {
+      AsyncGridResult asyncResult = new AsyncGridResult(engine);
+      asyncResult.Engine = engine;
+      lock (waitingQueueLock) {
+        waitingJobs.Enqueue(asyncResult);
       }
       waitingWaitHandle.Set();
-      return job.waitHandle;
-    }
-
-    private byte[] ZipEngine(ProcessingEngine engine) {
+      return asyncResult;
+    }
+
+    private byte[] ZipEngine(IEngine engine) {
       return PersistenceManager.SaveToGZip(engine);
     }

-    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
-      if(erroredOperations.Contains(operation)) {
-        erroredOperations.Remove(operation);
+    public IEngine EndExecuteEngine(AsyncGridResult asyncResult) {
+      if (asyncResult.Aborted) {
         throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
       } else {
-        byte[] zippedResult = null;
-        lock(dictionaryLock) {
-          zippedResult = results[operation];
-          results.Remove(operation);
-        }
         // restore the engine
-        return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
-      }
-    }
-
-    private Guid TryStartExecuteEngine(ProcessingEngine engine) {
+        return (IEngine)PersistenceManager.RestoreFromGZip(asyncResult.ZippedResult);
+      }
+    }
+
+    private Guid TryStartExecuteEngine(IEngine engine) {
       byte[] zippedEngine = ZipEngine(engine);
+      return SavelyExecute(() => server.BeginExecuteEngine(zippedEngine));
+    }
+
+    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
+      return SavelyExecute(() => {
+        byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
+        return zippedResult;
+      });
+    }
+
+    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
+      return SavelyExecute(() => server.JobState(engineGuid));
+    }
+
+    private TResult SavelyExecute<TResult>(Func<TResult> a) {
       int retries = 0;
-      Guid guid = Guid.Empty;
       do {
         try {
-          lock(connectionLock) {
-            guid = server.BeginExecuteEngine(zippedEngine);
-          }
-          return guid;
-        } catch(TimeoutException) {
+          lock (connectionLock) {
+            return a();
+          }
+        }
+        catch (TimeoutException) {
           retries++;
           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        } catch(CommunicationException) {
+        }
+        catch (CommunicationException) {
           ResetConnection();
           retries++;
           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
         }
-      } while(retries < MAX_CONNECTION_RETRIES);
-      Trace.TraceWarning("Reached max connection retries in TryStartExecuteEngine");
-      return Guid.Empty;
-    }
-
-    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
-      int retries = 0;
-      do {
-        try {
-          lock(connectionLock) {
-            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
-            return zippedResult;
-          }
-        } catch(TimeoutException) {
-          retries++;
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        } catch(CommunicationException) {
-          ResetConnection();
-          retries++;
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        }
-      } while(retries < MAX_CONNECTION_RETRIES);
-      Trace.TraceWarning("Reached max connection retries in TryEndExecuteEngine");
-      return null;
-    }
-
-    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
-      // check if the server is still working on the job
-      int retries = 0;
-      do {
-        try {
-          lock(connectionLock) {
-            JobState jobState = server.JobState(engineGuid);
-            return jobState;
-          }
-        } catch(TimeoutException) {
-          retries++;
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        } catch(CommunicationException) {
-          ResetConnection();
-          retries++;
-          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
-        }
-      } while(retries < MAX_CONNECTION_RETRIES);
-      Trace.TraceWarning("Reached max connection retries in TryGetJobState");
-      return JobState.Unknown;
+      } while (retries < MAX_CONNECTION_RETRIES);
+      Trace.TraceWarning("Reached max connection retries");
+      return default(TResult);
     }
   }
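For reference, a caller-side sketch of the refactored API, assuming an existing JobManager instance jobManager and a ProcessingEngine engine (hypothetical usage, not part of the changeset; the actual DistributedEngine/GridExecuter code may differ):

    // enqueue the engine; the start-engine thread picks it up asynchronously
    AsyncGridResult result = jobManager.BeginExecuteEngine(engine);

    // block until the result-gathering thread signals completion or abort
    result.WaitHandle.WaitOne();

    try {
      // restores the executed engine from the gzipped result;
      // throws JobExecutionException if the job was aborted after MAX_RESTARTS
      IEngine finishedEngine = jobManager.EndExecuteEngine(result);
      // ... inspect the finished engine's scope / results ...
    }
    catch (JobExecutionException ex) {
      Trace.TraceError(ex.Message);
    }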