Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1334

Last change on this file since 1334 was 1334, checked in by msteinbi, 15 years ago

Merging defaultscheduler and ischeduler (dummy check in) with correct version (#507)

File size: 13.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.Core.InternalInterfaces.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36
37namespace HeuristicLab.Hive.Server.Core {
38  /// <summary>
39  /// The ClientCommunicator manages the whole communication with the client
40  /// </summary>
41  public class ClientCommunicator: IClientCommunicator {
42    private static Dictionary<Guid, DateTime> lastHeartbeats =
43      new Dictionary<Guid,DateTime>();
44
45    private static ReaderWriterLockSlim heartbeatLock =
46      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
47
48    IClientAdapter clientAdapter;
49    IJobAdapter jobAdapter;
50    IJobResultsAdapter jobResultAdapter;
51    ILifecycleManager lifecycleManager;
52    IInternalJobManager jobManager;
53    IScheduler scheduler;
54
55    /// <summary>
56    /// Initialization of the Adapters to the database
57    /// Initialization of Eventhandler for the lifecycle management
58    /// Initialization of lastHearbeats Dictionary
59    /// </summary>
60    public ClientCommunicator() {
61      clientAdapter = ServiceLocator.GetClientAdapter();
62      jobAdapter = ServiceLocator.GetJobAdapter();
63      jobResultAdapter = ServiceLocator.GetJobResultsAdapter();
64      lifecycleManager = ServiceLocator.GetLifecycleManager();
65      jobManager = ServiceLocator.GetJobManager() as
66        IInternalJobManager;
67      scheduler = ServiceLocator.GetScheduler();
68
69      lifecycleManager.RegisterHeartbeat(
70        new EventHandler(lifecycleManager_OnServerHeartbeat));
71    }
72
73    /// <summary>
74    /// Check if online clients send their hearbeats
75    /// if not -> set them offline and check if they where calculating a job
76    /// </summary>
77    /// <param name="sender"></param>
78    /// <param name="e"></param>
79    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
80      List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
81
82      foreach (ClientInfo client in allClients) {
83        if (client.State != State.offline && client.State != State.nullState) {
84          heartbeatLock.EnterUpgradeableReadLock();
85
86          if (!lastHeartbeats.ContainsKey(client.ClientId)) {
87            client.State = State.offline;
88            clientAdapter.Update(client);
89            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
90              jobManager.ResetJobsDependingOnResults(job);
91            }
92          } else {
93            DateTime lastHbOfClient = lastHeartbeats[client.ClientId];
94
95            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
96            // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
97            if (dif.Seconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
98              // if client calculated jobs, the job must be reset
99              if (client.State == State.calculating) {
100                // check wich job the client was calculating and reset it
101                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
102                  jobManager.ResetJobsDependingOnResults(job);
103                }
104              }
105             
106              // client must be set offline
107              client.State = State.offline;
108              clientAdapter.Update(client);
109
110              heartbeatLock.EnterWriteLock();
111              lastHeartbeats.Remove(client.ClientId);
112              heartbeatLock.ExitWriteLock();
113            }
114          }
115
116          heartbeatLock.ExitUpgradeableReadLock();
117        } else {
118          heartbeatLock.EnterWriteLock();
119          if (lastHeartbeats.ContainsKey(client.ClientId))
120            lastHeartbeats.Remove(client.ClientId);
121          heartbeatLock.ExitWriteLock();
122        }
123      }
124    }
125
126    #region IClientCommunicator Members
127
128    /// <summary>
129    /// Login process for the client
130    /// A hearbeat entry is created as well (login is the first hearbeat)
131    /// </summary>
132    /// <param name="clientInfo"></param>
133    /// <returns></returns>
134    public Response Login(ClientInfo clientInfo) {
135      Response response = new Response();
136
137      heartbeatLock.EnterWriteLock();
138      if (lastHeartbeats.ContainsKey(clientInfo.ClientId)) {
139        lastHeartbeats[clientInfo.ClientId] = DateTime.Now;
140      } else {
141        lastHeartbeats.Add(clientInfo.ClientId, DateTime.Now);
142      }
143      heartbeatLock.ExitWriteLock();
144
145      ICollection<ClientInfo> allClients = clientAdapter.GetAll();
146      ClientInfo client = clientAdapter.GetById(clientInfo.ClientId);
147      if (client != null && client.State != State.offline && client.State != State.nullState) {
148        response.Success = false;
149        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
150        return response;
151      }
152      clientInfo.State = State.idle;
153      clientAdapter.Update(clientInfo);
154      response.Success = true;
155      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
156
157      return response;
158    }
159
160    /// <summary>
161    /// The client has to send regulary heartbeats
162    /// this hearbeats will be stored in the heartbeats dictionary
163    /// check if there is work for the client and send the client a response if he should pull a job
164    /// </summary>
165    /// <param name="hbData"></param>
166    /// <returns></returns>
167    public ResponseHB SendHeartBeat(HeartBeatData hbData) {
168      ResponseHB response = new ResponseHB();
169
170      response.ActionRequest = new List<MessageContainer>();
171      if (clientAdapter.GetById(hbData.ClientId).State == State.offline ||
172          clientAdapter.GetById(hbData.ClientId).State == State.nullState) {
173        response.Success = false;
174        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
175        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
176        return response;
177      }
178
179      heartbeatLock.EnterWriteLock();
180      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
181        lastHeartbeats[hbData.ClientId] = DateTime.Now;
182      } else {
183        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
184      }
185      heartbeatLock.ExitWriteLock();
186
187      response.Success = true;
188      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
189      if (hbData.freeCores > 0 && scheduler.ExistsJobForClient(hbData))
190        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
191      else
192        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
193
194      if (hbData.jobProgress != null) {
195        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
196        if (jobsOfClient == null || jobsOfClient.Count == 0) {
197          response.Success = false;
198          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
199          return response;
200        }
201
202        foreach (KeyValuePair<long, double> jobProgress in hbData.jobProgress) {
203          Job curJob = jobAdapter.GetById(jobProgress.Key);
204          if (curJob.Client == null || curJob.Client.ClientId != hbData.ClientId) {
205            response.Success = false;
206            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
207          } else {
208            curJob.Percentage = jobProgress.Value;
209            jobAdapter.Update(curJob);
210          }
211        }
212      }
213
214      return response;
215    }
216   
217    /// <summary>
218    /// if the client asked to pull a job he calls this method
219    /// the server selects a job and sends it to the client
220    /// </summary>
221    /// <param name="clientId"></param>
222    /// <returns></returns>
223    public ResponseJob PullJob(Guid clientId) {
224      ResponseJob response = new ResponseJob();
225
226      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
227      if (job2Calculate != null) {
228        response.Job = job2Calculate;
229        response.Success = true;
230        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
231      } else {
232        response.Success = false;
233        response.Job = null;
234        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
235      }
236      return response;
237    }
238
239    /// <summary>
240    /// the client can send job results during calculating
241    /// and will send a final job result when he finished calculating
242    /// these job results will be stored in the database
243    /// </summary>
244    /// <param name="clientId"></param>
245    /// <param name="jobId"></param>
246    /// <param name="result"></param>
247    /// <param name="exception"></param>
248    /// <param name="finished"></param>
249    /// <returns></returns>
250    public ResponseResultReceived SendJobResult(Guid clientId,
251      long jobId,
252      byte[] result,
253      double percentage,
254      Exception exception, 
255      bool finished) {
256      ResponseResultReceived response = new ResponseResultReceived();
257      ClientInfo client =
258        clientAdapter.GetById(clientId);
259
260      Job job =
261        jobAdapter.GetById(jobId);
262
263      if (job.Client == null)    {
264        response.Success = false;
265        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
266        return response;
267      }
268      if (job.Client.ClientId != clientId) {
269        response.Success = false;
270        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
271        return response;
272      }
273      if (job == null) {
274        response.Success = false;
275        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
276        return response;
277      }
278      if (job.State != State.calculating) {
279        response.Success = false;
280        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
281        return response;
282      }
283      job.SerializedJob = result;
284      job.Percentage = percentage;
285
286      if (finished) {
287        job.State = State.finished;
288        jobAdapter.Update(job);
289
290        client.State = State.idle;
291        clientAdapter.Update(client);
292
293        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
294        foreach (JobResult currentResult in jobResults)
295          jobResultAdapter.Delete(currentResult);
296      }
297
298      JobResult jobResult =
299        new JobResult();
300      jobResult.Client = client;
301      jobResult.Job = job;
302      jobResult.Result = result;
303      jobResult.Percentage = percentage;
304      jobResult.Exception = exception;
305      jobResult.DateFinished = DateTime.Now;
306
307      jobResultAdapter.Update(jobResult);
308      jobAdapter.Update(job);
309
310      response.Success = true;
311      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
312      response.JobId = jobId;
313      response.finished = finished;
314
315      return response;
316    }
317
318    /// <summary>
319    /// when a client logs out the state will be set
320    /// and the entry in the last hearbeats dictionary will be removed
321    /// </summary>
322    /// <param name="clientId"></param>
323    /// <returns></returns>                       
324    public Response Logout(Guid clientId) {
325      Response response = new Response();
326
327      heartbeatLock.EnterWriteLock();
328      if (lastHeartbeats.ContainsKey(clientId))
329        lastHeartbeats.Remove(clientId);
330      heartbeatLock.ExitWriteLock();
331
332      ClientInfo client = clientAdapter.GetById(clientId);
333      if (client == null) {
334        response.Success = false;
335        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
336        return response;
337      }
338      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
339      if (client.State == State.calculating) {
340        // check wich job the client was calculating and reset it
341        foreach (Job job in allJobs) {
342          if (job.Client.ClientId == client.ClientId) {
343            jobManager.ResetJobsDependingOnResults(job);
344          }
345        }
346      }
347
348      client.State = State.offline;
349      clientAdapter.Update(client);
350
351      response.Success = true;
352      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
353     
354      return response;
355    }
356
357    #endregion
358  }
359}
Note: See TracBrowser for help on using the repository browser.