Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 2122

Last change on this file since 2122 was 2122, checked in by svonolfe, 15 years ago

Fixed issue related to the streaming of results (#680)

File size: 28.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40
41namespace HeuristicLab.Hive.Server.Core {
42  /// <summary>
43  /// The ClientCommunicator manages the whole communication with the client
44  /// </summary>
45  public class ClientCommunicator: IClientCommunicator,
46    IInternalClientCommunicator{
// Time of the most recent login/heartbeat per client id.
// Access is guarded by heartbeatLock.
private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
  new Dictionary<Guid, DateTime>();

// Counter per job id for jobs that were handed out to a client but have not
// yet shown up in that client's progress report. Access is guarded by
// lock (newAssignedJobs), therefore the reference must never change.
private static readonly Dictionary<Guid, int> newAssignedJobs =
  new Dictionary<Guid, int>();

// Counter per job id for jobs in state "pending"; decremented on every
// server heartbeat. Access is guarded by lock (pendingJobs), therefore the
// reference must never change.
private static readonly Dictionary<Guid, int> pendingJobs =
  new Dictionary<Guid, int>();

// Recursion is enabled because the heartbeat check enters the write lock
// while it already holds the upgradeable read lock.
private static readonly ReaderWriterLockSlim heartbeatLock =
  new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

// Collaborators, resolved once in the constructor.
private readonly ISessionFactory factory;
private readonly ILifecycleManager lifecycleManager;
private readonly IInternalJobManager jobManager;
private readonly IScheduler scheduler;

// Initial counter value stored in pendingJobs for a job that was set to pending.
private const int PENDING_TIMEOUT = 100;
63
/// <summary>
/// Resolves the session factory, the lifecycle manager, the job manager and
/// the scheduler through the ServiceLocator and registers this instance for
/// the server heartbeat of the lifecycle manager.
/// </summary>
public ClientCommunicator() {
  this.factory = ServiceLocator.GetSessionFactory();
  this.lifecycleManager = ServiceLocator.GetLifecycleManager();
  this.jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  this.scheduler = ServiceLocator.GetScheduler();

  // every server heartbeat triggers the check for clients that went silent
  EventHandler onServerHeartbeat =
    new EventHandler(lifecycleManager_OnServerHeartbeat);
  this.lifecycleManager.RegisterHeartbeat(onServerHeartbeat);
}
80
/// <summary>
/// Handler for the server heartbeat. Checks for every client that is not
/// offline whether a heartbeat was received recently; if not, the client is
/// set offline and its active jobs are reset. For clients that are already
/// offline the heartbeat entry is removed and their active jobs are reset.
/// Finally the counters of the pending jobs are checked.
/// </summary>
/// <param name="sender">not used</param>
/// <param name="e">not used</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

    foreach (ClientInfo client in allClients) {
      if (client.State != State.offline && client.State != State.nullState) {
        heartbeatLock.EnterUpgradeableReadLock();
        // the lock is static: it must be released even if a data access call
        // throws, otherwise every later heartbeat would block forever
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            // client is marked as online but never sent a heartbeat
            client.State = State.offline;
            clientAdapter.Update(client);
            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if client calculated jobs, the jobs must be reset
              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
                jobManager.ResetJobsDependingOnResults(job);
                lock (newAssignedJobs) {
                  // Remove is a no-op if the key is not present
                  newAssignedJobs.Remove(job.Id);
                }
              }

              // client must be set offline
              client.State = State.offline;
              clientAdapter.Update(client);

              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              }
              finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        }
        finally {
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        heartbeatLock.EnterWriteLock();
        try {
          lastHeartbeats.Remove(client.Id);
          foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
            jobManager.ResetJobsDependingOnResults(job);
          }
        }
        finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
    CheckForPendingJobs(jobAdapter);

    tx.Commit();
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
161
/// <summary>
/// Decrements the counter of every job that is pending in the database and
/// has an entry in the pendingJobs dictionary. When the counter has run out
/// the job is set to offline and its entry is removed.
/// </summary>
/// <param name="jobAdapter">adapter used to query and update the jobs</param>
private void CheckForPendingJobs(IJobAdapter jobAdapter) {
  IList<Job> pendingJobsInDB = new List<Job>(jobAdapter.GetJobsByState(State.pending));

  foreach (Job currJob in pendingJobsInDB) {
    lock (pendingJobs) {
      int remaining;
      if (!pendingJobs.TryGetValue(currJob.Id, out remaining))
        continue;

      if (remaining <= 0) {
        currJob.State = State.offline;
        jobAdapter.Update(currJob);
        // drop the expired entry: the job is no longer pending, so the entry
        // would never be touched again and would make a later
        // pendingJobs.Add for the same job id fail
        pendingJobs.Remove(currJob.Id);
      } else {
        pendingJobs[currJob.Id] = remaining - 1;
      }
    }
  }
}
178
179    #region IClientCommunicator Members
180
/// <summary>
/// Login process for the client.
/// The login counts as the first heartbeat: a heartbeat entry is stored for
/// the client, its state is set to idle and the client is updated in the database.
/// </summary>
/// <param name="clientInfo">the client that logs in</param>
/// <returns>a response with Success set to true and the login status message</returns>
public Response Login(ClientInfo clientInfo) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // the indexer adds the entry or overwrites an existing one
      lastHeartbeats[clientInfo.Id] = DateTime.Now;
    }
    finally {
      // release the static lock even if reading clientInfo.Id throws
      heartbeatLock.ExitWriteLock();
    }

    clientInfo.State = State.idle;
    clientAdapter.Update(clientInfo);
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
225
/// <summary>
/// Processes a heartbeat of a client. The time of the heartbeat is stored in
/// the heartbeat dictionary, the free cores and free memory of the client are
/// updated, and the scheduler is asked whether there is a job for the client.
/// The job progress contained in the heartbeat is processed as well.
/// </summary>
/// <param name="hbData">the heartbeat data sent by the client</param>
/// <returns>
/// a response containing the action requests for the client; Success is
/// false if the client is unknown or not logged in
/// </returns>
public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseHB response = new ResponseHB();
    response.ActionRequest = new List<MessageContainer>();

    ClientInfo client = clientAdapter.GetById(hbData.ClientId);

    // check if the client is known and logged in
    // (an unknown client id is treated like a client that is not logged in)
    if (client == null ||
      client.State == State.offline || client.State == State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
      // nothing was changed; do not leave the transaction open
      tx.Rollback();
      return response;
    }

    client.NrOfFreeCores = hbData.FreeCores;
    client.FreeMemory = hbData.FreeMemory;

    // save timestamp of this heartbeat
    heartbeatLock.EnterWriteLock();
    try {
      lastHeartbeats[hbData.ClientId] = DateTime.Now;
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    // check if client has a free core for a new job
    // if true, ask scheduler for a new job for this client
    if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
    else
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

    processJobProcess(hbData, jobAdapter, clientAdapter, response);
    clientAdapter.Update(client);

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
297
/// <summary>
/// Processes the job progress sent by a client within a heartbeat.
/// For every reported job the progress is stored, or an abort / snapshot
/// request is added to the response if such a request was set on the job.
/// Active jobs of the client that are missing in the report are set offline
/// once their counter in newAssignedJobs has run out (or immediately if they
/// have no counter).
/// </summary>
/// <param name="hbData">the heartbeat data containing the job progress</param>
/// <param name="jobAdapter">adapter used to load and update the jobs</param>
/// <param name="clientAdapter">adapter used to load the client</param>
/// <param name="response">the heartbeat response that is filled by this method</param>
private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
  if (hbData.JobProgress == null || hbData.JobProgress.Count == 0)
    return;

  List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
  if (jobsOfClient.Count == 0) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
    return;
  }

  foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
    Job curJob = jobAdapter.GetById(jobProgress.Key);
    if (curJob == null) {
      // the client reported a job id the server does not know; there is
      // nothing to update for it
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      continue;
    }

    if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
    } else if (curJob.State == State.abort) {
      // a request to abort the job has been set
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
      curJob.State = State.finished;
    } else {
      // save job progress
      curJob.Percentage = jobProgress.Value;

      if (curJob.State == State.requestSnapshot) {
        // a request for a snapshot has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
        curJob.State = State.requestSnapshotSent;
      }
    }
    jobAdapter.Update(curJob);
  }

  foreach (Job currJob in jobsOfClient) {
    bool reported = hbData.JobProgress.ContainsKey(currJob.Id);

    lock (newAssignedJobs) {
      if (reported) {
        // the client works on the job; the counter is not needed any more
        newAssignedJobs.Remove(currJob.Id);
        continue;
      }

      int remaining;
      if (newAssignedJobs.TryGetValue(currJob.Id, out remaining)) {
        // the job was handed out recently; give the client some heartbeats
        // before the job is considered lost
        remaining--;
        if (remaining <= 0) {
          currJob.State = State.offline;
          jobAdapter.Update(currJob);
          newAssignedJobs.Remove(currJob.Id);
        } else {
          newAssignedJobs[currJob.Id] = remaining;
        }
      } else {
        currJob.State = State.offline;
        jobAdapter.Update(currJob);
      }
    }
  }
}
367   
/// <summary>
/// If the client was told to pull a job it calls this method.
/// The scheduler selects a job and the serialized job is sent to the client.
/// </summary>
/// <param name="clientId">id of the client that pulls the job</param>
/// <returns>
/// a response containing the serialized job, or a response with Success
/// set to false if the scheduler has no job for the client
/// </returns>
public ResponseSerializedJob SendSerializedJob(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseSerializedJob response = new ResponseSerializedJob();

    Job job2Calculate = scheduler.GetNextJobForClient(clientId);
    if (job2Calculate != null) {
      SerializedJob computableJob =
        jobAdapter.GetSerializedJob(job2Calculate.Id);

      response.Job = computableJob;
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      lock (newAssignedJobs) {
        // remember the job until it shows up in a progress report of the
        // client; an existing counter is kept
        if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
      }
    } else {
      response.Success = false;
      response.Job = null;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
    }

    tx.Commit();

    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
418
/// <summary>
/// If the client was told to pull a job it calls this method.
/// The scheduler selects a job and the job object itself (not its
/// serialized form) is sent to the client.
/// </summary>
/// <param name="clientId">id of the client that pulls the job</param>
/// <returns>
/// a response containing the job, or a response with Success set to false
/// if the scheduler has no job for the client
/// </returns>
public ResponseJob SendJob(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseJob response = new ResponseJob();

    Job job2Calculate = scheduler.GetNextJobForClient(clientId);
    if (job2Calculate != null) {
      response.Job = job2Calculate;
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      lock (newAssignedJobs) {
        // remember the job until it shows up in a progress report of the
        // client; an existing counter is kept
        if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
      }
    } else {
      response.Success = false;
      response.Job = null;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
    }

    tx.Commit();

    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
466
/// <summary>
/// Receives a job result as a stream. The stream starts with a serialized
/// JobResult object (meta data), followed by the raw result data. The meta
/// data is processed first; if that succeeds, the remaining bytes of the
/// stream are copied to both the job result and the job in the database.
/// </summary>
/// <param name="stream">stream containing the JobResult followed by the result data</param>
/// <param name="finished">true if this is the final result, false for a snapshot</param>
/// <returns>the response created while processing the meta data</returns>
public ResponseResultReceived ProcessJobResult(
  Stream stream,
  bool finished) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;
  Stream jobResultStream = null;
  Stream jobStream = null;

  try {
    // NOTE(review): BinaryFormatter.Deserialize is unsafe on untrusted
    // input; the stream is sent by clients. Left unchanged because the wire
    // format is shared with the client side.
    BinaryFormatter formatter =
      new BinaryFormatter();

    // first deserialize the meta data of the result
    JobResult result =
      (JobResult)formatter.Deserialize(stream);

    tx = session.BeginTransaction();

    // the data is stored as an empty array here and streamed below
    ResponseResultReceived response =
      ProcessJobResult(
      result.ClientId,
      result.JobId,
      new byte[] {},
      result.Percentage,
      result.Exception,
      finished);

    if (response.Success) {
      // second stream the BLOB into the database
      IJobResultsAdapter jobResultsAdapter =
        session.GetDataAdapter<JobResult, IJobResultsAdapter>();

      IJobAdapter jobAdapter =
        session.GetDataAdapter<Job, IJobAdapter>();

      jobResultStream =
        jobResultsAdapter.GetSerializedJobResultStream(response.JobResultId, true);

      jobStream =
        jobAdapter.GetSerializedJobStream(result.JobId, true);

      byte[] buffer = new byte[3024];
      int read = 0;
      while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
        jobResultStream.Write(buffer, 0, read);
        jobStream.Write(buffer, 0, read);
      }

      jobResultStream.Close();
      jobStream.Close();

      tx.Commit();
    }

    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (jobStream != null)
      jobStream.Dispose();

    if (jobResultStream != null)
      jobResultStream.Dispose();

    if (session != null)
      session.EndSession();
  }
}
538
/// <summary>
/// Validates and stores a job result (final result or snapshot).
/// The result is rejected if the job does not exist, was aborted, is not
/// assigned to a client, is assigned to another client or is in a wrong
/// state. Otherwise the job is updated, the previous results of the job are
/// deleted and the new result is stored.
/// </summary>
/// <param name="clientId">id of the client that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">the serialized result data</param>
/// <param name="percentage">the progress of the job</param>
/// <param name="exception">the exception reported by the client, if any</param>
/// <param name="finished">true if the job is finished, false for a snapshot</param>
/// <returns>a response describing whether the result was accepted</returns>
private ResponseResultReceived ProcessJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception,
  bool finished) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    IJobResultsAdapter jobResultAdapter =
      session.GetDataAdapter<JobResult, IJobResultsAdapter>();

    tx = session.BeginTransaction();

    ResponseResultReceived response = new ResponseResultReceived();
    ClientInfo client =
      clientAdapter.GetById(clientId);

    SerializedJob job =
      new SerializedJob();
    job.JobInfo =
      jobAdapter.GetById(jobId);

    // the job object itself is always set; what can be missing is the job
    // info loaded from the database
    if (job.JobInfo == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
      response.JobId = jobId;
      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.abort) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
      response.JobId = jobId;
      tx.Rollback();
      return response;
    }
    if (job.JobInfo.Client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      response.JobId = jobId;
      tx.Rollback();
      return response;
    }
    if (job.JobInfo.Client.Id != clientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
      response.JobId = jobId;
      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.finished) {
      // the result was already stored; report success without storing it again
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
      response.JobId = jobId;
      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.requestSnapshotSent) {
      // the requested snapshot arrived; the job continues calculating
      job.JobInfo.State = State.calculating;
    }
    if (job.JobInfo.State != State.calculating &&
      job.JobInfo.State != State.pending) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
      response.JobId = jobId;
      tx.Rollback();
      return response;
    }
    job.JobInfo.Percentage = percentage;

    if (finished) {
      job.JobInfo.State = State.finished;
    }

    job.SerializedJobData = result;
    jobAdapter.UpdateSerializedJob(job);

    // only the most recent result of a job is kept
    List<JobResult> jobResults = new List<JobResult>(
      jobResultAdapter.GetResultsOf(job.JobInfo.Id));
    foreach (JobResult currentResult in jobResults)
      jobResultAdapter.Delete(currentResult);

    SerializedJobResult serializedjobResult =
      new SerializedJobResult();
    JobResult jobResult = new JobResult();
    jobResult.ClientId = client.Id;
    jobResult.JobId = job.JobInfo.Id;
    jobResult.Percentage = percentage;
    jobResult.Exception = exception;
    jobResult.DateFinished = DateTime.Now;
    serializedjobResult.JobResult = jobResult;
    serializedjobResult.SerializedJobResultData =
      result;

    jobResultAdapter.UpdateSerializedJobResult(serializedjobResult);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
    response.JobId = jobId;
    response.finished = finished;
    response.JobResultId = jobResult.Id;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
