Changeset 279 for branches/3.0/sources/HeuristicLab.Grid/EngineStore.cs
- Timestamp:
- 05/30/08 13:41:27 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.0/sources/HeuristicLab.Grid/EngineStore.cs
r35 r279 34 34 private Dictionary<Guid, ManualResetEvent> waitHandles; 35 35 private Dictionary<Guid, byte[]> results; 36 private Dictionary<Guid, string> runningClients; 36 private Dictionary<Guid, DateTime> resultDate; 37 private Dictionary<Guid, DateTime> runningEngineDate; 38 private const int RESULT_EXPIRY_TIME_MIN = 10; 39 private const int RUNNING_JOB_EXPIRY_TIME_MIN = 10; 37 40 private object bigLock; 38 private ChannelFactory<IClient> clientChannelFactory;39 41 public int WaitingJobs { 40 42 get { … … 59 61 waitingEngines = new Dictionary<Guid, byte[]>(); 60 62 runningEngines = new Dictionary<Guid, byte[]>(); 61 runningClients = new Dictionary<Guid, string>();62 63 waitHandles = new Dictionary<Guid, ManualResetEvent>(); 63 64 results = new Dictionary<Guid, byte[]>(); 65 resultDate = new Dictionary<Guid, DateTime>(); 66 runningEngineDate = new Dictionary<Guid, DateTime>(); 64 67 bigLock = new object(); 65 66 NetTcpBinding binding = new NetTcpBinding();67 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes68 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars69 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;70 binding.Security.Mode = SecurityMode.None;71 72 clientChannelFactory = new ChannelFactory<IClient>(binding);73 68 } 74 69 75 public bool TryTakeEngine( string clientUrl,out Guid guid, out byte[] engine) {70 public bool TryTakeEngine(out Guid guid, out byte[] engine) { 76 71 lock(bigLock) { 77 72 if(engineList.Count == 0) { … … 85 80 waitingEngines.Remove(guid); 86 81 runningEngines[guid] = engine; 87 running Clients[guid] = clientUrl;82 runningEngineDate[guid] = DateTime.Now; 88 83 return true; 89 84 } … … 93 88 public void StoreResult(Guid guid, byte[] result) { 94 89 lock(bigLock) { 95 if(!runningEngines.ContainsKey(guid)) return; // ignore result when the engine is not known to be running 96 90 // clear old results 91 List<Guid> expiredResults = FindExpiredResults(DateTime.Now.AddMinutes(-RESULT_EXPIRY_TIME_MIN)); 92 foreach(Guid expiredGuid in expiredResults) { 93 results.Remove(expiredGuid); 94 waitHandles.Remove(expiredGuid); 95 resultDate.Remove(expiredGuid); 96 } 97 // add the new result 97 98 runningEngines.Remove(guid); 98 running Clients.Remove(guid);99 runningEngineDate.Remove(guid); 99 100 results[guid] = result; 101 resultDate[guid] = DateTime.Now; 100 102 waitHandles[guid].Set(); 101 103 } 104 } 105 106 private List<Guid> FindExpiredResults(DateTime expirationDate) { 107 List<Guid> expiredResults = new List<Guid>(); 108 foreach(Guid guid in results.Keys) { 109 if(resultDate[guid] < expirationDate) { 110 expiredResults.Add(guid); 111 } 112 } 113 return expiredResults; 114 } 115 private List<Guid> FindExpiredJobs(DateTime expirationDate) { 116 List<Guid> expiredJobs = new List<Guid>(); 117 foreach(Guid guid in runningEngines.Keys) { 118 if(runningEngineDate[guid] < expirationDate) { 119 expiredJobs.Add(guid); 120 } 121 } 122 return expiredJobs; 102 123 } 103 124 … … 113 134 return GetResult(guid, System.Threading.Timeout.Infinite); 114 135 } 136 115 137 internal byte[] GetResult(Guid guid, int timeout) { 116 138 lock(bigLock) { 117 if(waitHandles.ContainsKey(guid)) { 139 // result already available 140 if(results.ContainsKey(guid)) { 141 // if the wait-handle for this result is still alive then close and remove it 142 if(waitHandles.ContainsKey(guid)) { 143 ManualResetEvent waitHandle = waitHandles[guid]; 144 waitHandle.Close(); 145 waitHandles.Remove(guid); 146 } 147 byte[] result = results[guid]; 148 results.Remove(guid); 149 return result; 150 } else { 151 // result not yet available, if there is also no wait-handle for that result then we will never have a result and can return null 152 if(!waitHandles.ContainsKey(guid)) return null; 153 // otherwise we have a wait-handle and can wait for the result 118 154 ManualResetEvent waitHandle = waitHandles[guid]; 155 // wait 119 156 if(waitHandle.WaitOne(timeout, true)) { 157 // ok got the result in within the wait time => close and remove the wait-hande and return the result 120 158 waitHandle.Close(); 121 159 waitHandles.Remove(guid); 122 160 byte[] result = results[guid]; 123 results.Remove(guid);124 161 return result; 125 162 } else { 163 // no result yet, check for which jobs we waited too long and requeue those jobs 164 List<Guid> expiredJobs = FindExpiredJobs(DateTime.Now.AddMinutes(-RUNNING_JOB_EXPIRY_TIME_MIN)); 165 foreach(Guid expiredGuid in expiredJobs) { 166 engineList.Insert(0, expiredGuid); 167 waitingEngines[expiredGuid] = runningEngines[expiredGuid]; 168 runningEngines.Remove(expiredGuid); 169 runningEngineDate.Remove(expiredGuid); 170 } 126 171 return null; 127 172 } 128 } else {129 return null;130 173 } 131 174 } … … 133 176 134 177 internal void AbortEngine(Guid guid) { 135 string clientUrl = ""; 178 throw new NotImplementedException(); 179 } 180 181 internal JobState JobState(Guid guid) { 136 182 lock(bigLock) { 137 if(runningClients.ContainsKey(guid)) { 138 clientUrl = runningClients[guid]; 139 IClient client = clientChannelFactory.CreateChannel(new EndpointAddress(clientUrl)); 140 client.Abort(guid); 141 } else if(waitingEngines.ContainsKey(guid)) { 142 byte[] engine = waitingEngines[guid]; 143 waitingEngines.Remove(guid); 144 engineList.Remove(guid); 145 waitHandles[guid].Set(); 146 results.Add(guid, engine); 147 } 183 if(waitingEngines.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Waiting; 184 else if(waitHandles.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Busy; 185 else if(results.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Finished; 186 else return HeuristicLab.Grid.JobState.Unkown; 148 187 } 149 188 }
Note: See TracChangeset
for help on using the changeset viewer.