source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1596

Last change on this file since 1596 was 1596, checked in by msteinbi, 12 years ago

Implemented SendPlugins method (#531)

File size: 21.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39
40namespace HeuristicLab.Hive.Server.Core {
41  /// <summary>
42  /// The ClientCommunicator manages the whole communication with the client
43  /// </summary>
44  public class ClientCommunicator: IClientCommunicator {
45    private static Dictionary<Guid, DateTime> lastHeartbeats =
46      new Dictionary<Guid,DateTime>();
47
48    private static ReaderWriterLockSlim heartbeatLock =
49      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
50
51    private ISessionFactory factory;
52    private ILifecycleManager lifecycleManager;
53    private IInternalJobManager jobManager;
54    private IScheduler scheduler;
55
56    /// <summary>
57    /// Initialization of the Adapters to the database
58    /// Initialization of Eventhandler for the lifecycle management
59    /// Initialization of lastHearbeats Dictionary
60    /// </summary>
61    public ClientCommunicator() {
62      factory = ServiceLocator.GetSessionFactory();
63     
64      lifecycleManager = ServiceLocator.GetLifecycleManager();
65      jobManager = ServiceLocator.GetJobManager() as
66        IInternalJobManager;
67      scheduler = ServiceLocator.GetScheduler();
68
69      lifecycleManager.RegisterHeartbeat(
70        new EventHandler(lifecycleManager_OnServerHeartbeat));
71    }
72
73    /// <summary>
74    /// Check if online clients send their hearbeats
75    /// if not -> set them offline and check if they where calculating a job
76    /// </summary>
77    /// <param name="sender"></param>
78    /// <param name="e"></param>
79    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
80      ISession session = factory.GetSessionForCurrentThread();
81      ITransaction tx = null;
82
83      try {
84        IClientAdapter clientAdapter =
85          session.GetDataAdapter<ClientInfo, IClientAdapter>();
86        IJobAdapter jobAdapter =
87          session.GetDataAdapter<Job, IJobAdapter>();
88
89        tx = session.BeginTransaction();
90
91        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
92
93        foreach (ClientInfo client in allClients) {
94          if (client.State != State.offline && client.State != State.nullState) {
95            heartbeatLock.EnterUpgradeableReadLock();
96
97            if (!lastHeartbeats.ContainsKey(client.Id)) {
98              client.State = State.offline;
99              clientAdapter.Update(client);
100              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
101                jobManager.ResetJobsDependingOnResults(job);
102              }
103            } else {
104              DateTime lastHbOfClient = lastHeartbeats[client.Id];
105
106              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
107              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
108              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
109                // if client calculated jobs, the job must be reset
110                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
111                  jobManager.ResetJobsDependingOnResults(job);
112                }
113
114                // client must be set offline
115                client.State = State.offline;
116                clientAdapter.Update(client);
117
118                heartbeatLock.EnterWriteLock();
119                lastHeartbeats.Remove(client.Id);
120                heartbeatLock.ExitWriteLock();
121              }
122            }
123
124            heartbeatLock.ExitUpgradeableReadLock();
125          } else {
126            heartbeatLock.EnterWriteLock();
127            if (lastHeartbeats.ContainsKey(client.Id))
128              lastHeartbeats.Remove(client.Id);
129            heartbeatLock.ExitWriteLock();
130          }
131        }
132        tx.Commit();
133      }
134      catch (Exception ex) {
135        if (tx != null)
136          tx.Rollback();
137        throw ex;
138      }
139      finally {
140        if (session != null)
141          session.EndSession();
142      }
143    }
144
145    #region IClientCommunicator Members
146
147    /// <summary>
148    /// Login process for the client
149    /// A hearbeat entry is created as well (login is the first hearbeat)
150    /// </summary>
151    /// <param name="clientInfo"></param>
152    /// <returns></returns>
153    public Response Login(ClientInfo clientInfo) {
154      ISession session = factory.GetSessionForCurrentThread();
155      ITransaction tx = null;
156
157      try {
158        IClientAdapter clientAdapter =
159          session.GetDataAdapter<ClientInfo, IClientAdapter>();
160
161        tx = session.BeginTransaction();
162
163        Response response = new Response();
164
165        heartbeatLock.EnterWriteLock();
166        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
167          lastHeartbeats[clientInfo.Id] = DateTime.Now;
168        } else {
169          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
170        }
171        heartbeatLock.ExitWriteLock();
172
173        ClientInfo client = clientAdapter.GetById(clientInfo.Id);
174        if (client != null && client.State != State.offline && client.State != State.nullState) {
175          response.Success = false;
176          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
177          return response;
178        }
179        clientInfo.State = State.idle;
180        clientAdapter.Update(clientInfo);
181        response.Success = true;
182        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
183
184        tx.Commit();
185        return response;
186      }
187      catch (Exception ex) {
188        if (tx != null)
189          tx.Rollback();
190        throw ex;
191      }
192      finally {
193        if (session != null)
194          session.EndSession();
195      }
196    }
197
198    /// <summary>
199    /// The client has to send regulary heartbeats
200    /// this hearbeats will be stored in the heartbeats dictionary
201    /// check if there is work for the client and send the client a response if he should pull a job
202    /// </summary>
203    /// <param name="hbData"></param>
204    /// <returns></returns>
205    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
206      ISession session = factory.GetSessionForCurrentThread();
207      ITransaction tx = null;
208
209      try {
210        IClientAdapter clientAdapter =
211          session.GetDataAdapter<ClientInfo, IClientAdapter>();
212
213        IJobAdapter jobAdapter =
214          session.GetDataAdapter<Job, IJobAdapter>();
215
216        tx = session.BeginTransaction();
217
218        ResponseHB response = new ResponseHB();
219        response.ActionRequest = new List<MessageContainer>();
220
221        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
222
223        // check if the client is logged in
224        if (client.State == State.offline || client.State == State.nullState) {
225          response.Success = false;
226          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
227          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
228          return response;
229        }
230
231        client.NrOfFreeCores = hbData.FreeCores;
232        client.FreeMemory = hbData.FreeMemory;
233
234        // save timestamp of this heartbeat
235        heartbeatLock.EnterWriteLock();
236        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
237          lastHeartbeats[hbData.ClientId] = DateTime.Now;
238        } else {
239          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
240        }
241        heartbeatLock.ExitWriteLock();
242
243        // check if client has a free core for a new job
244        // if true, ask scheduler for a new job for this client
245        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
246          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
247        else
248          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
249
250        response.Success = true;
251        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
252
253        processJobProcess(hbData, jobAdapter, clientAdapter, response);
254        clientAdapter.Update(client);
255
256        tx.Commit();
257        return response;
258      }
259      catch (Exception ex) {
260        if (tx != null)
261          tx.Rollback();
262        throw ex;
263      }
264      finally {
265        if (session != null)
266          session.EndSession();
267      }
268    }
269
270    /// <summary>
271    /// Process the Job progress sent by a client
272    /// </summary>
273    /// <param name="hbData"></param>
274    /// <param name="jobAdapter"></param>
275    /// <param name="clientAdapter"></param>
276    /// <param name="response"></param>
277    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
278      if (hbData.JobProgress != null) {
279        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
280        if (jobsOfClient == null || jobsOfClient.Count == 0) {
281          response.Success = false;
282          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
283          return;
284        }
285
286        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
287          Job curJob = jobAdapter.GetById(jobProgress.Key);
288          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
289            response.Success = false;
290            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
291          } else if (curJob.State == State.abort) {
292            // a request to abort the job has been set
293            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
294            curJob.State = State.offline;
295          } else {
296            // save job progress
297            curJob.Percentage = jobProgress.Value;
298            jobAdapter.Update(curJob);
299
300            if (curJob.State == State.requestSnapshot) {
301              // a request for a snapshot has been set
302              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
303              curJob.State = State.calculating;
304            }
305          }
306        }
307      }
308    }
309   
310    /// <summary>
311    /// if the client was told to pull a job he calls this method
312    /// the server selects a job and sends it to the client
313    /// </summary>
314    /// <param name="clientId"></param>
315    /// <returns></returns>
316    public ResponseJob SendJob(Guid clientId) {
317      ResponseJob response = new ResponseJob();
318
319      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
320      if (job2Calculate != null) {
321        response.Job = job2Calculate;
322        response.Success = true;
323        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
324      } else {
325        response.Success = false;
326        response.Job = null;
327        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
328      }
329      return response;
330    }
331
332    private ResponseResultReceived ProcessJobResult(Guid clientId,
333      Guid jobId,
334      byte[] result,
335      double percentage,
336      Exception exception,
337      bool finished) {
338      ISession session = factory.GetSessionForCurrentThread();
339      ITransaction tx = null;
340
341      try {
342        IClientAdapter clientAdapter =
343          session.GetDataAdapter<ClientInfo, IClientAdapter>();
344        IJobAdapter jobAdapter =
345          session.GetDataAdapter<Job, IJobAdapter>();
346        IJobResultsAdapter jobResultAdapter =
347          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
348
349        tx = session.BeginTransaction();
350
351        ResponseResultReceived response = new ResponseResultReceived();
352        ClientInfo client =
353          clientAdapter.GetById(clientId);
354
355        Job job =
356          jobAdapter.GetById(jobId);
357
358        if (job == null) {
359          response.Success = false;
360          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
361          response.JobId = jobId;
362          return response;
363        }
364        if (job.Client == null) {
365          response.Success = false;
366          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
367          response.JobId = jobId;
368          return response;
369        }
370        if (job.Client.Id != clientId) {
371          response.Success = false;
372          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
373          response.JobId = jobId;
374          return response;
375        }
376        if (job.State == State.finished) {
377          response.Success = true;
378          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
379          response.JobId = jobId;
380          return response;
381        }
382        if (job.State != State.calculating) {
383          response.Success = false;
384          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
385          response.JobId = jobId;
386          return response;
387        }
388        job.SerializedJob = result;
389        job.Percentage = percentage;
390
391        if (finished) {
392          job.State = State.finished;
393          jobAdapter.Update(job);
394
395          client.State = State.idle;
396          clientAdapter.Update(client);
397
398          List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
399          foreach (JobResult currentResult in jobResults)
400            jobResultAdapter.Delete(currentResult);
401        }
402
403        JobResult jobResult =
404          new JobResult();
405        jobResult.Client = client;
406        jobResult.Job = job;
407        jobResult.Result = result;
408        jobResult.Percentage = percentage;
409        jobResult.Exception = exception;
410        jobResult.DateFinished = DateTime.Now;
411
412        jobResultAdapter.Update(jobResult);
413        jobAdapter.Update(job);
414
415        response.Success = true;
416        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
417        response.JobId = jobId;
418        response.finished = finished;
419
420        tx.Commit();
421        return response;
422      }
423      catch (Exception ex) {
424        if (tx != null)
425          tx.Rollback();
426        throw ex;
427      }
428      finally {
429        if (session != null)
430          session.EndSession();
431      }
432    }
433
434
435    /// <summary>
436    /// the client can send job results during calculating
437    /// and will send a final job result when he finished calculating
438    /// these job results will be stored in the database
439    /// </summary>
440    /// <param name="clientId"></param>
441    /// <param name="jobId"></param>
442    /// <param name="result"></param>
443    /// <param name="exception"></param>
444    /// <param name="finished"></param>
445    /// <returns></returns>
446    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
447      Guid jobId,
448      byte[] result,
449      double percentage,
450      Exception exception) {
451
452      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
453    }
454
455
456    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
457      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
458    }
459
460    /// <summary>
461    /// when a client logs out the state will be set
462    /// and the entry in the last hearbeats dictionary will be removed
463    /// </summary>
464    /// <param name="clientId"></param>
465    /// <returns></returns>                       
466    public Response Logout(Guid clientId) {
467      ISession session = factory.GetSessionForCurrentThread();
468      ITransaction tx = null;
469
470      try {
471        IClientAdapter clientAdapter =
472          session.GetDataAdapter<ClientInfo, IClientAdapter>();
473        IJobAdapter jobAdapter =
474          session.GetDataAdapter<Job, IJobAdapter>();
475
476        tx = session.BeginTransaction();
477
478        Response response = new Response();
479
480        heartbeatLock.EnterWriteLock();
481        if (lastHeartbeats.ContainsKey(clientId))
482          lastHeartbeats.Remove(clientId);
483        heartbeatLock.ExitWriteLock();
484
485        ClientInfo client = clientAdapter.GetById(clientId);
486        if (client == null) {
487          response.Success = false;
488          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
489          return response;
490        }
491        List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
492        if (client.State == State.calculating) {
493          // check wich job the client was calculating and reset it
494          foreach (Job job in allJobs) {
495            if (job.Client.Id == client.Id) {
496              jobManager.ResetJobsDependingOnResults(job);
497            }
498          }
499        }
500
501        client.State = State.offline;
502        clientAdapter.Update(client);
503
504        response.Success = true;
505        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
506
507        tx.Commit();
508        return response;
509      }
510      catch (Exception ex) {
511        if (tx != null)
512          tx.Rollback();
513        throw ex;
514      }
515      finally {
516        if (session != null)
517          session.EndSession();
518      }
519    }
520
521    /// <summary>
522    /// If a client goes offline and restores a job he was calculating
523    /// he can ask the client if he still needs the job result
524    /// </summary>
525    /// <param name="jobId"></param>
526    /// <returns></returns>
527    public Response IsJobStillNeeded(Guid jobId) {
528      ISession session = factory.GetSessionForCurrentThread();
529      ITransaction tx = null;
530
531      try {
532        IJobAdapter jobAdapter =
533          session.GetDataAdapter<Job, IJobAdapter>();
534        tx = session.BeginTransaction();
535
536        Response response = new Response();
537        Job job = jobAdapter.GetById(jobId);
538        if (job == null) {
539          response.Success = false;
540          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
541          return response;
542        }
543        if (job.State == State.finished) {
544          response.Success = true;
545          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
546          return response;
547        }
548        job.State = State.finished;
549        jobAdapter.Update(job);
550
551        response.Success = true;
552        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
553        tx.Commit();
554        return response;
555      }
556      catch (Exception ex) {
557        if (tx != null)
558          tx.Rollback();
559        throw ex;
560      }
561      finally {
562        if (session != null)
563          session.EndSession();
564      }
565    }
566
567    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
568      ResponsePlugin response = new ResponsePlugin();
569      ICollection<PluginInfo> allActivePlugins = PluginManager.Manager.ActivePlugins;
570
571      foreach (HivePluginInfo pluginInfo in pluginList) {
572        foreach (PluginInfo currPlugin in allActivePlugins) {
573          if (currPlugin.Name == pluginInfo.Name
574              && currPlugin.Version.ToString() == pluginInfo.Version
575              && currPlugin.BuildDate == pluginInfo.BuildDate) {
576
577            CachedHivePluginInfo currCachedPlugin = new CachedHivePluginInfo {
578                Name = currPlugin.Name,
579                Version = currPlugin.Version.ToString(),
580                BuildDate = currPlugin.BuildDate };
581
582            foreach (String assemblyPath in currPlugin.Assemblies) {
583              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(assemblyPath));
584            }
585            response.Plugins.Add(currCachedPlugin);
586          }
587        }
588      }
589      response.Success = true;
590      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
591
592      return response;
593
594    }
595
596    #endregion
597  }
598}
Note: See TracBrowser for help on using the repository browser.