Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1369

Last change on this file since 1369 was 1369, checked in by msteinbi, 15 years ago

Implementing Lifecycle Management (#453)

File size: 15.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.Core.InternalInterfaces.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36
37namespace HeuristicLab.Hive.Server.Core {
38  /// <summary>
39  /// The ClientCommunicator manages the whole communication with the client
40  /// </summary>
41  public class ClientCommunicator: IClientCommunicator {
42    private static Dictionary<Guid, DateTime> lastHeartbeats =
43      new Dictionary<Guid,DateTime>();
44
45    private static ReaderWriterLockSlim heartbeatLock =
46      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
47
48    private IClientAdapter clientAdapter;
49    private IJobAdapter jobAdapter;
50    private IJobResultsAdapter jobResultAdapter;
51    private ILifecycleManager lifecycleManager;
52    private IInternalJobManager jobManager;
53    private IScheduler scheduler;
54
55    /// <summary>
56    /// Initialization of the Adapters to the database
57    /// Initialization of Eventhandler for the lifecycle management
58    /// Initialization of lastHearbeats Dictionary
59    /// </summary>
60    public ClientCommunicator() {
61      clientAdapter = ServiceLocator.GetClientAdapter();
62      jobAdapter = ServiceLocator.GetJobAdapter();
63      jobResultAdapter = ServiceLocator.GetJobResultsAdapter();
64      lifecycleManager = ServiceLocator.GetLifecycleManager();
65      jobManager = ServiceLocator.GetJobManager() as
66        IInternalJobManager;
67      scheduler = ServiceLocator.GetScheduler();
68
69      lifecycleManager.RegisterHeartbeat(
70        new EventHandler(lifecycleManager_OnServerHeartbeat));
71    }
72
73    /// <summary>
74    /// Check if online clients send their hearbeats
75    /// if not -> set them offline and check if they where calculating a job
76    /// </summary>
77    /// <param name="sender"></param>
78    /// <param name="e"></param>
79    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
80      List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
81
82      foreach (ClientInfo client in allClients) {
83        if (client.State != State.offline && client.State != State.nullState) {
84          heartbeatLock.EnterUpgradeableReadLock();
85
86          if (!lastHeartbeats.ContainsKey(client.ClientId)) {
87            client.State = State.offline;
88            clientAdapter.Update(client);
89            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
90              jobManager.ResetJobsDependingOnResults(job);
91            }
92          } else {
93            DateTime lastHbOfClient = lastHeartbeats[client.ClientId];
94
95            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
96            // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
97            if (dif.Seconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
98              // if client calculated jobs, the job must be reset
99              if (client.State == State.calculating) {
100                // check wich job the client was calculating and reset it
101                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
102                  jobManager.ResetJobsDependingOnResults(job);
103                }
104              }
105             
106              // client must be set offline
107              client.State = State.offline;
108              clientAdapter.Update(client);
109
110              heartbeatLock.EnterWriteLock();
111              lastHeartbeats.Remove(client.ClientId);
112              heartbeatLock.ExitWriteLock();
113            }
114          }
115
116          heartbeatLock.ExitUpgradeableReadLock();
117        } else {
118          heartbeatLock.EnterWriteLock();
119          if (lastHeartbeats.ContainsKey(client.ClientId))
120            lastHeartbeats.Remove(client.ClientId);
121          heartbeatLock.ExitWriteLock();
122        }
123      }
124    }
125
126    #region IClientCommunicator Members
127
128    /// <summary>
129    /// Login process for the client
130    /// A hearbeat entry is created as well (login is the first hearbeat)
131    /// </summary>
132    /// <param name="clientInfo"></param>
133    /// <returns></returns>
134    public Response Login(ClientInfo clientInfo) {
135      Response response = new Response();
136
137      heartbeatLock.EnterWriteLock();
138      if (lastHeartbeats.ContainsKey(clientInfo.ClientId)) {
139        lastHeartbeats[clientInfo.ClientId] = DateTime.Now;
140      } else {
141        lastHeartbeats.Add(clientInfo.ClientId, DateTime.Now);
142      }
143      heartbeatLock.ExitWriteLock();
144
145      // todo: allClients legacy ?
146      ClientInfo client = clientAdapter.GetById(clientInfo.ClientId);
147      if (client != null && client.State != State.offline && client.State != State.nullState) {
148        response.Success = false;
149        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
150        return response;
151      }
152      clientInfo.State = State.idle;
153      clientAdapter.Update(clientInfo);
154      response.Success = true;
155      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
156
157      return response;
158    }
159
160    /// <summary>
161    /// The client has to send regulary heartbeats
162    /// this hearbeats will be stored in the heartbeats dictionary
163    /// check if there is work for the client and send the client a response if he should pull a job
164    /// </summary>
165    /// <param name="hbData"></param>
166    /// <returns></returns>
167    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
168      ResponseHB response = new ResponseHB();
169
170      // check if the client is logged in
171      response.ActionRequest = new List<MessageContainer>();
172      if (clientAdapter.GetById(hbData.ClientId).State == State.offline ||
173          clientAdapter.GetById(hbData.ClientId).State == State.nullState) {
174        response.Success = false;
175        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
176        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
177        return response;
178      }
179
180      // save timestamp of this heartbeat
181      heartbeatLock.EnterWriteLock();
182      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
183        lastHeartbeats[hbData.ClientId] = DateTime.Now;
184      } else {
185        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
186      }
187      heartbeatLock.ExitWriteLock();
188
189      response.Success = true;
190      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
191      // check if client has a free core for a new job
192      // if true, ask scheduler for a new job for this client
193      if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
194        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
195      else
196        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
197
198      if (hbData.JobProgress != null) {
199        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
200        if (jobsOfClient == null || jobsOfClient.Count == 0) {
201          response.Success = false;
202          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
203          return response;
204        }
205
206        foreach (KeyValuePair<long, double> jobProgress in hbData.JobProgress) {
207          Job curJob = jobAdapter.GetById(jobProgress.Key);
208          if (curJob.Client == null || curJob.Client.ClientId != hbData.ClientId) {
209            response.Success = false;
210            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
211          } else if(curJob.State == State.finished) {
212            // another client has finished this job allready
213            // the client can abort it
214            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));       
215          } else {
216            // save job progress
217            curJob.Percentage = jobProgress.Value;
218            jobAdapter.Update(curJob);
219          }
220        }
221      }
222
223      return response;
224    }
225   
226    /// <summary>
227    /// if the client asked to pull a job he calls this method
228    /// the server selects a job and sends it to the client
229    /// </summary>
230    /// <param name="clientId"></param>
231    /// <returns></returns>
232    public ResponseJob SendJob(Guid clientId) {
233      ResponseJob response = new ResponseJob();
234
235      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
236      if (job2Calculate != null) {
237        response.Job = job2Calculate;
238        response.Success = true;
239        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
240      } else {
241        response.Success = false;
242        response.Job = null;
243        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
244      }
245      return response;
246    }
247
248    /// <summary>
249    /// the client can send job results during calculating
250    /// and will send a final job result when he finished calculating
251    /// these job results will be stored in the database
252    /// </summary>
253    /// <param name="clientId"></param>
254    /// <param name="jobId"></param>
255    /// <param name="result"></param>
256    /// <param name="exception"></param>
257    /// <param name="finished"></param>
258    /// <returns></returns>
259    public ResponseResultReceived ProcessJobResult(Guid clientId,
260      long jobId,
261      byte[] result,
262      double percentage,
263      Exception exception, 
264      bool finished) {
265      ResponseResultReceived response = new ResponseResultReceived();
266      ClientInfo client =
267        clientAdapter.GetById(clientId);
268
269      Job job =
270        jobAdapter.GetById(jobId);
271
272      if (job.Client == null)    {
273        response.Success = false;
274        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
275        return response;
276      }
277      if (job.Client.ClientId != clientId) {
278        response.Success = false;
279        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
280        return response;
281      }
282      if (job == null) {
283        response.Success = false;
284        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
285        return response;
286      }
287      if (job.State == State.finished) {
288        response.Success = true;
289        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
290        return response;       
291      }
292      if (job.State != State.calculating) {
293        response.Success = false;
294        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
295        return response;
296      }
297      job.SerializedJob = result;
298      job.Percentage = percentage;
299
300      if (finished) {
301        job.State = State.finished;
302        jobAdapter.Update(job);
303
304        client.State = State.idle;
305        clientAdapter.Update(client);
306
307        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
308        foreach (JobResult currentResult in jobResults)
309          jobResultAdapter.Delete(currentResult);
310      }
311
312      JobResult jobResult =
313        new JobResult();
314      jobResult.Client = client;
315      jobResult.Job = job;
316      jobResult.Result = result;
317      jobResult.Percentage = percentage;
318      jobResult.Exception = exception;
319      jobResult.DateFinished = DateTime.Now;
320
321      jobResultAdapter.Update(jobResult);
322      jobAdapter.Update(job);
323
324      response.Success = true;
325      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
326      response.JobId = jobId;
327      response.finished = finished;
328
329      return response;
330    }
331
332    /// <summary>
333    /// when a client logs out the state will be set
334    /// and the entry in the last hearbeats dictionary will be removed
335    /// </summary>
336    /// <param name="clientId"></param>
337    /// <returns></returns>                       
338    public Response Logout(Guid clientId) {
339      Response response = new Response();
340
341      heartbeatLock.EnterWriteLock();
342      if (lastHeartbeats.ContainsKey(clientId))
343        lastHeartbeats.Remove(clientId);
344      heartbeatLock.ExitWriteLock();
345
346      ClientInfo client = clientAdapter.GetById(clientId);
347      if (client == null) {
348        response.Success = false;
349        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
350        return response;
351      }
352      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
353      if (client.State == State.calculating) {
354        // check wich job the client was calculating and reset it
355        foreach (Job job in allJobs) {
356          if (job.Client.ClientId == client.ClientId) {
357            jobManager.ResetJobsDependingOnResults(job);
358          }
359        }
360      }
361
362      client.State = State.offline;
363      clientAdapter.Update(client);
364
365      response.Success = true;
366      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
367     
368      return response;
369    }
370
371    /// <summary>
372    /// If a client goes offline and restores a job he was calculating
373    /// he can ask the client if he still needs the job result
374    /// </summary>
375    /// <param name="jobId"></param>
376    /// <returns></returns>
377    public Response IsJobStillNeeded(long jobId) {
378      Response response = new Response();
379      Job job = jobAdapter.GetById(jobId);
380      if (job == null) {
381        response.Success = false;
382        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
383        return response;
384      }
385      if (job.State == State.finished) {
386        response.Success = true;
387        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
388        return response;
389      }
390      job.State = State.finished;
391      jobAdapter.Update(job);
392     
393      response.Success = true;
394      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
395      return response;
396    }
397
398    public ResponsePlugin SendPlugins(List<string> pluginList) {
399      throw new NotImplementedException();
400    }
401
402    #endregion
403  }
404}
Note: See TracBrowser for help on using the repository browser.