Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1377

Last change on this file since 1377 was 1377, checked in by svonolfe, 15 years ago

Created Heuristiclab DB Core (refactoring) #527

File size: 15.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36
37namespace HeuristicLab.Hive.Server.Core {
38  /// <summary>
39  /// The ClientCommunicator manages the whole communication with the client
40  /// </summary>
41  public class ClientCommunicator: IClientCommunicator {
42    private static Dictionary<Guid, DateTime> lastHeartbeats =
43      new Dictionary<Guid,DateTime>();
44
45    private static ReaderWriterLockSlim heartbeatLock =
46      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
47
48    private IClientAdapter clientAdapter;
49    private IJobAdapter jobAdapter;
50    private IJobResultsAdapter jobResultAdapter;
51    private ILifecycleManager lifecycleManager;
52    private IInternalJobManager jobManager;
53    private IScheduler scheduler;
54
55    /// <summary>
56    /// Initialization of the Adapters to the database
57    /// Initialization of Eventhandler for the lifecycle management
58    /// Initialization of lastHearbeats Dictionary
59    /// </summary>
60    public ClientCommunicator() {
61      clientAdapter = ServiceLocator.GetClientAdapter();
62      jobAdapter = ServiceLocator.GetJobAdapter();
63      jobResultAdapter = ServiceLocator.GetJobResultsAdapter();
64      lifecycleManager = ServiceLocator.GetLifecycleManager();
65      jobManager = ServiceLocator.GetJobManager() as
66        IInternalJobManager;
67      scheduler = ServiceLocator.GetScheduler();
68
69      lifecycleManager.RegisterHeartbeat(
70        new EventHandler(lifecycleManager_OnServerHeartbeat));
71    }
72
73    /// <summary>
74    /// Check if online clients send their hearbeats
75    /// if not -> set them offline and check if they where calculating a job
76    /// </summary>
77    /// <param name="sender"></param>
78    /// <param name="e"></param>
79    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
80      List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
81
82      foreach (ClientInfo client in allClients) {
83        if (client.State != State.offline && client.State != State.nullState) {
84          heartbeatLock.EnterUpgradeableReadLock();
85
86          if (!lastHeartbeats.ContainsKey(client.ClientId)) {
87            client.State = State.offline;
88            clientAdapter.Update(client);
89            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
90              jobManager.ResetJobsDependingOnResults(job);
91            }
92          } else {
93            DateTime lastHbOfClient = lastHeartbeats[client.ClientId];
94
95            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
96            // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
97            if (dif.Seconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
98              // if client calculated jobs, the job must be reset
99              if (client.State == State.calculating) {
100                // check wich job the client was calculating and reset it
101                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
102                  jobManager.ResetJobsDependingOnResults(job);
103                }
104              }
105             
106              // client must be set offline
107              client.State = State.offline;
108              clientAdapter.Update(client);
109
110              heartbeatLock.EnterWriteLock();
111              lastHeartbeats.Remove(client.ClientId);
112              heartbeatLock.ExitWriteLock();
113            }
114          }
115
116          heartbeatLock.ExitUpgradeableReadLock();
117        } else {
118          heartbeatLock.EnterWriteLock();
119          if (lastHeartbeats.ContainsKey(client.ClientId))
120            lastHeartbeats.Remove(client.ClientId);
121          heartbeatLock.ExitWriteLock();
122        }
123      }
124    }
125
126    #region IClientCommunicator Members
127
128    /// <summary>
129    /// Login process for the client
130    /// A hearbeat entry is created as well (login is the first hearbeat)
131    /// </summary>
132    /// <param name="clientInfo"></param>
133    /// <returns></returns>
134    public Response Login(ClientInfo clientInfo) {
135      Response response = new Response();
136
137      heartbeatLock.EnterWriteLock();
138      if (lastHeartbeats.ContainsKey(clientInfo.ClientId)) {
139        lastHeartbeats[clientInfo.ClientId] = DateTime.Now;
140      } else {
141        lastHeartbeats.Add(clientInfo.ClientId, DateTime.Now);
142      }
143      heartbeatLock.ExitWriteLock();
144
145      // todo: allClients legacy ?
146      ClientInfo client = clientAdapter.GetById(clientInfo.ClientId);
147      if (client != null && client.State != State.offline && client.State != State.nullState) {
148        response.Success = false;
149        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
150        return response;
151      }
152      clientInfo.State = State.idle;
153      clientAdapter.Update(clientInfo);
154      response.Success = true;
155      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
156
157      return response;
158    }
159
160    /// <summary>
161    /// The client has to send regulary heartbeats
162    /// this hearbeats will be stored in the heartbeats dictionary
163    /// check if there is work for the client and send the client a response if he should pull a job
164    /// </summary>
165    /// <param name="hbData"></param>
166    /// <returns></returns>
167    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
168      ResponseHB response = new ResponseHB();
169
170      // check if the client is logged in
171      response.ActionRequest = new List<MessageContainer>();
172      if (clientAdapter.GetById(hbData.ClientId).State == State.offline ||
173          clientAdapter.GetById(hbData.ClientId).State == State.nullState) {
174        response.Success = false;
175        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
176        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
177        return response;
178      }
179
180      // save timestamp of this heartbeat
181      heartbeatLock.EnterWriteLock();
182      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
183        lastHeartbeats[hbData.ClientId] = DateTime.Now;
184      } else {
185        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
186      }
187      heartbeatLock.ExitWriteLock();
188
189      response.Success = true;
190      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
191      // check if client has a free core for a new job
192      // if true, ask scheduler for a new job for this client
193      if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
194        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
195      else
196        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
197
198      if (hbData.JobProgress != null) {
199        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
200        if (jobsOfClient == null || jobsOfClient.Count == 0) {
201          response.Success = false;
202          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
203          return response;
204        }
205
206        foreach (KeyValuePair<long, double> jobProgress in hbData.JobProgress) {
207          Job curJob = jobAdapter.GetById(jobProgress.Key);
208          if (curJob.Client == null || curJob.Client.ClientId != hbData.ClientId) {
209            response.Success = false;
210            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
211          } else if(curJob.State == State.finished) {
212            // another client has finished this job allready
213            // the client can abort it
214            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));       
215          } else {
216            // save job progress
217            curJob.Percentage = jobProgress.Value;
218            jobAdapter.Update(curJob);
219          }
220        }
221      }
222
223      return response;
224    }
225   
226    /// <summary>
227    /// if the client asked to pull a job he calls this method
228    /// the server selects a job and sends it to the client
229    /// </summary>
230    /// <param name="clientId"></param>
231    /// <returns></returns>
232    public ResponseJob SendJob(Guid clientId) {
233      ResponseJob response = new ResponseJob();
234
235      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
236      if (job2Calculate != null) {
237        response.Job = job2Calculate;
238        response.Success = true;
239        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
240      } else {
241        response.Success = false;
242        response.Job = null;
243        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
244      }
245      return response;
246    }
247
248    private ResponseResultReceived ProcessJobResult(Guid clientId,
249      long jobId,
250      byte[] result,
251      double percentage,
252      Exception exception,
253      bool finished) {
254
255      ResponseResultReceived response = new ResponseResultReceived();
256      ClientInfo client =
257        clientAdapter.GetById(clientId);
258
259      Job job =
260        jobAdapter.GetById(jobId);
261
262      if (job.Client == null) {
263        response.Success = false;
264        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
265        return response;
266      }
267      if (job.Client.ClientId != clientId) {
268        response.Success = false;
269        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
270        return response;
271      }
272      if (job == null) {
273        response.Success = false;
274        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
275        return response;
276      }
277      if (job.State == State.finished) {
278        response.Success = true;
279        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
280        return response;
281      }
282      if (job.State != State.calculating) {
283        response.Success = false;
284        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
285        return response;
286      }
287      job.SerializedJob = result;
288      job.Percentage = percentage;
289
290      if (finished) {
291        job.State = State.finished;
292        jobAdapter.Update(job);
293
294        client.State = State.idle;
295        clientAdapter.Update(client);
296
297        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
298        foreach (JobResult currentResult in jobResults)
299          jobResultAdapter.Delete(currentResult);
300      }
301
302      JobResult jobResult =
303        new JobResult();
304      jobResult.Client = client;
305      jobResult.Job = job;
306      jobResult.Result = result;
307      jobResult.Percentage = percentage;
308      jobResult.Exception = exception;
309      jobResult.DateFinished = DateTime.Now;
310
311      jobResultAdapter.Update(jobResult);
312      jobAdapter.Update(job);
313
314      response.Success = true;
315      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
316      response.JobId = jobId;
317      response.finished = finished;
318
319      return response;
320    }
321
322
323    /// <summary>
324    /// the client can send job results during calculating
325    /// and will send a final job result when he finished calculating
326    /// these job results will be stored in the database
327    /// </summary>
328    /// <param name="clientId"></param>
329    /// <param name="jobId"></param>
330    /// <param name="result"></param>
331    /// <param name="exception"></param>
332    /// <param name="finished"></param>
333    /// <returns></returns>
334    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
335      long jobId,
336      byte[] result,
337      double percentage,
338      Exception exception) {
339
340      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
341    }
342
343
344    public ResponseResultReceived ProcessSnapshot(Guid clientId, long jobId, byte[] result, double percentage, Exception exception) {
345      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
346    }
347
348    /// <summary>
349    /// when a client logs out the state will be set
350    /// and the entry in the last hearbeats dictionary will be removed
351    /// </summary>
352    /// <param name="clientId"></param>
353    /// <returns></returns>                       
354    public Response Logout(Guid clientId) {
355      Response response = new Response();
356
357      heartbeatLock.EnterWriteLock();
358      if (lastHeartbeats.ContainsKey(clientId))
359        lastHeartbeats.Remove(clientId);
360      heartbeatLock.ExitWriteLock();
361
362      ClientInfo client = clientAdapter.GetById(clientId);
363      if (client == null) {
364        response.Success = false;
365        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
366        return response;
367      }
368      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
369      if (client.State == State.calculating) {
370        // check wich job the client was calculating and reset it
371        foreach (Job job in allJobs) {
372          if (job.Client.ClientId == client.ClientId) {
373            jobManager.ResetJobsDependingOnResults(job);
374          }
375        }
376      }
377
378      client.State = State.offline;
379      clientAdapter.Update(client);
380
381      response.Success = true;
382      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
383     
384      return response;
385    }
386
387    /// <summary>
388    /// If a client goes offline and restores a job he was calculating
389    /// he can ask the client if he still needs the job result
390    /// </summary>
391    /// <param name="jobId"></param>
392    /// <returns></returns>
393    public Response IsJobStillNeeded(long jobId) {
394      Response response = new Response();
395      Job job = jobAdapter.GetById(jobId);
396      if (job == null) {
397        response.Success = false;
398        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
399        return response;
400      }
401      if (job.State == State.finished) {
402        response.Success = true;
403        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
404        return response;
405      }
406      job.State = State.finished;
407      jobAdapter.Update(job);
408     
409      response.Success = true;
410      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
411      return response;
412    }
413
414    public ResponsePlugin SendPlugins(List<string> pluginList) {
415      throw new NotImplementedException();
416    }
417
418    #endregion
419  }
420}
Note: See TracBrowser for help on using the repository browser.