Changeset 1154 for trunk/sources/HeuristicLab.Hive.Server.Core
- Timestamp:
- 01/18/09 12:58:11 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs
r1141 r1154 32 32 using System.Reflection; 33 33 using HeuristicLab.Hive.JobBase; 34 using System.Runtime.CompilerServices;35 34 using HeuristicLab.Hive.Server.Core.InternalInterfaces; 35 using System.Threading; 36 36 37 37 namespace HeuristicLab.Hive.Server.Core { … … 40 40 /// </summary> 41 41 public class ClientCommunicator: IClientCommunicator { 42 Dictionary<Guid, DateTime> lastHeartbeats =42 private static Dictionary<Guid, DateTime> lastHeartbeats = 43 43 new Dictionary<Guid,DateTime>(); 44 45 private static ReaderWriterLockSlim heartbeatLock = 46 new ReaderWriterLockSlim(); 47 48 private static Mutex jobLock = 49 new Mutex(); 44 50 45 51 IClientAdapter clientAdapter; … … 64 70 lifecycleManager.RegisterHeartbeat( 65 71 new EventHandler(lifecycleManager_OnServerHeartbeat)); 66 67 lastHeartbeats = new Dictionary<Guid, DateTime>();68 72 } 69 73 … … 74 78 /// <param name="sender"></param> 75 79 /// <param name="e"></param> 76 [MethodImpl(MethodImplOptions.Synchronized)]77 80 void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) { 78 81 List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll()); … … 81 84 foreach (ClientInfo client in allClients) { 82 85 if (client.State != State.offline && client.State != State.nullState) { 86 heartbeatLock.EnterUpgradeableReadLock(); 87 83 88 if (!lastHeartbeats.ContainsKey(client.ClientId)) { 84 89 client.State = State.offline; … … 86 91 } else { 87 92 DateTime lastHbOfClient = lastHeartbeats[client.ClientId]; 93 88 94 TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient); 89 95 // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF … … 102 108 client.State = State.offline; 103 109 clientAdapter.Update(client); 110 111 heartbeatLock.EnterWriteLock(); 104 112 lastHeartbeats.Remove(client.ClientId); 113 heartbeatLock.ExitWriteLock(); 105 114 } 106 115 } 116 117 heartbeatLock.ExitUpgradeableReadLock(); 107 118 } else { 119 heartbeatLock.EnterWriteLock(); 108 120 if (lastHeartbeats.ContainsKey(client.ClientId)) 109 121 lastHeartbeats.Remove(client.ClientId); 122 heartbeatLock.ExitWriteLock(); 110 123 } 111 124 } … … 120 133 /// <param name="clientInfo"></param> 121 134 /// <returns></returns> 122 [MethodImpl(MethodImplOptions.Synchronized)]123 135 public Response Login(ClientInfo clientInfo) { 124 136 Response response = new Response(); 125 137 138 heartbeatLock.EnterWriteLock(); 126 139 if (lastHeartbeats.ContainsKey(clientInfo.ClientId)) { 127 140 lastHeartbeats[clientInfo.ClientId] = DateTime.Now; … … 129 142 lastHeartbeats.Add(clientInfo.ClientId, DateTime.Now); 130 143 } 144 heartbeatLock.ExitWriteLock(); 131 145 132 146 ICollection<ClientInfo> allClients = clientAdapter.GetAll(); … … 152 166 /// <param name="hbData"></param> 153 167 /// <returns></returns> 154 [MethodImpl(MethodImplOptions.Synchronized)]155 168 public ResponseHB SendHeartBeat(HeartBeatData hbData) { 156 169 ResponseHB response = new ResponseHB(); … … 165 178 } 166 179 180 heartbeatLock.EnterWriteLock(); 167 181 if (lastHeartbeats.ContainsKey(hbData.ClientId)) { 168 182 lastHeartbeats[hbData.ClientId] = DateTime.Now; … … 170 184 lastHeartbeats.Add(hbData.ClientId, DateTime.Now); 171 185 } 186 heartbeatLock.ExitWriteLock(); 172 187 173 188 response.Success = true; … … 196 211 /// <param name="clientId"></param> 197 212 /// <returns></returns> 198 [MethodImpl(MethodImplOptions.Synchronized)]199 213 public ResponseJob PullJob(Guid clientId) { 200 214 ResponseJob response = new ResponseJob(); 201 lock (this) { 202 LinkedList<Job> allOfflineJobs = new LinkedList<Job>(jobAdapter.GetJobsByState(State.offline)); 203 if (allOfflineJobs != null && allOfflineJobs.Count > 0) { 204 Job job2Calculate = allOfflineJobs.First.Value; 205 job2Calculate.State = State.calculating; 206 job2Calculate.Client = clientAdapter.GetById(clientId); 207 response.Job = job2Calculate; 208 jobAdapter.Update(job2Calculate); 209 response.Success = true; 210 response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED; 211 return response; 212 } 213 } 215 216 /// Critical section /// 217 jobLock.WaitOne(); 218 219 LinkedList<Job> allOfflineJobs = new LinkedList<Job>(jobAdapter.GetJobsByState(State.offline)); 220 if (allOfflineJobs != null && allOfflineJobs.Count > 0) { 221 Job job2Calculate = allOfflineJobs.First.Value; 222 job2Calculate.State = State.calculating; 223 job2Calculate.Client = clientAdapter.GetById(clientId); 224 job2Calculate.Client.State = State.calculating; 225 226 response.Job = job2Calculate; 227 jobAdapter.Update(job2Calculate); 228 response.Success = true; 229 response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED; 230 return response; 231 } 232 233 jobLock.ReleaseMutex(); 234 214 235 response.Success = true; 215 236 response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT; … … 228 249 /// <param name="finished"></param> 229 250 /// <returns></returns> 230 [MethodImpl(MethodImplOptions.Synchronized)]231 251 public ResponseResultReceived SendJobResult(Guid clientId, 232 252 long jobId, … … 269 289 jobAdapter.Update(job); 270 290 291 client.State = State.idle; 292 clientAdapter.Update(client); 293 271 294 List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job)); 272 295 foreach (JobResult currentResult in jobResults) … … 298 321 /// </summary> 299 322 /// <param name="clientId"></param> 300 /// <returns></returns> 301 [MethodImpl(MethodImplOptions.Synchronized)] 323 /// <returns></returns> 302 324 public Response Logout(Guid clientId) { 303 325 Response response = new Response(); 304 326 327 heartbeatLock.EnterWriteLock(); 305 328 if (lastHeartbeats.ContainsKey(clientId)) 306 329 lastHeartbeats.Remove(clientId); 330 heartbeatLock.ExitWriteLock(); 307 331 308 332 ClientInfo client = clientAdapter.GetById(clientId);
Note: See TracChangeset
for help on using the changeset viewer.