Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1337

Last change on this file since 1337 was 1334, checked in by msteinbi, 15 years ago

Merging defaultscheduler and ischeduler (dummy check in) with correct version (#507)

File size: 13.5 KB
RevLine 
[1121]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[741]23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
[751]26using HeuristicLab.Hive.Contracts.BusinessObjects;
[780]27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
[823]29using HeuristicLab.Core;
[842]30using HeuristicLab.Hive.Server.Core.InternalInterfaces.DataAccess;
31using System.Resources;
32using System.Reflection;
[1001]33using HeuristicLab.Hive.JobBase;
[1141]34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
[1154]35using System.Threading;
[741]36
37namespace HeuristicLab.Hive.Server.Core {
/// <summary>
/// The ClientCommunicator manages the whole communication with the clients:
/// login and logout, heartbeats, handing out jobs and receiving job results.
/// </summary>
/// <remarks>
/// The time of the last heartbeat of every client is kept in a static
/// dictionary that is shared by all instances of this class. Every access to
/// that dictionary is guarded by <see cref="heartbeatLock"/>, and every lock
/// is released in a <c>finally</c> block so that an exception thrown by one of
/// the adapters cannot leave the lock held.
/// </remarks>
public class ClientCommunicator: IClientCommunicator {
  // Time (UTC) of the most recent heartbeat, keyed by client id.
  private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
    new Dictionary<Guid, DateTime>();

  // Guards lastHeartbeats. Recursion is enabled because the server heartbeat
  // handler takes the write lock while it already holds the upgradeable lock.
  private static readonly ReaderWriterLockSlim heartbeatLock =
    new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

  private readonly IClientAdapter clientAdapter;
  private readonly IJobAdapter jobAdapter;
  private readonly IJobResultsAdapter jobResultAdapter;
  private readonly ILifecycleManager lifecycleManager;
  private readonly IInternalJobManager jobManager;
  private readonly IScheduler scheduler;

  /// <summary>
  /// Fetches the adapters, the job manager and the scheduler from the
  /// <c>ServiceLocator</c> and registers the handler for the server heartbeat
  /// of the lifecycle manager.
  /// </summary>
  public ClientCommunicator() {
    clientAdapter = ServiceLocator.GetClientAdapter();
    jobAdapter = ServiceLocator.GetJobAdapter();
    jobResultAdapter = ServiceLocator.GetJobResultsAdapter();
    lifecycleManager = ServiceLocator.GetLifecycleManager();
    // NOTE(review): the "as" cast yields null when the registered job manager
    // does not implement IInternalJobManager; that would only surface later
    // as a NullReferenceException when jobs are reset - confirm the
    // registration always provides an IInternalJobManager.
    jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
    scheduler = ServiceLocator.GetScheduler();

    lifecycleManager.RegisterHeartbeat(
      new EventHandler(lifecycleManager_OnServerHeartbeat));
  }

  /// <summary>
  /// Stores the current time as the last heartbeat of the given client,
  /// creating the entry when there is none yet.
  /// </summary>
  /// <param name="clientId">id of the client that has just been heard from</param>
  private static void RecordHeartbeat(Guid clientId) {
    heartbeatLock.EnterWriteLock();
    try {
      // UTC keeps the later time difference independent of daylight saving
      // changes; the dictionary is only read and written inside this class.
      lastHeartbeats[clientId] = DateTime.UtcNow;
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }
  }

  /// <summary>
  /// Removes the heartbeat entry of the given client; does nothing when there
  /// is no entry.
  /// </summary>
  /// <param name="clientId">id of the client whose entry is dropped</param>
  private static void RemoveHeartbeat(Guid clientId) {
    heartbeatLock.EnterWriteLock();
    try {
      lastHeartbeats.Remove(clientId);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }
  }

  /// <summary>
  /// Hands every active job of the given client to the job manager so that it
  /// can be reset.
  /// </summary>
  /// <param name="client">client whose active jobs are reset</param>
  private void ResetActiveJobsOf(ClientInfo client) {
    foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
      jobManager.ResetJobsDependingOnResults(job);
    }
  }

  /// <summary>
  /// Checks whether the clients that are not offline still send their
  /// heartbeats. A client without a heartbeat entry, or whose last heartbeat
  /// is older than HEARTBEAT_MAX_DIF seconds, is set offline and its active
  /// jobs are reset. Heartbeat entries of offline clients are removed.
  /// </summary>
  /// <param name="sender">not used</param>
  /// <param name="e">not used</param>
  private void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
    List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

    foreach (ClientInfo client in allClients) {
      if (client.State == State.offline || client.State == State.nullState) {
        // offline clients must not keep a stale heartbeat entry
        RemoveHeartbeat(client.ClientId);
        continue;
      }

      heartbeatLock.EnterUpgradeableReadLock();
      try {
        DateTime lastHbOfClient;
        if (!lastHeartbeats.TryGetValue(client.ClientId, out lastHbOfClient)) {
          // marked as online but never heard of: set offline, reset its jobs
          client.State = State.offline;
          clientAdapter.Update(client);
          ResetActiveJobsOf(client);
          continue;
        }

        // TotalSeconds is the whole elapsed time; Seconds would only be the
        // 0-59 seconds component and wrap around every minute.
        TimeSpan dif = DateTime.UtcNow.Subtract(lastHbOfClient);
        if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
          // if the client calculated jobs, these jobs must be reset
          if (client.State == State.calculating) {
            ResetActiveJobsOf(client);
          }

          // client must be set offline
          client.State = State.offline;
          clientAdapter.Update(client);

          // upgrades the upgradeable lock held here to a write lock
          RemoveHeartbeat(client.ClientId);
        }
      }
      finally {
        heartbeatLock.ExitUpgradeableReadLock();
      }
    }
  }

  #region IClientCommunicator Members

  /// <summary>
  /// Login process for the client.
  /// A heartbeat entry is created as well (login is the first heartbeat).
  /// </summary>
  /// <param name="clientInfo">the client that wants to log in</param>
  /// <returns>
  /// a failure response when a client with this id is stored with a state
  /// other than offline/nullState, a success response otherwise
  /// </returns>
  public Response Login(ClientInfo clientInfo) {
    Response response = new Response();

    // NOTE(review): the heartbeat is refreshed even when the login is
    // rejected below, so repeated rejected logins keep the entry of the
    // already-online client fresh - confirm that this is intended.
    RecordHeartbeat(clientInfo.ClientId);

    ClientInfo client = clientAdapter.GetById(clientInfo.ClientId);
    if (client != null && client.State != State.offline && client.State != State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
      return response;
    }

    clientInfo.State = State.idle;
    clientAdapter.Update(clientInfo);
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

    return response;
  }

  /// <summary>
  /// The client has to send heartbeats regularly. The time of each heartbeat
  /// is stored in the heartbeat dictionary. The response tells the client
  /// whether it should fetch a job, and the job progress reported by the
  /// client is written to the corresponding jobs.
  /// </summary>
  /// <param name="hbData">heartbeat data sent by the client</param>
  /// <returns>
  /// a failure response when the client is unknown or not logged in, when it
  /// reports progress without having active jobs, or when it reports progress
  /// for a job that is not assigned to it; a success response otherwise
  /// </returns>
  public ResponseHB SendHeartBeat(HeartBeatData hbData) {
    ResponseHB response = new ResponseHB();
    response.ActionRequest = new List<MessageContainer>();

    // looked up once; an unknown client is treated like one that is not logged in
    ClientInfo client = clientAdapter.GetById(hbData.ClientId);
    if (client == null || client.State == State.offline || client.State == State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
      return response;
    }

    RecordHeartbeat(hbData.ClientId);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
    if (hbData.freeCores > 0 && scheduler.ExistsJobForClient(hbData))
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
    else
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

    if (hbData.jobProgress != null) {
      List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(client));
      if (jobsOfClient.Count == 0) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        return response;
      }

      foreach (KeyValuePair<long, double> jobProgress in hbData.jobProgress) {
        Job curJob = jobAdapter.GetById(jobProgress.Key);
        // unknown job ids are reported like jobs that belong to someone else
        if (curJob == null || curJob.Client == null || curJob.Client.ClientId != hbData.ClientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        } else {
          curJob.Percentage = jobProgress.Value;
          jobAdapter.Update(curJob);
        }
      }
    }

    return response;
  }

  /// <summary>
  /// If the client was asked to pull a job it calls this method;
  /// the scheduler selects a job which is sent to the client.
  /// </summary>
  /// <param name="clientId">id of the client that asks for a job</param>
  /// <returns>
  /// a success response carrying the job, or a failure response without a job
  /// when the scheduler returns none
  /// </returns>
  public ResponseJob PullJob(Guid clientId) {
    ResponseJob response = new ResponseJob();

    Job job2Calculate = scheduler.GetNextJobForClient(clientId);
    if (job2Calculate == null) {
      response.Success = false;
      response.Job = null;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
      return response;
    }

    response.Job = job2Calculate;
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
    return response;
  }

  /// <summary>
  /// The client can send job results while it is calculating
  /// and sends a final job result when it has finished calculating.
  /// These job results are stored via the job result adapter.
  /// </summary>
  /// <param name="clientId">id of the client that sends the result</param>
  /// <param name="jobId">id of the job the result belongs to</param>
  /// <param name="result">the serialized job</param>
  /// <param name="percentage">progress of the job</param>
  /// <param name="exception">exception that is stored with the result</param>
  /// <param name="finished">true if this is the final result of the job</param>
  /// <returns>
  /// a failure response when the job does not exist, is not assigned to a
  /// client, is assigned to another client or is not in state calculating;
  /// a success response otherwise
  /// </returns>
  public ResponseResultReceived SendJobResult(Guid clientId,
    long jobId,
    byte[] result,
    double percentage,
    Exception exception,
    bool finished) {
    ResponseResultReceived response = new ResponseResultReceived();
    ClientInfo client = clientAdapter.GetById(clientId);
    Job job = jobAdapter.GetById(jobId);

    // must be the first check: every check below dereferences the job
    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
      return response;
    }
    if (job.Client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      return response;
    }
    if (job.Client.ClientId != clientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
      return response;
    }
    if (job.State != State.calculating) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
      return response;
    }

    job.SerializedJob = result;
    job.Percentage = percentage;

    if (finished) {
      job.State = State.finished;
      jobAdapter.Update(job);

      client.State = State.idle;
      clientAdapter.Update(client);

      // the results stored so far are deleted before the final one is stored below
      List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
      foreach (JobResult currentResult in jobResults)
        jobResultAdapter.Delete(currentResult);
    }

    JobResult jobResult = new JobResult();
    jobResult.Client = client;
    jobResult.Job = job;
    jobResult.Result = result;
    jobResult.Percentage = percentage;
    jobResult.Exception = exception;
    jobResult.DateFinished = DateTime.Now;

    jobResultAdapter.Update(jobResult);
    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
    response.JobId = jobId;
    response.finished = finished;

    return response;
  }

  /// <summary>
  /// When a client logs out its entry in the heartbeat dictionary is removed,
  /// the jobs it was calculating are reset and its state is set to offline.
  /// </summary>
  /// <param name="clientId">id of the client that logs out</param>
  /// <returns>
  /// a failure response when no client with this id is stored,
  /// a success response otherwise
  /// </returns>
  public Response Logout(Guid clientId) {
    Response response = new Response();

    RemoveHeartbeat(clientId);

    ClientInfo client = clientAdapter.GetById(clientId);
    if (client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
      return response;
    }

    if (client.State == State.calculating) {
      // check which jobs the client was calculating and reset them;
      // the jobs are only loaded when there can be something to reset
      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
      foreach (Job job in allJobs) {
        // jobs without an assigned client are skipped
        if (job.Client != null && job.Client.ClientId == client.ClientId) {
          jobManager.ResetJobsDependingOnResults(job);
        }
      }
    }

    client.State = State.offline;
    clientAdapter.Update(client);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

    return response;
  }

  #endregion
}
359}
Note: See TracBrowser for help on using the repository browser.