Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/18/09 15:01:10 (15 years ago)
Author:
gkronber
Message:

Refactored JobManager and added a plugin that contains a bridge between grid and hive. The bridge allows to use the execution engine service of Hive as a grid server. This way CEDMA job execution and DistributedEngine job execution can either use Hive or Grid as backend. #642 (Hive backend for CEDMA)

Location:
trunk/sources/HeuristicLab.Grid/3.2
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Grid/3.2/HeuristicLab.Grid-3.2.csproj

    r2055 r2058  
    9999    </Compile>
    100100    <Compile Include="AsyncGridResult.cs" />
     101    <Compile Include="GridServerProxy.cs" />
    101102    <Compile Include="Database.cs" />
    102103    <Compile Include="EngineRunner.cs" />
     
    128129      <Project>{94186A6A-5176-4402-AE83-886557B53CCA}</Project>
    129130      <Name>HeuristicLab.PluginInfrastructure</Name>
     131    </ProjectReference>
     132    <ProjectReference Include="..\..\HeuristicLab.Tracing\3.2\HeuristicLab.Tracing-3.2.csproj">
     133      <Project>{EE2034D9-6E27-48A1-B855-42D45F69A4FC}</Project>
     134      <Name>HeuristicLab.Tracing-3.2</Name>
    130135    </ProjectReference>
    131136  </ItemGroup>
  • trunk/sources/HeuristicLab.Grid/3.2/HeuristicLabGridPlugin.cs

    r1529 r2058  
    3030  [PluginFile(Filename = "HeuristicLab.Grid-3.2.dll", Filetype = PluginFileType.Assembly)]
    3131  [Dependency(Dependency = "HeuristicLab.Core-3.2")]
     32  [Dependency(Dependency = "HeuristicLab.Tracing-3.2")]
    3233  public class HeuristicLabGridPlugin : PluginBase {
    3334  }
  • trunk/sources/HeuristicLab.Grid/3.2/JobManager.cs

    r2055 r2058  
    3939  public class JobManager {
    4040    private const int MAX_RESTARTS = 5;
    41     private const int MAX_CONNECTION_RETRIES = 10;
    42     private const int RETRY_TIMEOUT_SEC = 60;
    4341    private const int RESULT_POLLING_TIMEOUT = 5;
    4442
     
    4947    private object runningQueueLock = new object();
    5048    private Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>();
    51     private object connectionLock = new object();
    52 
    5349    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
    5450    private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);
    5551
    56     private ChannelFactory<IGridServer> factory;
    5752
    58     public JobManager(string address) {
    59       this.address = address;
     53    public JobManager(IGridServer server) {
     54      this.server = server;
    6055      Thread starterThread = new Thread(StartEngines);
    6156      Thread resultsGatheringThread = new Thread(GetResults);
     
    6560
    6661    public void Reset() {
    67       ResetConnection();
    6862      lock (waitingQueueLock) {
    6963        foreach (AsyncGridResult r in waitingJobs) {
     
    8074    }
    8175
    82     private void ResetConnection() {
    83       Trace.TraceInformation("Reset connection in JobManager");
    84       lock (connectionLock) {
    85         // open a new channel
    86         NetTcpBinding binding = new NetTcpBinding();
    87         binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    88         binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    89         binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    90 
    91         factory = new ChannelFactory<IGridServer>(binding);
    92         server = factory.CreateChannel(new EndpointAddress(address));
    93       }
    94     }
    9576
    9677    public void StartEngines() {
     
    10384          if (job == null) waitingWaitHandle.WaitOne(); // no jobs waiting
    10485          else {
    105             Guid currentEngineGuid = TryStartExecuteEngine(job.Engine);
     86            Guid currentEngineGuid = server.BeginExecuteEngine(ZipEngine(job.Engine));
    10687            if (currentEngineGuid == Guid.Empty) {
    10788              // couldn't start the job -> requeue
     
    141122          if (job == null) runningWaitHandle.WaitOne(); // no jobs running
    142123          else {
    143             byte[] zippedResult = TryEndExecuteEngine(server, job.Guid);
     124            byte[] zippedResult = server.TryEndExecuteEngine(job.Guid);
    144125            if (zippedResult != null) {
    145126              // successful => store result
     
    149130            } else {
    150131              // there was a problem -> check the state of the job and restart if necessary
    151               JobState jobState = TryGetJobState(server, job.Guid);
     132              JobState jobState = server.JobState(job.Guid);
    152133              if (jobState == JobState.Unknown) {
    153134                job.Restarts++;
     
    195176      }
    196177    }
    197 
    198     private Guid TryStartExecuteEngine(IEngine engine) {
    199       byte[] zippedEngine = ZipEngine(engine);
    200       return SavelyExecute(() => server.BeginExecuteEngine(zippedEngine));
    201     }
    202 
    203     private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
    204       return SavelyExecute(() => {
    205         byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);
    206         return zippedResult;
    207       });
    208     }
    209 
    210     private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
    211       return SavelyExecute(() => server.JobState(engineGuid));
    212     }
    213 
    214     private TResult SavelyExecute<TResult>(Func<TResult> a) {
    215       int retries = 0;
    216       do {
    217         try {
    218           lock (connectionLock) {
    219             return a();
    220           }
    221         }
    222         catch (TimeoutException) {
    223           retries++;
    224           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    225         }
    226         catch (CommunicationException) {
    227           ResetConnection();
    228           retries++;
    229           Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    230         }
    231       } while (retries < MAX_CONNECTION_RETRIES);
    232       Trace.TraceWarning("Reached max connection retries");
    233       return default(TResult);
    234     }
    235178  }
    236179}
Note: See TracChangeset for help on using the changeset viewer.