Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1954

Last change on this file since 1954 was 1954, checked in by msteinbi, 15 years ago

login works also if client is logged in (#466)

File size: 22.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39
40namespace HeuristicLab.Hive.Server.Core {
41  /// <summary>
42  /// The ClientCommunicator manages the whole communication with the client
43  /// </summary>
44  public class ClientCommunicator: IClientCommunicator {
45    private static Dictionary<Guid, DateTime> lastHeartbeats =
46      new Dictionary<Guid,DateTime>();
47    private static Dictionary<Guid, int> newAssignedJobs =
48      new Dictionary<Guid, int>();
49
50    private static ReaderWriterLockSlim heartbeatLock =
51      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
52
53    private ISessionFactory factory;
54    private ILifecycleManager lifecycleManager;
55    private IInternalJobManager jobManager;
56    private IScheduler scheduler;
57
58    /// <summary>
59    /// Initialization of the Adapters to the database
60    /// Initialization of Eventhandler for the lifecycle management
61    /// Initialization of lastHearbeats Dictionary
62    /// </summary>
63    public ClientCommunicator() {
64      factory = ServiceLocator.GetSessionFactory();
65     
66      lifecycleManager = ServiceLocator.GetLifecycleManager();
67      jobManager = ServiceLocator.GetJobManager() as
68        IInternalJobManager;
69      scheduler = ServiceLocator.GetScheduler();
70
71      lifecycleManager.RegisterHeartbeat(
72        new EventHandler(lifecycleManager_OnServerHeartbeat));
73    }
74
75    /// <summary>
76    /// Check if online clients send their hearbeats
77    /// if not -> set them offline and check if they where calculating a job
78    /// </summary>
79    /// <param name="sender"></param>
80    /// <param name="e"></param>
81    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
82      ISession session = factory.GetSessionForCurrentThread();
83      ITransaction tx = null;
84
85      try {
86        IClientAdapter clientAdapter =
87          session.GetDataAdapter<ClientInfo, IClientAdapter>();
88        IJobAdapter jobAdapter =
89          session.GetDataAdapter<Job, IJobAdapter>();
90
91        tx = session.BeginTransaction();
92
93        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
94
95        foreach (ClientInfo client in allClients) {
96          if (client.State != State.offline && client.State != State.nullState) {
97            heartbeatLock.EnterUpgradeableReadLock();
98
99            if (!lastHeartbeats.ContainsKey(client.Id)) {
100              client.State = State.offline;
101              clientAdapter.Update(client);
102              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
103                jobManager.ResetJobsDependingOnResults(job);
104              }
105            } else {
106              DateTime lastHbOfClient = lastHeartbeats[client.Id];
107
108              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
109              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
110              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
111                // if client calculated jobs, the job must be reset
112                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
113                  jobManager.ResetJobsDependingOnResults(job);
114                  lock (newAssignedJobs) {
115                    if (newAssignedJobs.ContainsKey(job.Id))
116                      newAssignedJobs.Remove(job.Id);
117                  }
118                }
119
120                // client must be set offline
121                client.State = State.offline;
122                clientAdapter.Update(client);
123
124                heartbeatLock.EnterWriteLock();
125                lastHeartbeats.Remove(client.Id);
126                heartbeatLock.ExitWriteLock();
127              }
128            }
129
130            heartbeatLock.ExitUpgradeableReadLock();
131          } else {
132            heartbeatLock.EnterWriteLock();
133            if (lastHeartbeats.ContainsKey(client.Id))
134              lastHeartbeats.Remove(client.Id);
135            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
136              jobManager.ResetJobsDependingOnResults(job);
137            }
138            heartbeatLock.ExitWriteLock();
139          }
140        }
141        tx.Commit();
142      }
143      catch (Exception ex) {
144        if (tx != null)
145          tx.Rollback();
146        throw ex;
147      }
148      finally {
149        if (session != null)
150          session.EndSession();
151      }
152    }
153
154    #region IClientCommunicator Members
155
156    /// <summary>
157    /// Login process for the client
158    /// A hearbeat entry is created as well (login is the first hearbeat)
159    /// </summary>
160    /// <param name="clientInfo"></param>
161    /// <returns></returns>
162    public Response Login(ClientInfo clientInfo) {
163      ISession session = factory.GetSessionForCurrentThread();
164      ITransaction tx = null;
165
166      try {
167        IClientAdapter clientAdapter =
168          session.GetDataAdapter<ClientInfo, IClientAdapter>();
169
170        tx = session.BeginTransaction();
171
172        Response response = new Response();
173
174        heartbeatLock.EnterWriteLock();
175        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
176          lastHeartbeats[clientInfo.Id] = DateTime.Now;
177        } else {
178          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
179        }
180        heartbeatLock.ExitWriteLock();
181
182        clientInfo.State = State.idle;
183        clientAdapter.Update(clientInfo);
184        response.Success = true;
185        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
186
187        tx.Commit();
188        return response;
189      }
190      catch (Exception ex) {
191        if (tx != null)
192          tx.Rollback();
193        throw ex;
194      }
195      finally {
196        if (session != null)
197          session.EndSession();
198      }
199    }
200
201    /// <summary>
202    /// The client has to send regulary heartbeats
203    /// this hearbeats will be stored in the heartbeats dictionary
204    /// check if there is work for the client and send the client a response if he should pull a job
205    /// </summary>
206    /// <param name="hbData"></param>
207    /// <returns></returns>
208    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
209      ISession session = factory.GetSessionForCurrentThread();
210      ITransaction tx = null;
211
212      try {
213        IClientAdapter clientAdapter =
214          session.GetDataAdapter<ClientInfo, IClientAdapter>();
215
216        IJobAdapter jobAdapter =
217          session.GetDataAdapter<Job, IJobAdapter>();
218
219        tx = session.BeginTransaction();
220
221        ResponseHB response = new ResponseHB();
222        response.ActionRequest = new List<MessageContainer>();
223
224        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
225
226        // check if the client is logged in
227        if (client.State == State.offline || client.State == State.nullState) {
228          response.Success = false;
229          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
230          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
231          return response;
232        }
233
234        client.NrOfFreeCores = hbData.FreeCores;
235        client.FreeMemory = hbData.FreeMemory;
236
237        // save timestamp of this heartbeat
238        heartbeatLock.EnterWriteLock();
239        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
240          lastHeartbeats[hbData.ClientId] = DateTime.Now;
241        } else {
242          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
243        }
244        heartbeatLock.ExitWriteLock();
245
246        // check if client has a free core for a new job
247        // if true, ask scheduler for a new job for this client
248        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
249          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
250        else
251          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
252
253        response.Success = true;
254        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
255
256        processJobProcess(hbData, jobAdapter, clientAdapter, response);
257        clientAdapter.Update(client);
258
259        tx.Commit();
260        return response;
261      }
262      catch (Exception ex) {
263        if (tx != null)
264          tx.Rollback();
265        throw ex;
266      }
267      finally {
268        if (session != null)
269          session.EndSession();
270      }
271    }
272
273    /// <summary>
274    /// Process the Job progress sent by a client
275    /// </summary>
276    /// <param name="hbData"></param>
277    /// <param name="jobAdapter"></param>
278    /// <param name="clientAdapter"></param>
279    /// <param name="response"></param>
280    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
281      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
282        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
283        if (jobsOfClient == null || jobsOfClient.Count == 0) {
284          response.Success = false;
285          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
286          return;
287        }
288
289        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
290          Job curJob = jobAdapter.GetById(jobProgress.Key);
291          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
292            response.Success = false;
293            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
294          } else if (curJob.State == State.abort) {
295            // a request to abort the job has been set
296            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
297            curJob.State = State.finished;
298          } else {
299            // save job progress
300            curJob.Percentage = jobProgress.Value;
301
302            if (curJob.State == State.requestSnapshot) {
303              // a request for a snapshot has been set
304              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
305              curJob.State = State.requestSnapshotSent;
306            }
307          }
308          jobAdapter.Update(curJob);
309        }
310        foreach (Job currJob in jobsOfClient) {
311          bool found = false;
312          foreach (Guid jobId in hbData.JobProgress.Keys) {
313            if (jobId == currJob.Id) {
314              found = true;
315              break;
316            }
317          }
318          if (!found) {
319            lock (newAssignedJobs) {
320              if (newAssignedJobs.ContainsKey(currJob.Id)) {
321                newAssignedJobs[currJob.Id]--;
322
323                if (newAssignedJobs[currJob.Id] <= 0) {
324                  currJob.State = State.offline;
325                  jobAdapter.Update(currJob);
326                  newAssignedJobs.Remove(currJob.Id);
327                }
328              } else {
329                currJob.State = State.offline;
330                jobAdapter.Update(currJob);
331              }
332            } // lock
333          } else {
334            lock (newAssignedJobs) {
335              if (newAssignedJobs.ContainsKey(currJob.Id))
336                newAssignedJobs.Remove(currJob.Id);
337            }
338          }
339        }
340      }
341    }
342   
343    /// <summary>
344    /// if the client was told to pull a job he calls this method
345    /// the server selects a job and sends it to the client
346    /// </summary>
347    /// <param name="clientId"></param>
348    /// <returns></returns>
349    public ResponseJob SendJob(Guid clientId) {
350      ResponseJob response = new ResponseJob();
351
352      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
353      if (job2Calculate != null) {
354        response.Job = job2Calculate;
355        response.Success = true;
356        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
357        lock (newAssignedJobs) {
358          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
359        }
360      } else {
361        response.Success = false;
362        response.Job = null;
363        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
364      }
365      return response;
366    }
367
368    private ResponseResultReceived ProcessJobResult(Guid clientId,
369      Guid jobId,
370      byte[] result,
371      double percentage,
372      Exception exception,
373      bool finished) {
374      ISession session = factory.GetSessionForCurrentThread();
375      ITransaction tx = null;
376
377      try {
378        IClientAdapter clientAdapter =
379          session.GetDataAdapter<ClientInfo, IClientAdapter>();
380        IJobAdapter jobAdapter =
381          session.GetDataAdapter<Job, IJobAdapter>();
382        IJobResultsAdapter jobResultAdapter =
383          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
384
385        tx = session.BeginTransaction();
386
387        ResponseResultReceived response = new ResponseResultReceived();
388        ClientInfo client =
389          clientAdapter.GetById(clientId);
390
391        Job job =
392          jobAdapter.GetById(jobId);
393       
394        if (job == null) {
395          response.Success = false;
396          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
397          response.JobId = jobId;
398          return response;
399        }
400        if (job.State == State.abort) {
401          response.Success = false;
402          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
403        }
404        if (job.Client == null) {
405          response.Success = false;
406          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
407          response.JobId = jobId;
408          return response;
409        }
410        if (job.Client.Id != clientId) {
411          response.Success = false;
412          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
413          response.JobId = jobId;
414          return response;
415        }
416        if (job.State == State.finished) {
417          response.Success = true;
418          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
419          response.JobId = jobId;
420          return response;
421        }
422        if (job.State == State.requestSnapshotSent) {
423          job.State = State.calculating;
424        }
425        if (job.State != State.calculating) {
426          response.Success = false;
427          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
428          response.JobId = jobId;
429          return response;
430        }
431        job.SerializedJob = result;
432        job.Percentage = percentage;
433
434        if (finished) {
435          job.State = State.finished;
436          jobAdapter.Update(job);
437
438          client.State = State.idle;
439          clientAdapter.Update(client);
440        }
441        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
442        foreach (JobResult currentResult in jobResults)
443          jobResultAdapter.Delete(currentResult);
444
445        JobResult jobResult =
446          new JobResult();
447        jobResult.ClientId = client.Id;
448        jobResult.JobId = job.Id;
449        jobResult.Result = result;
450        jobResult.Percentage = percentage;
451        jobResult.Exception = exception;
452        jobResult.DateFinished = DateTime.Now;
453
454        jobResultAdapter.Update(jobResult);
455        jobAdapter.Update(job);
456
457        response.Success = true;
458        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
459        response.JobId = jobId;
460        response.finished = finished;
461
462        tx.Commit();
463        return response;
464      }
465      catch (Exception ex) {
466        if (tx != null)
467          tx.Rollback();
468        throw ex;
469      }
470      finally {
471        if (session != null)
472          session.EndSession();
473      }
474    }
475
476
477    /// <summary>
478    /// the client can send job results during calculating
479    /// and will send a final job result when he finished calculating
480    /// these job results will be stored in the database
481    /// </summary>
482    /// <param name="clientId"></param>
483    /// <param name="jobId"></param>
484    /// <param name="result"></param>
485    /// <param name="exception"></param>
486    /// <param name="finished"></param>
487    /// <returns></returns>
488    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
489      Guid jobId,
490      byte[] result,
491      double percentage,
492      Exception exception) {
493
494      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
495    }
496
497
498    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
499      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
500    }
501
502    /// <summary>
503    /// when a client logs out the state will be set
504    /// and the entry in the last hearbeats dictionary will be removed
505    /// </summary>
506    /// <param name="clientId"></param>
507    /// <returns></returns>                       
508    public Response Logout(Guid clientId) {
509      ISession session = factory.GetSessionForCurrentThread();
510      ITransaction tx = null;
511
512      try {
513        IClientAdapter clientAdapter =
514          session.GetDataAdapter<ClientInfo, IClientAdapter>();
515        IJobAdapter jobAdapter =
516          session.GetDataAdapter<Job, IJobAdapter>();
517
518        tx = session.BeginTransaction();
519
520        Response response = new Response();
521
522        heartbeatLock.EnterWriteLock();
523        if (lastHeartbeats.ContainsKey(clientId))
524          lastHeartbeats.Remove(clientId);
525        heartbeatLock.ExitWriteLock();
526
527        ClientInfo client = clientAdapter.GetById(clientId);
528        if (client == null) {
529          response.Success = false;
530          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
531          return response;
532        }
533        if (client.State == State.calculating) {
534          // check wich job the client was calculating and reset it
535          ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
536          foreach (Job job in jobsOfClient) {
537            if (job.State != State.finished)
538              jobManager.ResetJobsDependingOnResults(job);
539          }
540        }
541
542        client.State = State.offline;
543        clientAdapter.Update(client);
544
545        response.Success = true;
546        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
547
548        tx.Commit();
549        return response;
550      }
551      catch (Exception ex) {
552        if (tx != null)
553          tx.Rollback();
554        throw ex;
555      }
556      finally {
557        if (session != null)
558          session.EndSession();
559      }
560    }
561
562    /// <summary>
563    /// If a client goes offline and restores a job he was calculating
564    /// he can ask the client if he still needs the job result
565    /// </summary>
566    /// <param name="jobId"></param>
567    /// <returns></returns>
568    public Response IsJobStillNeeded(Guid jobId) {
569      ISession session = factory.GetSessionForCurrentThread();
570      ITransaction tx = null;
571
572      try {
573        IJobAdapter jobAdapter =
574          session.GetDataAdapter<Job, IJobAdapter>();
575        tx = session.BeginTransaction();
576
577        Response response = new Response();
578        Job job = jobAdapter.GetById(jobId);
579        if (job == null) {
580          response.Success = false;
581          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
582          return response;
583        }
584        if (job.State == State.finished) {
585          response.Success = true;
586          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
587          return response;
588        }
589        job.State = State.finished;
590        jobAdapter.Update(job);
591
592        response.Success = true;
593        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
594        tx.Commit();
595        return response;
596      }
597      catch (Exception ex) {
598        if (tx != null)
599          tx.Rollback();
600        throw ex;
601      }
602      finally {
603        if (session != null)
604          session.EndSession();
605      }
606    }
607
608    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
609      ResponsePlugin response = new ResponsePlugin();
610      PluginManager.Manager.Initialize();
611      ICollection<PluginInfo> allActivePlugins = PluginManager.Manager.ActivePlugins;
612
613      foreach (HivePluginInfo pluginInfo in pluginList) {
614        // TODO: BuildDate deleted, not needed???
615        // TODO: Split version to major, minor and revision number
616        foreach (PluginInfo currPlugin in allActivePlugins) {
617          if (currPlugin.Name == pluginInfo.Name
618              && currPlugin.Version.ToString() == pluginInfo.Version) {
619
620            CachedHivePluginInfo currCachedPlugin = new CachedHivePluginInfo {
621                Name = currPlugin.Name,
622                Version = currPlugin.Version.ToString(),
623                BuildDate = currPlugin.BuildDate };
624
625            foreach (String assemblyPath in currPlugin.Assemblies) {
626              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(assemblyPath));
627            }
628            response.Plugins.Add(currCachedPlugin);
629          }
630        }
631      }
632      response.Success = true;
633      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
634
635      return response;
636
637    }
638
639    #endregion
640  }
641}
Note: See TracBrowser for help on using the repository browser.