Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1577

Last change on this file since 1577 was 1577, checked in by msteinbi, 15 years ago

added request and abort snapshot (#572)

File size: 20.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38
39namespace HeuristicLab.Hive.Server.Core {
40  /// <summary>
41  /// The ClientCommunicator manages the whole communication with the client
42  /// </summary>
43  public class ClientCommunicator: IClientCommunicator {
44    private static Dictionary<Guid, DateTime> lastHeartbeats =
45      new Dictionary<Guid,DateTime>();
46
47    private static ReaderWriterLockSlim heartbeatLock =
48      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
49
50    private ISessionFactory factory;
51    private ILifecycleManager lifecycleManager;
52    private IInternalJobManager jobManager;
53    private IScheduler scheduler;
54
55    /// <summary>
56    /// Initialization of the Adapters to the database
57    /// Initialization of Eventhandler for the lifecycle management
58    /// Initialization of lastHearbeats Dictionary
59    /// </summary>
60    public ClientCommunicator() {
61      factory = ServiceLocator.GetSessionFactory();
62     
63      lifecycleManager = ServiceLocator.GetLifecycleManager();
64      jobManager = ServiceLocator.GetJobManager() as
65        IInternalJobManager;
66      scheduler = ServiceLocator.GetScheduler();
67
68      lifecycleManager.RegisterHeartbeat(
69        new EventHandler(lifecycleManager_OnServerHeartbeat));
70    }
71
72    /// <summary>
73    /// Check if online clients send their hearbeats
74    /// if not -> set them offline and check if they where calculating a job
75    /// </summary>
76    /// <param name="sender"></param>
77    /// <param name="e"></param>
78    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
79      ISession session = factory.GetSessionForCurrentThread();
80      ITransaction tx = null;
81
82      try {
83        IClientAdapter clientAdapter =
84          session.GetDataAdapter<ClientInfo, IClientAdapter>();
85        IJobAdapter jobAdapter =
86          session.GetDataAdapter<Job, IJobAdapter>();
87
88        tx = session.BeginTransaction();
89
90        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
91
92        foreach (ClientInfo client in allClients) {
93          if (client.State != State.offline && client.State != State.nullState) {
94            heartbeatLock.EnterUpgradeableReadLock();
95
96            if (!lastHeartbeats.ContainsKey(client.Id)) {
97              client.State = State.offline;
98              clientAdapter.Update(client);
99              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
100                jobManager.ResetJobsDependingOnResults(job);
101              }
102            } else {
103              DateTime lastHbOfClient = lastHeartbeats[client.Id];
104
105              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
106              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
107              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
108                // if client calculated jobs, the job must be reset
109                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
110                  jobManager.ResetJobsDependingOnResults(job);
111                }
112
113                // client must be set offline
114                client.State = State.offline;
115                clientAdapter.Update(client);
116
117                heartbeatLock.EnterWriteLock();
118                lastHeartbeats.Remove(client.Id);
119                heartbeatLock.ExitWriteLock();
120              }
121            }
122
123            heartbeatLock.ExitUpgradeableReadLock();
124          } else {
125            heartbeatLock.EnterWriteLock();
126            if (lastHeartbeats.ContainsKey(client.Id))
127              lastHeartbeats.Remove(client.Id);
128            heartbeatLock.ExitWriteLock();
129          }
130        }
131        tx.Commit();
132      }
133      catch (Exception ex) {
134        if (tx != null)
135          tx.Rollback();
136        throw ex;
137      }
138      finally {
139        if (session != null)
140          session.EndSession();
141      }
142    }
143
144    #region IClientCommunicator Members
145
146    /// <summary>
147    /// Login process for the client
148    /// A hearbeat entry is created as well (login is the first hearbeat)
149    /// </summary>
150    /// <param name="clientInfo"></param>
151    /// <returns></returns>
152    public Response Login(ClientInfo clientInfo) {
153      ISession session = factory.GetSessionForCurrentThread();
154      ITransaction tx = null;
155
156      try {
157        IClientAdapter clientAdapter =
158          session.GetDataAdapter<ClientInfo, IClientAdapter>();
159
160        tx = session.BeginTransaction();
161
162        Response response = new Response();
163
164        heartbeatLock.EnterWriteLock();
165        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
166          lastHeartbeats[clientInfo.Id] = DateTime.Now;
167        } else {
168          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
169        }
170        heartbeatLock.ExitWriteLock();
171
172        ClientInfo client = clientAdapter.GetById(clientInfo.Id);
173        if (client != null && client.State != State.offline && client.State != State.nullState) {
174          response.Success = false;
175          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
176          return response;
177        }
178        clientInfo.State = State.idle;
179        clientAdapter.Update(clientInfo);
180        response.Success = true;
181        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
182
183        tx.Commit();
184        return response;
185      }
186      catch (Exception ex) {
187        if (tx != null)
188          tx.Rollback();
189        throw ex;
190      }
191      finally {
192        if (session != null)
193          session.EndSession();
194      }
195    }
196
197    /// <summary>
198    /// The client has to send regulary heartbeats
199    /// this hearbeats will be stored in the heartbeats dictionary
200    /// check if there is work for the client and send the client a response if he should pull a job
201    /// </summary>
202    /// <param name="hbData"></param>
203    /// <returns></returns>
204    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
205      ISession session = factory.GetSessionForCurrentThread();
206      ITransaction tx = null;
207
208      try {
209        IClientAdapter clientAdapter =
210          session.GetDataAdapter<ClientInfo, IClientAdapter>();
211
212        IJobAdapter jobAdapter =
213          session.GetDataAdapter<Job, IJobAdapter>();
214
215        tx = session.BeginTransaction();
216
217        ResponseHB response = new ResponseHB();
218        response.ActionRequest = new List<MessageContainer>();
219
220        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
221
222        // check if the client is logged in
223        if (client.State == State.offline || client.State == State.nullState) {
224          response.Success = false;
225          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
226          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
227          return response;
228        }
229
230        client.NrOfFreeCores = hbData.FreeCores;
231        client.FreeMemory = hbData.FreeMemory;
232
233        // save timestamp of this heartbeat
234        heartbeatLock.EnterWriteLock();
235        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
236          lastHeartbeats[hbData.ClientId] = DateTime.Now;
237        } else {
238          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
239        }
240        heartbeatLock.ExitWriteLock();
241
242        // check if client has a free core for a new job
243        // if true, ask scheduler for a new job for this client
244        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
245          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
246        else
247          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
248
249        response.Success = true;
250        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
251
252        processJobProcess(hbData, jobAdapter, clientAdapter, response);
253        clientAdapter.Update(client);
254
255        tx.Commit();
256        return response;
257      }
258      catch (Exception ex) {
259        if (tx != null)
260          tx.Rollback();
261        throw ex;
262      }
263      finally {
264        if (session != null)
265          session.EndSession();
266      }
267    }
268
269    /// <summary>
270    /// Process the Job progress sent by a client
271    /// </summary>
272    /// <param name="hbData"></param>
273    /// <param name="jobAdapter"></param>
274    /// <param name="clientAdapter"></param>
275    /// <param name="response"></param>
276    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
277      if (hbData.JobProgress != null) {
278        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
279        if (jobsOfClient == null || jobsOfClient.Count == 0) {
280          response.Success = false;
281          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
282          return;
283        }
284
285        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
286          Job curJob = jobAdapter.GetById(jobProgress.Key);
287          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
288            response.Success = false;
289            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
290          } else if (curJob.State == State.abort) {
291            // a request to abort the job has been set
292            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
293          } else {
294            // save job progress
295            curJob.Percentage = jobProgress.Value;
296            jobAdapter.Update(curJob);
297
298            if (curJob.State == State.requestSnapshot) {
299              // a request for a snapshot has been set
300              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
301            }
302          }
303        }
304      }
305    }
306   
307    /// <summary>
308    /// if the client was told to pull a job he calls this method
309    /// the server selects a job and sends it to the client
310    /// </summary>
311    /// <param name="clientId"></param>
312    /// <returns></returns>
313    public ResponseJob SendJob(Guid clientId) {
314      ResponseJob response = new ResponseJob();
315
316      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
317      if (job2Calculate != null) {
318        response.Job = job2Calculate;
319        response.Success = true;
320        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
321      } else {
322        response.Success = false;
323        response.Job = null;
324        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
325      }
326      return response;
327    }
328
329    private ResponseResultReceived ProcessJobResult(Guid clientId,
330      Guid jobId,
331      byte[] result,
332      double percentage,
333      Exception exception,
334      bool finished) {
335      ISession session = factory.GetSessionForCurrentThread();
336      ITransaction tx = null;
337
338      try {
339        IClientAdapter clientAdapter =
340          session.GetDataAdapter<ClientInfo, IClientAdapter>();
341        IJobAdapter jobAdapter =
342          session.GetDataAdapter<Job, IJobAdapter>();
343        IJobResultsAdapter jobResultAdapter =
344          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
345
346        tx = session.BeginTransaction();
347
348        ResponseResultReceived response = new ResponseResultReceived();
349        ClientInfo client =
350          clientAdapter.GetById(clientId);
351
352        Job job =
353          jobAdapter.GetById(jobId);
354
355        if (job == null) {
356          response.Success = false;
357          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
358          response.JobId = jobId;
359          return response;
360        }
361        if (job.Client == null) {
362          response.Success = false;
363          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
364          response.JobId = jobId;
365          return response;
366        }
367        if (job.Client.Id != clientId) {
368          response.Success = false;
369          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
370          response.JobId = jobId;
371          return response;
372        }
373        if (job.State == State.finished) {
374          response.Success = true;
375          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
376          response.JobId = jobId;
377          return response;
378        }
379        if (job.State != State.calculating) {
380          response.Success = false;
381          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
382          response.JobId = jobId;
383          return response;
384        }
385        job.SerializedJob = result;
386        job.Percentage = percentage;
387
388        if (finished) {
389          job.State = State.finished;
390          jobAdapter.Update(job);
391
392          client.State = State.idle;
393          clientAdapter.Update(client);
394
395          List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
396          foreach (JobResult currentResult in jobResults)
397            jobResultAdapter.Delete(currentResult);
398        }
399
400        JobResult jobResult =
401          new JobResult();
402        jobResult.Client = client;
403        jobResult.Job = job;
404        jobResult.Result = result;
405        jobResult.Percentage = percentage;
406        jobResult.Exception = exception;
407        jobResult.DateFinished = DateTime.Now;
408
409        jobResultAdapter.Update(jobResult);
410        jobAdapter.Update(job);
411
412        response.Success = true;
413        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
414        response.JobId = jobId;
415        response.finished = finished;
416
417        tx.Commit();
418        return response;
419      }
420      catch (Exception ex) {
421        if (tx != null)
422          tx.Rollback();
423        throw ex;
424      }
425      finally {
426        if (session != null)
427          session.EndSession();
428      }
429    }
430
431
432    /// <summary>
433    /// the client can send job results during calculating
434    /// and will send a final job result when he finished calculating
435    /// these job results will be stored in the database
436    /// </summary>
437    /// <param name="clientId"></param>
438    /// <param name="jobId"></param>
439    /// <param name="result"></param>
440    /// <param name="exception"></param>
441    /// <param name="finished"></param>
442    /// <returns></returns>
443    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
444      Guid jobId,
445      byte[] result,
446      double percentage,
447      Exception exception) {
448
449      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
450    }
451
452
453    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
454      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
455    }
456
457    /// <summary>
458    /// when a client logs out the state will be set
459    /// and the entry in the last hearbeats dictionary will be removed
460    /// </summary>
461    /// <param name="clientId"></param>
462    /// <returns></returns>                       
463    public Response Logout(Guid clientId) {
464      ISession session = factory.GetSessionForCurrentThread();
465      ITransaction tx = null;
466
467      try {
468        IClientAdapter clientAdapter =
469          session.GetDataAdapter<ClientInfo, IClientAdapter>();
470        IJobAdapter jobAdapter =
471          session.GetDataAdapter<Job, IJobAdapter>();
472
473        tx = session.BeginTransaction();
474
475        Response response = new Response();
476
477        heartbeatLock.EnterWriteLock();
478        if (lastHeartbeats.ContainsKey(clientId))
479          lastHeartbeats.Remove(clientId);
480        heartbeatLock.ExitWriteLock();
481
482        ClientInfo client = clientAdapter.GetById(clientId);
483        if (client == null) {
484          response.Success = false;
485          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
486          return response;
487        }
488        List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
489        if (client.State == State.calculating) {
490          // check wich job the client was calculating and reset it
491          foreach (Job job in allJobs) {
492            if (job.Client.Id == client.Id) {
493              jobManager.ResetJobsDependingOnResults(job);
494            }
495          }
496        }
497
498        client.State = State.offline;
499        clientAdapter.Update(client);
500
501        response.Success = true;
502        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
503
504        tx.Commit();
505        return response;
506      }
507      catch (Exception ex) {
508        if (tx != null)
509          tx.Rollback();
510        throw ex;
511      }
512      finally {
513        if (session != null)
514          session.EndSession();
515      }
516    }
517
518    /// <summary>
519    /// If a client goes offline and restores a job he was calculating
520    /// he can ask the client if he still needs the job result
521    /// </summary>
522    /// <param name="jobId"></param>
523    /// <returns></returns>
524    public Response IsJobStillNeeded(Guid jobId) {
525      ISession session = factory.GetSessionForCurrentThread();
526      ITransaction tx = null;
527
528      try {
529        IJobAdapter jobAdapter =
530          session.GetDataAdapter<Job, IJobAdapter>();
531        tx = session.BeginTransaction();
532
533        Response response = new Response();
534        Job job = jobAdapter.GetById(jobId);
535        if (job == null) {
536          response.Success = false;
537          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
538          return response;
539        }
540        if (job.State == State.finished) {
541          response.Success = true;
542          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
543          return response;
544        }
545        job.State = State.finished;
546        jobAdapter.Update(job);
547
548        response.Success = true;
549        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
550        tx.Commit();
551        return response;
552      }
553      catch (Exception ex) {
554        if (tx != null)
555          tx.Rollback();
556        throw ex;
557      }
558      finally {
559        if (session != null)
560          session.EndSession();
561      }
562    }
563
564    public ResponsePlugin SendPlugins(List<PluginInfo> pluginList) {
565
566
567      throw new NotImplementedException();
568    }
569
570    #endregion
571  }
572}
Note: See TracBrowser for help on using the repository browser.