Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1530

Last change on this file since 1530 was 1530, checked in by gkronber, 15 years ago

Moved source files of plugins Hive ... Visualization.Test into version-specific sub-folders. #576

File size: 19.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38
39namespace HeuristicLab.Hive.Server.Core {
40  /// <summary>
41  /// The ClientCommunicator manages the whole communication with the client
42  /// </summary>
43  public class ClientCommunicator: IClientCommunicator {
44    private static Dictionary<Guid, DateTime> lastHeartbeats =
45      new Dictionary<Guid,DateTime>();
46
47    private static ReaderWriterLockSlim heartbeatLock =
48      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
49
50    private ISessionFactory factory;
51    private ILifecycleManager lifecycleManager;
52    private IInternalJobManager jobManager;
53    private IScheduler scheduler;
54
55    /// <summary>
56    /// Initialization of the Adapters to the database
57    /// Initialization of Eventhandler for the lifecycle management
58    /// Initialization of lastHearbeats Dictionary
59    /// </summary>
60    public ClientCommunicator() {
61      factory = ServiceLocator.GetSessionFactory();
62     
63      lifecycleManager = ServiceLocator.GetLifecycleManager();
64      jobManager = ServiceLocator.GetJobManager() as
65        IInternalJobManager;
66      scheduler = ServiceLocator.GetScheduler();
67
68      lifecycleManager.RegisterHeartbeat(
69        new EventHandler(lifecycleManager_OnServerHeartbeat));
70    }
71
72    /// <summary>
73    /// Check if online clients send their hearbeats
74    /// if not -> set them offline and check if they where calculating a job
75    /// </summary>
76    /// <param name="sender"></param>
77    /// <param name="e"></param>
78    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
79      ISession session = factory.GetSessionForCurrentThread();
80      ITransaction tx = null;
81
82      try {
83        IClientAdapter clientAdapter =
84          session.GetDataAdapter<ClientInfo, IClientAdapter>();
85        IJobAdapter jobAdapter =
86          session.GetDataAdapter<Job, IJobAdapter>();
87
88        tx = session.BeginTransaction();
89
90        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
91
92        foreach (ClientInfo client in allClients) {
93          if (client.State != State.offline && client.State != State.nullState) {
94            heartbeatLock.EnterUpgradeableReadLock();
95
96            if (!lastHeartbeats.ContainsKey(client.Id)) {
97              client.State = State.offline;
98              clientAdapter.Update(client);
99              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
100                jobManager.ResetJobsDependingOnResults(job);
101              }
102            } else {
103              DateTime lastHbOfClient = lastHeartbeats[client.Id];
104
105              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
106              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
107              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
108                // if client calculated jobs, the job must be reset
109                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
110                  jobManager.ResetJobsDependingOnResults(job);
111                }
112
113                // client must be set offline
114                client.State = State.offline;
115                clientAdapter.Update(client);
116
117                heartbeatLock.EnterWriteLock();
118                lastHeartbeats.Remove(client.Id);
119                heartbeatLock.ExitWriteLock();
120              }
121            }
122
123            heartbeatLock.ExitUpgradeableReadLock();
124          } else {
125            heartbeatLock.EnterWriteLock();
126            if (lastHeartbeats.ContainsKey(client.Id))
127              lastHeartbeats.Remove(client.Id);
128            heartbeatLock.ExitWriteLock();
129          }
130        }
131        tx.Commit();
132      }
133      catch (Exception ex) {
134        if (tx != null)
135          tx.Rollback();
136        throw ex;
137      }
138      finally {
139        if (session != null)
140          session.EndSession();
141      }
142    }
143
144    #region IClientCommunicator Members
145
146    /// <summary>
147    /// Login process for the client
148    /// A hearbeat entry is created as well (login is the first hearbeat)
149    /// </summary>
150    /// <param name="clientInfo"></param>
151    /// <returns></returns>
152    public Response Login(ClientInfo clientInfo) {
153      ISession session = factory.GetSessionForCurrentThread();
154      ITransaction tx = null;
155
156      try {
157        IClientAdapter clientAdapter =
158          session.GetDataAdapter<ClientInfo, IClientAdapter>();
159
160        tx = session.BeginTransaction();
161
162        Response response = new Response();
163
164        heartbeatLock.EnterWriteLock();
165        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
166          lastHeartbeats[clientInfo.Id] = DateTime.Now;
167        } else {
168          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
169        }
170        heartbeatLock.ExitWriteLock();
171
172        ClientInfo client = clientAdapter.GetById(clientInfo.Id);
173        if (client != null && client.State != State.offline && client.State != State.nullState) {
174          response.Success = false;
175          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
176          return response;
177        }
178        clientInfo.State = State.idle;
179        clientAdapter.Update(clientInfo);
180        response.Success = true;
181        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
182
183        tx.Commit();
184        return response;
185      }
186      catch (Exception ex) {
187        if (tx != null)
188          tx.Rollback();
189        throw ex;
190      }
191      finally {
192        if (session != null)
193          session.EndSession();
194      }
195    }
196
197    /// <summary>
198    /// The client has to send regulary heartbeats
199    /// this hearbeats will be stored in the heartbeats dictionary
200    /// check if there is work for the client and send the client a response if he should pull a job
201    /// </summary>
202    /// <param name="hbData"></param>
203    /// <returns></returns>
204    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
205      ISession session = factory.GetSessionForCurrentThread();
206      ITransaction tx = null;
207
208      try {
209        IClientAdapter clientAdapter =
210          session.GetDataAdapter<ClientInfo, IClientAdapter>();
211
212        IJobAdapter jobAdapter =
213          session.GetDataAdapter<Job, IJobAdapter>();
214
215        tx = session.BeginTransaction();
216
217        ResponseHB response = new ResponseHB();
218        response.ActionRequest = new List<MessageContainer>();
219
220        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
221
222        // check if the client is logged in
223        if (client.State == State.offline || client.State == State.nullState) {
224          response.Success = false;
225          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
226          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
227          return response;
228        }
229
230        client.NrOfFreeCores = hbData.FreeCores;
231        client.FreeMemory = hbData.FreeMemory;
232
233        // save timestamp of this heartbeat
234        heartbeatLock.EnterWriteLock();
235        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
236          lastHeartbeats[hbData.ClientId] = DateTime.Now;
237        } else {
238          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
239        }
240        heartbeatLock.ExitWriteLock();
241
242        response.Success = true;
243        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
244        // check if client has a free core for a new job
245        // if true, ask scheduler for a new job for this client
246        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
247          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
248        else
249          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
250
251        processJobProcess(hbData, jobAdapter, clientAdapter, response);
252        clientAdapter.Update(client);
253
254        tx.Commit();
255        return response;
256      }
257      catch (Exception ex) {
258        if (tx != null)
259          tx.Rollback();
260        throw ex;
261      }
262      finally {
263        if (session != null)
264          session.EndSession();
265      }
266    }
267
268    /// <summary>
269    /// Process the Job progress sent by a client
270    /// </summary>
271    /// <param name="hbData"></param>
272    /// <param name="jobAdapter"></param>
273    /// <param name="clientAdapter"></param>
274    /// <param name="response"></param>
275    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
276      if (hbData.JobProgress != null) {
277        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
278        if (jobsOfClient == null || jobsOfClient.Count == 0) {
279          response.Success = false;
280          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
281          return;
282        }
283
284        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
285          Job curJob = jobAdapter.GetById(jobProgress.Key);
286          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
287            response.Success = false;
288            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
289          } else if (curJob.State == State.finished) {
290            // another client has finished this job allready
291            // the client can abort it
292            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
293          } else {
294            // save job progress
295            curJob.Percentage = jobProgress.Value;
296            jobAdapter.Update(curJob);
297          }
298        }
299      }
300    }
301   
302    /// <summary>
303    /// if the client asked to pull a job he calls this method
304    /// the server selects a job and sends it to the client
305    /// </summary>
306    /// <param name="clientId"></param>
307    /// <returns></returns>
308    public ResponseJob SendJob(Guid clientId) {
309      ResponseJob response = new ResponseJob();
310
311      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
312      if (job2Calculate != null) {
313        response.Job = job2Calculate;
314        response.Success = true;
315        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
316      } else {
317        response.Success = false;
318        response.Job = null;
319        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
320      }
321      return response;
322    }
323
324    private ResponseResultReceived ProcessJobResult(Guid clientId,
325      Guid jobId,
326      byte[] result,
327      double percentage,
328      Exception exception,
329      bool finished) {
330      ISession session = factory.GetSessionForCurrentThread();
331      ITransaction tx = null;
332
333      try {
334        IClientAdapter clientAdapter =
335          session.GetDataAdapter<ClientInfo, IClientAdapter>();
336        IJobAdapter jobAdapter =
337          session.GetDataAdapter<Job, IJobAdapter>();
338        IJobResultsAdapter jobResultAdapter =
339          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
340
341        tx = session.BeginTransaction();
342
343        ResponseResultReceived response = new ResponseResultReceived();
344        ClientInfo client =
345          clientAdapter.GetById(clientId);
346
347        Job job =
348          jobAdapter.GetById(jobId);
349
350        if (job.Client == null) {
351          response.Success = false;
352          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
353          return response;
354        }
355        if (job.Client.Id != clientId) {
356          response.Success = false;
357          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
358          return response;
359        }
360        if (job == null) {
361          response.Success = false;
362          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
363          return response;
364        }
365        if (job.State == State.finished) {
366          response.Success = true;
367          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
368          return response;
369        }
370        if (job.State != State.calculating) {
371          response.Success = false;
372          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
373          return response;
374        }
375        job.SerializedJob = result;
376        job.Percentage = percentage;
377
378        if (finished) {
379          job.State = State.finished;
380          jobAdapter.Update(job);
381
382          client.State = State.idle;
383          clientAdapter.Update(client);
384
385          List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
386          foreach (JobResult currentResult in jobResults)
387            jobResultAdapter.Delete(currentResult);
388        }
389
390        JobResult jobResult =
391          new JobResult();
392        jobResult.Client = client;
393        jobResult.Job = job;
394        jobResult.Result = result;
395        jobResult.Percentage = percentage;
396        jobResult.Exception = exception;
397        jobResult.DateFinished = DateTime.Now;
398
399        jobResultAdapter.Update(jobResult);
400        jobAdapter.Update(job);
401
402        response.Success = true;
403        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
404        response.JobId = jobId;
405        response.finished = finished;
406
407        tx.Commit();
408        return response;
409      }
410      catch (Exception ex) {
411        if (tx != null)
412          tx.Rollback();
413        throw ex;
414      }
415      finally {
416        if (session != null)
417          session.EndSession();
418      }
419    }
420
421
422    /// <summary>
423    /// the client can send job results during calculating
424    /// and will send a final job result when he finished calculating
425    /// these job results will be stored in the database
426    /// </summary>
427    /// <param name="clientId"></param>
428    /// <param name="jobId"></param>
429    /// <param name="result"></param>
430    /// <param name="exception"></param>
431    /// <param name="finished"></param>
432    /// <returns></returns>
433    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
434      Guid jobId,
435      byte[] result,
436      double percentage,
437      Exception exception) {
438
439      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
440    }
441
442
443    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
444      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
445    }
446
447    /// <summary>
448    /// when a client logs out the state will be set
449    /// and the entry in the last hearbeats dictionary will be removed
450    /// </summary>
451    /// <param name="clientId"></param>
452    /// <returns></returns>                       
453    public Response Logout(Guid clientId) {
454      ISession session = factory.GetSessionForCurrentThread();
455      ITransaction tx = null;
456
457      try {
458        IClientAdapter clientAdapter =
459          session.GetDataAdapter<ClientInfo, IClientAdapter>();
460        IJobAdapter jobAdapter =
461          session.GetDataAdapter<Job, IJobAdapter>();
462
463        tx = session.BeginTransaction();
464
465        Response response = new Response();
466
467        heartbeatLock.EnterWriteLock();
468        if (lastHeartbeats.ContainsKey(clientId))
469          lastHeartbeats.Remove(clientId);
470        heartbeatLock.ExitWriteLock();
471
472        ClientInfo client = clientAdapter.GetById(clientId);
473        if (client == null) {
474          response.Success = false;
475          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
476          return response;
477        }
478        List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
479        if (client.State == State.calculating) {
480          // check wich job the client was calculating and reset it
481          foreach (Job job in allJobs) {
482            if (job.Client.Id == client.Id) {
483              jobManager.ResetJobsDependingOnResults(job);
484            }
485          }
486        }
487
488        client.State = State.offline;
489        clientAdapter.Update(client);
490
491        response.Success = true;
492        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
493
494        tx.Commit();
495        return response;
496      }
497      catch (Exception ex) {
498        if (tx != null)
499          tx.Rollback();
500        throw ex;
501      }
502      finally {
503        if (session != null)
504          session.EndSession();
505      }
506    }
507
508    /// <summary>
509    /// If a client goes offline and restores a job he was calculating
510    /// he can ask the client if he still needs the job result
511    /// </summary>
512    /// <param name="jobId"></param>
513    /// <returns></returns>
514    public Response IsJobStillNeeded(Guid jobId) {
515      ISession session = factory.GetSessionForCurrentThread();
516      ITransaction tx = null;
517
518      try {
519        IJobAdapter jobAdapter =
520          session.GetDataAdapter<Job, IJobAdapter>();
521        tx = session.BeginTransaction();
522
523        Response response = new Response();
524        Job job = jobAdapter.GetById(jobId);
525        if (job == null) {
526          response.Success = false;
527          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
528          return response;
529        }
530        if (job.State == State.finished) {
531          response.Success = true;
532          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
533          return response;
534        }
535        job.State = State.finished;
536        jobAdapter.Update(job);
537
538        response.Success = true;
539        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
540        tx.Commit();
541        return response;
542      }
543      catch (Exception ex) {
544        if (tx != null)
545          tx.Rollback();
546        throw ex;
547      }
548      finally {
549        if (session != null)
550          session.EndSession();
551      }
552    }
553
554    public ResponsePlugin SendPlugins(List<PluginInfo> pluginList) {
555
556
557      throw new NotImplementedException();
558    }
559
560    #endregion
561  }
562}
Note: See TracBrowser for help on using the repository browser.