662
663
/// <summary>
/// Stores the final result of a job. The call is forwarded to
/// ProcessJobResult with the finished flag set.
/// </summary>
/// <param name="clientId">id of the client that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">the serialized result data</param>
/// <param name="percentage">the progress of the job</param>
/// <param name="exception">the exception reported by the client, if any</param>
/// <returns>the response of ProcessJobResult</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception) {
  const bool jobIsFinished = true;

  ResponseResultReceived response =
    ProcessJobResult(clientId, jobId, result, percentage, exception, jobIsFinished);
  return response;
}
683
684
/// <summary>
/// Stores an intermediate result (snapshot) of a job. The call is forwarded
/// to ProcessJobResult with the finished flag not set.
/// </summary>
/// <param name="clientId">id of the client that sends the snapshot</param>
/// <param name="jobId">id of the job the snapshot belongs to</param>
/// <param name="result">the serialized snapshot data</param>
/// <param name="percentage">the progress of the job</param>
/// <param name="exception">the exception reported by the client, if any</param>
/// <returns>the response of ProcessJobResult</returns>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
  const bool jobIsFinished = false;

  ResponseResultReceived response =
    ProcessJobResult(clientId, jobId, result, percentage, exception, jobIsFinished);
  return response;
}
688
/// <summary>
/// Logout of a client. The entry in the heartbeat dictionary is removed,
/// the unfinished jobs of a calculating client are reset and the client is
/// set offline.
/// </summary>
/// <param name="clientId">id of the client that logs out</param>
/// <returns>
/// a response with Success set to false if the client is not registered,
/// otherwise a success response
/// </returns>
public Response Logout(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // Remove is a no-op if the client has no heartbeat entry
      lastHeartbeats.Remove(clientId);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    ClientInfo client = clientAdapter.GetById(clientId);
    if (client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
      // nothing was changed in the database; do not leave the transaction open
      tx.Rollback();
      return response;
    }
    if (client.State == State.calculating) {
      // check which jobs the client was calculating and reset them
      ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
      foreach (Job job in jobsOfClient) {
        if (job.State != State.finished)
          jobManager.ResetJobsDependingOnResults(job);
      }
    }

    client.State = State.offline;
    clientAdapter.Update(client);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
748
/// <summary>
/// If a client goes offline and restores a job it was calculating, it can
/// ask the server whether the job result is still needed. If the job is not
/// finished yet, it is set to pending and a counter for it is stored in the
/// pendingJobs dictionary.
/// </summary>
/// <param name="jobId">id of the restored job</param>
/// <returns>
/// a response with Success set to false if the job does not exist; otherwise
/// a success response whose status message tells whether the job is already
/// finished or the result should be sent
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    tx = session.BeginTransaction();

    Response response = new Response();
    Job job = jobAdapter.GetById(jobId);
    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
      // nothing was changed; do not leave the transaction open
      tx.Rollback();
      return response;
    }
    if (job.State == State.finished) {
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
      // nothing was changed; do not leave the transaction open
      tx.Rollback();
      return response;
    }
    job.State = State.pending;
    lock (pendingJobs) {
      // the indexer (re)starts the counter; Add would throw if the client
      // asks a second time for the same job
      pendingJobs[job.Id] = PENDING_TIMEOUT;
    }

    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
