source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1593

Last change on this file since 1593 was 1593, checked in by msteinbi, 12 years ago

replaced PluginInfo with HivePluginInfo (#531)

File size: 20.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38
39namespace HeuristicLab.Hive.Server.Core {
40  /// <summary>
41  /// The ClientCommunicator manages the whole communication with the client
42  /// </summary>
43  public class ClientCommunicator: IClientCommunicator {
44    private static Dictionary<Guid, DateTime> lastHeartbeats =
45      new Dictionary<Guid,DateTime>();
46
47    private static ReaderWriterLockSlim heartbeatLock =
48      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
49
50    private ISessionFactory factory;
51    private ILifecycleManager lifecycleManager;
52    private IInternalJobManager jobManager;
53    private IScheduler scheduler;
54
55    /// <summary>
56    /// Initialization of the Adapters to the database
57    /// Initialization of Eventhandler for the lifecycle management
58    /// Initialization of lastHearbeats Dictionary
59    /// </summary>
60    public ClientCommunicator() {
61      factory = ServiceLocator.GetSessionFactory();
62     
63      lifecycleManager = ServiceLocator.GetLifecycleManager();
64      jobManager = ServiceLocator.GetJobManager() as
65        IInternalJobManager;
66      scheduler = ServiceLocator.GetScheduler();
67
68      lifecycleManager.RegisterHeartbeat(
69        new EventHandler(lifecycleManager_OnServerHeartbeat));
70    }
71
72    /// <summary>
73    /// Check if online clients send their hearbeats
74    /// if not -> set them offline and check if they where calculating a job
75    /// </summary>
76    /// <param name="sender"></param>
77    /// <param name="e"></param>
78    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
79      ISession session = factory.GetSessionForCurrentThread();
80      ITransaction tx = null;
81
82      try {
83        IClientAdapter clientAdapter =
84          session.GetDataAdapter<ClientInfo, IClientAdapter>();
85        IJobAdapter jobAdapter =
86          session.GetDataAdapter<Job, IJobAdapter>();
87
88        tx = session.BeginTransaction();
89
90        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
91
92        foreach (ClientInfo client in allClients) {
93          if (client.State != State.offline && client.State != State.nullState) {
94            heartbeatLock.EnterUpgradeableReadLock();
95
96            if (!lastHeartbeats.ContainsKey(client.Id)) {
97              client.State = State.offline;
98              clientAdapter.Update(client);
99              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
100                jobManager.ResetJobsDependingOnResults(job);
101              }
102            } else {
103              DateTime lastHbOfClient = lastHeartbeats[client.Id];
104
105              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
106              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
107              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
108                // if client calculated jobs, the job must be reset
109                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
110                  jobManager.ResetJobsDependingOnResults(job);
111                }
112
113                // client must be set offline
114                client.State = State.offline;
115                clientAdapter.Update(client);
116
117                heartbeatLock.EnterWriteLock();
118                lastHeartbeats.Remove(client.Id);
119                heartbeatLock.ExitWriteLock();
120              }
121            }
122
123            heartbeatLock.ExitUpgradeableReadLock();
124          } else {
125            heartbeatLock.EnterWriteLock();
126            if (lastHeartbeats.ContainsKey(client.Id))
127              lastHeartbeats.Remove(client.Id);
128            heartbeatLock.ExitWriteLock();
129          }
130        }
131        tx.Commit();
132      }
133      catch (Exception ex) {
134        if (tx != null)
135          tx.Rollback();
136        throw ex;
137      }
138      finally {
139        if (session != null)
140          session.EndSession();
141      }
142    }
143
144    #region IClientCommunicator Members
145
146    /// <summary>
147    /// Login process for the client
148    /// A hearbeat entry is created as well (login is the first hearbeat)
149    /// </summary>
150    /// <param name="clientInfo"></param>
151    /// <returns></returns>
152    public Response Login(ClientInfo clientInfo) {
153      ISession session = factory.GetSessionForCurrentThread();
154      ITransaction tx = null;
155
156      try {
157        IClientAdapter clientAdapter =
158          session.GetDataAdapter<ClientInfo, IClientAdapter>();
159
160        tx = session.BeginTransaction();
161
162        Response response = new Response();
163
164        heartbeatLock.EnterWriteLock();
165        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
166          lastHeartbeats[clientInfo.Id] = DateTime.Now;
167        } else {
168          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
169        }
170        heartbeatLock.ExitWriteLock();
171
172        ClientInfo client = clientAdapter.GetById(clientInfo.Id);
173        if (client != null && client.State != State.offline && client.State != State.nullState) {
174          response.Success = false;
175          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
176          return response;
177        }
178        clientInfo.State = State.idle;
179        clientAdapter.Update(clientInfo);
180        response.Success = true;
181        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
182
183        tx.Commit();
184        return response;
185      }
186      catch (Exception ex) {
187        if (tx != null)
188          tx.Rollback();
189        throw ex;
190      }
191      finally {
192        if (session != null)
193          session.EndSession();
194      }
195    }
196
197    /// <summary>
198    /// The client has to send regulary heartbeats
199    /// this hearbeats will be stored in the heartbeats dictionary
200    /// check if there is work for the client and send the client a response if he should pull a job
201    /// </summary>
202    /// <param name="hbData"></param>
203    /// <returns></returns>
204    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
205      ISession session = factory.GetSessionForCurrentThread();
206      ITransaction tx = null;
207
208      try {
209        IClientAdapter clientAdapter =
210          session.GetDataAdapter<ClientInfo, IClientAdapter>();
211
212        IJobAdapter jobAdapter =
213          session.GetDataAdapter<Job, IJobAdapter>();
214
215        tx = session.BeginTransaction();
216
217        ResponseHB response = new ResponseHB();
218        response.ActionRequest = new List<MessageContainer>();
219
220        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
221
222        // check if the client is logged in
223        if (client.State == State.offline || client.State == State.nullState) {
224          response.Success = false;
225          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
226          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
227          return response;
228        }
229
230        client.NrOfFreeCores = hbData.FreeCores;
231        client.FreeMemory = hbData.FreeMemory;
232
233        // save timestamp of this heartbeat
234        heartbeatLock.EnterWriteLock();
235        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
236          lastHeartbeats[hbData.ClientId] = DateTime.Now;
237        } else {
238          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
239        }
240        heartbeatLock.ExitWriteLock();
241
242        // check if client has a free core for a new job
243        // if true, ask scheduler for a new job for this client
244        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
245          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
246        else
247          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
248
249        response.Success = true;
250        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
251
252        processJobProcess(hbData, jobAdapter, clientAdapter, response);
253        clientAdapter.Update(client);
254
255        tx.Commit();
256        return response;
257      }
258      catch (Exception ex) {
259        if (tx != null)
260          tx.Rollback();
261        throw ex;
262      }
263      finally {
264        if (session != null)
265          session.EndSession();
266      }
267    }
268
269    /// <summary>
270    /// Process the Job progress sent by a client
271    /// </summary>
272    /// <param name="hbData"></param>
273    /// <param name="jobAdapter"></param>
274    /// <param name="clientAdapter"></param>
275    /// <param name="response"></param>
276    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
277      if (hbData.JobProgress != null) {
278        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
279        if (jobsOfClient == null || jobsOfClient.Count == 0) {
280          response.Success = false;
281          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
282          return;
283        }
284
285        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
286          Job curJob = jobAdapter.GetById(jobProgress.Key);
287          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
288            response.Success = false;
289            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
290          } else if (curJob.State == State.abort) {
291            // a request to abort the job has been set
292            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
293            curJob.State = State.offline;
294          } else {
295            // save job progress
296            curJob.Percentage = jobProgress.Value;
297            jobAdapter.Update(curJob);
298
299            if (curJob.State == State.requestSnapshot) {
300              // a request for a snapshot has been set
301              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
302              curJob.State = State.calculating;
303            }
304          }
305        }
306      }
307    }
308   
309    /// <summary>
310    /// if the client was told to pull a job he calls this method
311    /// the server selects a job and sends it to the client
312    /// </summary>
313    /// <param name="clientId"></param>
314    /// <returns></returns>
315    public ResponseJob SendJob(Guid clientId) {
316      ResponseJob response = new ResponseJob();
317
318      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
319      if (job2Calculate != null) {
320        response.Job = job2Calculate;
321        response.Success = true;
322        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
323      } else {
324        response.Success = false;
325        response.Job = null;
326        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
327      }
328      return response;
329    }
330
331    private ResponseResultReceived ProcessJobResult(Guid clientId,
332      Guid jobId,
333      byte[] result,
334      double percentage,
335      Exception exception,
336      bool finished) {
337      ISession session = factory.GetSessionForCurrentThread();
338      ITransaction tx = null;
339
340      try {
341        IClientAdapter clientAdapter =
342          session.GetDataAdapter<ClientInfo, IClientAdapter>();
343        IJobAdapter jobAdapter =
344          session.GetDataAdapter<Job, IJobAdapter>();
345        IJobResultsAdapter jobResultAdapter =
346          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
347
348        tx = session.BeginTransaction();
349
350        ResponseResultReceived response = new ResponseResultReceived();
351        ClientInfo client =
352          clientAdapter.GetById(clientId);
353
354        Job job =
355          jobAdapter.GetById(jobId);
356
357        if (job == null) {
358          response.Success = false;
359          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
360          response.JobId = jobId;
361          return response;
362        }
363        if (job.Client == null) {
364          response.Success = false;
365          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
366          response.JobId = jobId;
367          return response;
368        }
369        if (job.Client.Id != clientId) {
370          response.Success = false;
371          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
372          response.JobId = jobId;
373          return response;
374        }
375        if (job.State == State.finished) {
376          response.Success = true;
377          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
378          response.JobId = jobId;
379          return response;
380        }
381        if (job.State != State.calculating) {
382          response.Success = false;
383          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
384          response.JobId = jobId;
385          return response;
386        }
387        job.SerializedJob = result;
388        job.Percentage = percentage;
389
390        if (finished) {
391          job.State = State.finished;
392          jobAdapter.Update(job);
393
394          client.State = State.idle;
395          clientAdapter.Update(client);
396
397          List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
398          foreach (JobResult currentResult in jobResults)
399            jobResultAdapter.Delete(currentResult);
400        }
401
402        JobResult jobResult =
403          new JobResult();
404        jobResult.Client = client;
405        jobResult.Job = job;
406        jobResult.Result = result;
407        jobResult.Percentage = percentage;
408        jobResult.Exception = exception;
409        jobResult.DateFinished = DateTime.Now;
410
411        jobResultAdapter.Update(jobResult);
412        jobAdapter.Update(job);
413
414        response.Success = true;
415        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
416        response.JobId = jobId;
417        response.finished = finished;
418
419        tx.Commit();
420        return response;
421      }
422      catch (Exception ex) {
423        if (tx != null)
424          tx.Rollback();
425        throw ex;
426      }
427      finally {
428        if (session != null)
429          session.EndSession();
430      }
431    }
432
433
434    /// <summary>
435    /// the client can send job results during calculating
436    /// and will send a final job result when he finished calculating
437    /// these job results will be stored in the database
438    /// </summary>
439    /// <param name="clientId"></param>
440    /// <param name="jobId"></param>
441    /// <param name="result"></param>
442    /// <param name="exception"></param>
443    /// <param name="finished"></param>
444    /// <returns></returns>
445    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
446      Guid jobId,
447      byte[] result,
448      double percentage,
449      Exception exception) {
450
451      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
452    }
453
454
455    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
456      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
457    }
458
459    /// <summary>
460    /// when a client logs out the state will be set
461    /// and the entry in the last hearbeats dictionary will be removed
462    /// </summary>
463    /// <param name="clientId"></param>
464    /// <returns></returns>                       
465    public Response Logout(Guid clientId) {
466      ISession session = factory.GetSessionForCurrentThread();
467      ITransaction tx = null;
468
469      try {
470        IClientAdapter clientAdapter =
471          session.GetDataAdapter<ClientInfo, IClientAdapter>();
472        IJobAdapter jobAdapter =
473          session.GetDataAdapter<Job, IJobAdapter>();
474
475        tx = session.BeginTransaction();
476
477        Response response = new Response();
478
479        heartbeatLock.EnterWriteLock();
480        if (lastHeartbeats.ContainsKey(clientId))
481          lastHeartbeats.Remove(clientId);
482        heartbeatLock.ExitWriteLock();
483
484        ClientInfo client = clientAdapter.GetById(clientId);
485        if (client == null) {
486          response.Success = false;
487          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
488          return response;
489        }
490        List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
491        if (client.State == State.calculating) {
492          // check wich job the client was calculating and reset it
493          foreach (Job job in allJobs) {
494            if (job.Client.Id == client.Id) {
495              jobManager.ResetJobsDependingOnResults(job);
496            }
497          }
498        }
499
500        client.State = State.offline;
501        clientAdapter.Update(client);
502
503        response.Success = true;
504        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
505
506        tx.Commit();
507        return response;
508      }
509      catch (Exception ex) {
510        if (tx != null)
511          tx.Rollback();
512        throw ex;
513      }
514      finally {
515        if (session != null)
516          session.EndSession();
517      }
518    }
519
520    /// <summary>
521    /// If a client goes offline and restores a job he was calculating
522    /// he can ask the client if he still needs the job result
523    /// </summary>
524    /// <param name="jobId"></param>
525    /// <returns></returns>
526    public Response IsJobStillNeeded(Guid jobId) {
527      ISession session = factory.GetSessionForCurrentThread();
528      ITransaction tx = null;
529
530      try {
531        IJobAdapter jobAdapter =
532          session.GetDataAdapter<Job, IJobAdapter>();
533        tx = session.BeginTransaction();
534
535        Response response = new Response();
536        Job job = jobAdapter.GetById(jobId);
537        if (job == null) {
538          response.Success = false;
539          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
540          return response;
541        }
542        if (job.State == State.finished) {
543          response.Success = true;
544          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
545          return response;
546        }
547        job.State = State.finished;
548        jobAdapter.Update(job);
549
550        response.Success = true;
551        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
552        tx.Commit();
553        return response;
554      }
555      catch (Exception ex) {
556        if (tx != null)
557          tx.Rollback();
558        throw ex;
559      }
560      finally {
561        if (session != null)
562          session.EndSession();
563      }
564    }
565
566    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
567
568
569      throw new NotImplementedException();
570    }
571
572    #endregion
573  }
574}
Note: See TracBrowser for help on using the repository browser.