Changeset 1154


Ignore:
Timestamp:
01/18/09 12:58:11 (12 years ago)
Author:
svonolfe
Message:

Improved locking to allow more concurrency in the ClientCommunicator (#399)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs

    r1141 r1154  
    3232using System.Reflection;
    3333using HeuristicLab.Hive.JobBase;
    34 using System.Runtime.CompilerServices;
    3534using HeuristicLab.Hive.Server.Core.InternalInterfaces;
     35using System.Threading;
    3636
    3737namespace HeuristicLab.Hive.Server.Core {
     
    4040  /// </summary>
    4141  public class ClientCommunicator: IClientCommunicator {
    42     Dictionary<Guid, DateTime> lastHeartbeats =
     42    private static Dictionary<Guid, DateTime> lastHeartbeats =
    4343      new Dictionary<Guid,DateTime>();
     44
     45    private static ReaderWriterLockSlim heartbeatLock =
     46      new ReaderWriterLockSlim();
     47
     48    private static Mutex jobLock =
     49      new Mutex();
    4450
    4551    IClientAdapter clientAdapter;
     
    6470      lifecycleManager.RegisterHeartbeat(
    6571        new EventHandler(lifecycleManager_OnServerHeartbeat));
    66 
    67       lastHeartbeats = new Dictionary<Guid, DateTime>();
    6872    }
    6973
     
    7478    /// <param name="sender"></param>
    7579    /// <param name="e"></param>
    76     [MethodImpl(MethodImplOptions.Synchronized)]
    7780    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
    7881      List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
     
    8184      foreach (ClientInfo client in allClients) {
    8285        if (client.State != State.offline && client.State != State.nullState) {
     86          heartbeatLock.EnterUpgradeableReadLock();
     87
    8388          if (!lastHeartbeats.ContainsKey(client.ClientId)) {
    8489            client.State = State.offline;
     
    8691          } else {
    8792            DateTime lastHbOfClient = lastHeartbeats[client.ClientId];
     93
    8894            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
    8995            // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
     
    102108              client.State = State.offline;
    103109              clientAdapter.Update(client);
     110
     111              heartbeatLock.EnterWriteLock();
    104112              lastHeartbeats.Remove(client.ClientId);
     113              heartbeatLock.ExitWriteLock();
    105114            }
    106115          }
     116
     117          heartbeatLock.ExitUpgradeableReadLock();
    107118        } else {
     119          heartbeatLock.EnterWriteLock();
    108120          if (lastHeartbeats.ContainsKey(client.ClientId))
    109121            lastHeartbeats.Remove(client.ClientId);
     122          heartbeatLock.ExitWriteLock();
    110123        }
    111124      }
     
    120133    /// <param name="clientInfo"></param>
    121134    /// <returns></returns>
    122     [MethodImpl(MethodImplOptions.Synchronized)]
    123135    public Response Login(ClientInfo clientInfo) {
    124136      Response response = new Response();
    125137
     138      heartbeatLock.EnterWriteLock();
    126139      if (lastHeartbeats.ContainsKey(clientInfo.ClientId)) {
    127140        lastHeartbeats[clientInfo.ClientId] = DateTime.Now;
     
    129142        lastHeartbeats.Add(clientInfo.ClientId, DateTime.Now);
    130143      }
     144      heartbeatLock.ExitWriteLock();
    131145
    132146      ICollection<ClientInfo> allClients = clientAdapter.GetAll();
     
    152166    /// <param name="hbData"></param>
    153167    /// <returns></returns>
    154     [MethodImpl(MethodImplOptions.Synchronized)]
    155168    public ResponseHB SendHeartBeat(HeartBeatData hbData) {
    156169      ResponseHB response = new ResponseHB();
     
    165178      }
    166179
     180      heartbeatLock.EnterWriteLock();
    167181      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
    168182        lastHeartbeats[hbData.ClientId] = DateTime.Now;
     
    170184        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
    171185      }
     186      heartbeatLock.ExitWriteLock();
    172187
    173188      response.Success = true;
     
    196211    /// <param name="clientId"></param>
    197212    /// <returns></returns>
    198     [MethodImpl(MethodImplOptions.Synchronized)]
    199213    public ResponseJob PullJob(Guid clientId) {
    200214      ResponseJob response = new ResponseJob();
    201       lock (this) {
    202         LinkedList<Job> allOfflineJobs = new LinkedList<Job>(jobAdapter.GetJobsByState(State.offline));
    203         if (allOfflineJobs != null && allOfflineJobs.Count > 0) {
    204           Job job2Calculate = allOfflineJobs.First.Value;
    205           job2Calculate.State = State.calculating;
    206           job2Calculate.Client = clientAdapter.GetById(clientId);
    207           response.Job = job2Calculate;
    208           jobAdapter.Update(job2Calculate);
    209           response.Success = true;
    210           response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
    211           return response;
    212         }
    213       }
     215     
     216      /// Critical section ///
     217      jobLock.WaitOne();
     218
     219      LinkedList<Job> allOfflineJobs = new LinkedList<Job>(jobAdapter.GetJobsByState(State.offline));
     220      if (allOfflineJobs != null && allOfflineJobs.Count > 0) {
     221        Job job2Calculate = allOfflineJobs.First.Value;
     222        job2Calculate.State = State.calculating;
     223        job2Calculate.Client = clientAdapter.GetById(clientId);
     224        job2Calculate.Client.State = State.calculating;
     225
     226        response.Job = job2Calculate;
     227        jobAdapter.Update(job2Calculate);
     228        response.Success = true;
     229        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
     230        return response;
     231      }
     232
     233      jobLock.ReleaseMutex();
     234
    214235      response.Success = true;
    215236      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
     
    228249    /// <param name="finished"></param>
    229250    /// <returns></returns>
    230     [MethodImpl(MethodImplOptions.Synchronized)]
    231251    public ResponseResultReceived SendJobResult(Guid clientId,
    232252      long jobId,
     
    269289        jobAdapter.Update(job);
    270290
     291        client.State = State.idle;
     292        clientAdapter.Update(client);
     293
    271294        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
    272295        foreach (JobResult currentResult in jobResults)
     
    298321    /// </summary>
    299322    /// <param name="clientId"></param>
    300     /// <returns></returns>
    301     [MethodImpl(MethodImplOptions.Synchronized)]                       
     323    /// <returns></returns>                       
    302324    public Response Logout(Guid clientId) {
    303325      Response response = new Response();
    304326
     327      heartbeatLock.EnterWriteLock();
    305328      if (lastHeartbeats.ContainsKey(clientId))
    306329        lastHeartbeats.Remove(clientId);
     330      heartbeatLock.ExitWriteLock();
    307331
    308332      ClientInfo client = clientAdapter.GetById(clientId);
Note: See TracChangeset for help on using the changeset viewer.