798
/// <summary>
/// Sends the requested plugins to the client. For every requested plugin
/// each active plugin with the same name is added to the response together
/// with the content of all its assembly files.
/// </summary>
/// <param name="pluginList">the plugins requested by the client</param>
/// <returns>a success response containing the matching plugins</returns>
public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
  ResponsePlugin response = new ResponsePlugin();
  PluginManager.Manager.Initialize();
  ICollection<PluginInfo> activePlugins = PluginManager.Manager.ActivePlugins;

  // TODO: BuildDate deleted, not needed???
  // TODO: Split version to major, minor and revision number
  foreach (HivePluginInfo requested in pluginList) {
    foreach (PluginInfo candidate in activePlugins) {
      // plugins are matched by name only
      if (candidate.Name != requested.Name)
        continue;

      CachedHivePluginInfo cached = new CachedHivePluginInfo();
      cached.Name = candidate.Name;
      cached.Version = candidate.Version.ToString();
      cached.BuildDate = candidate.BuildDate;

      // ship the raw bytes of every assembly of the plugin
      foreach (String path in candidate.Assemblies) {
        byte[] assemblyBytes = File.ReadAllBytes(path);
        cached.PluginFiles.Add(assemblyBytes);
      }

      response.Plugins.Add(cached);
    }
  }

  response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
  response.Success = true;

  return response;
}
828
829    #endregion
830  }
831}
Note: See TracBrowser for help on using the repository browser.