Changeset 2058 for trunk/sources/HeuristicLab.Grid/3.2
- Timestamp:
- 06/18/09 15:01:10 (15 years ago)
- Location:
- trunk/sources/HeuristicLab.Grid/3.2
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Grid/3.2/HeuristicLab.Grid-3.2.csproj
r2055 r2058 99 99 </Compile> 100 100 <Compile Include="AsyncGridResult.cs" /> 101 <Compile Include="GridServerProxy.cs" /> 101 102 <Compile Include="Database.cs" /> 102 103 <Compile Include="EngineRunner.cs" /> … … 128 129 <Project>{94186A6A-5176-4402-AE83-886557B53CCA}</Project> 129 130 <Name>HeuristicLab.PluginInfrastructure</Name> 131 </ProjectReference> 132 <ProjectReference Include="..\..\HeuristicLab.Tracing\3.2\HeuristicLab.Tracing-3.2.csproj"> 133 <Project>{EE2034D9-6E27-48A1-B855-42D45F69A4FC}</Project> 134 <Name>HeuristicLab.Tracing-3.2</Name> 130 135 </ProjectReference> 131 136 </ItemGroup> -
trunk/sources/HeuristicLab.Grid/3.2/HeuristicLabGridPlugin.cs
r1529 r2058 30 30 [PluginFile(Filename = "HeuristicLab.Grid-3.2.dll", Filetype = PluginFileType.Assembly)] 31 31 [Dependency(Dependency = "HeuristicLab.Core-3.2")] 32 [Dependency(Dependency = "HeuristicLab.Tracing-3.2")] 32 33 public class HeuristicLabGridPlugin : PluginBase { 33 34 } -
trunk/sources/HeuristicLab.Grid/3.2/JobManager.cs
r2055 r2058 39 39 public class JobManager { 40 40 private const int MAX_RESTARTS = 5; 41 private const int MAX_CONNECTION_RETRIES = 10;42 private const int RETRY_TIMEOUT_SEC = 60;43 41 private const int RESULT_POLLING_TIMEOUT = 5; 44 42 … … 49 47 private object runningQueueLock = new object(); 50 48 private Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>(); 51 private object connectionLock = new object();52 53 49 private AutoResetEvent runningWaitHandle = new AutoResetEvent(false); 54 50 private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false); 55 51 56 private ChannelFactory<IGridServer> factory;57 52 58 public JobManager( string address) {59 this. address = address;53 public JobManager(IGridServer server) { 54 this.server = server; 60 55 Thread starterThread = new Thread(StartEngines); 61 56 Thread resultsGatheringThread = new Thread(GetResults); … … 65 60 66 61 public void Reset() { 67 ResetConnection();68 62 lock (waitingQueueLock) { 69 63 foreach (AsyncGridResult r in waitingJobs) { … … 80 74 } 81 75 82 private void ResetConnection() {83 Trace.TraceInformation("Reset connection in JobManager");84 lock (connectionLock) {85 // open a new channel86 NetTcpBinding binding = new NetTcpBinding();87 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes88 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars89 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;90 91 factory = new ChannelFactory<IGridServer>(binding);92 server = factory.CreateChannel(new EndpointAddress(address));93 }94 }95 76 96 77 public void StartEngines() { … … 103 84 if (job == null) waitingWaitHandle.WaitOne(); // no jobs waiting 104 85 else { 105 Guid currentEngineGuid = TryStartExecuteEngine(job.Engine);86 Guid currentEngineGuid = server.BeginExecuteEngine(ZipEngine(job.Engine)); 106 87 if (currentEngineGuid == Guid.Empty) { 107 88 // couldn't start the job -> requeue … … 141 122 if (job == null) runningWaitHandle.WaitOne(); // no jobs running 142 123 else { 143 byte[] zippedResult = TryEndExecuteEngine(server,job.Guid);124 byte[] zippedResult = server.TryEndExecuteEngine(job.Guid); 144 125 if (zippedResult != null) { 145 126 // successful => store result … … 149 130 } else { 150 131 // there was a problem -> check the state of the job and restart if necessary 151 JobState jobState = TryGetJobState(server,job.Guid);132 JobState jobState = server.JobState(job.Guid); 152 133 if (jobState == JobState.Unknown) { 153 134 job.Restarts++; … … 195 176 } 196 177 } 197 198 private Guid TryStartExecuteEngine(IEngine engine) {199 byte[] zippedEngine = ZipEngine(engine);200 return SavelyExecute(() => server.BeginExecuteEngine(zippedEngine));201 }202 203 private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {204 return SavelyExecute(() => {205 byte[] zippedResult = server.TryEndExecuteEngine(engineGuid);206 return zippedResult;207 });208 }209 210 private JobState TryGetJobState(IGridServer server, Guid engineGuid) {211 return SavelyExecute(() => server.JobState(engineGuid));212 }213 214 private TResult SavelyExecute<TResult>(Func<TResult> a) {215 int retries = 0;216 do {217 try {218 lock (connectionLock) {219 return a();220 }221 }222 catch (TimeoutException) {223 retries++;224 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));225 }226 catch (CommunicationException) {227 ResetConnection();228 retries++;229 Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));230 }231 } while (retries < MAX_CONNECTION_RETRIES);232 Trace.TraceWarning("Reached max connection retries");233 return default(TResult);234 }235 178 } 236 179 }
Note: See TracChangeset
for help on using the changeset viewer.