Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Modeling Database Backend/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 2219

Last change on this file since 2219 was 2123, checked in by svonolfe, 16 years ago

Avoided possible race conditions when streaming data from/into the DB (#680)

File size: 28.5 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// The ClientCommunicator manages the whole communication with the clients:
  /// login and logout, heartbeats, handing out jobs and receiving job results
  /// and snapshots.
  /// </summary>
  public class ClientCommunicator: IClientCommunicator,
    IInternalClientCommunicator {
    /// <summary>
    /// Time (UTC) of the last login/heartbeat per client id.
    /// Every access is guarded by <see cref="heartbeatLock"/>.
    /// </summary>
    private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
      new Dictionary<Guid, DateTime>();

    /// <summary>
    /// Counter per job id for jobs that were handed out to a client but have
    /// not been reported in a heartbeat yet. The counter is decremented for
    /// every heartbeat of the client that does not mention the job.
    /// Every access is guarded by <c>lock (newAssignedJobs)</c>.
    /// </summary>
    private static readonly Dictionary<Guid, int> newAssignedJobs =
      new Dictionary<Guid, int>();

    /// <summary>
    /// Counter per job id for jobs that were set to pending by
    /// <see cref="IsJobStillNeeded"/>. The counter is decremented on every
    /// server heartbeat while the job is still pending.
    /// Every access is guarded by <c>lock (pendingJobs)</c>.
    /// </summary>
    private static readonly Dictionary<Guid, int> pendingJobs =
      new Dictionary<Guid, int>();

    private static readonly ReaderWriterLockSlim heartbeatLock =
      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private ISessionFactory factory;
    private ILifecycleManager lifecycleManager;
    private IInternalJobManager jobManager;
    private IScheduler scheduler;

    /// <summary>
    /// Initial value of the counter stored in <see cref="pendingJobs"/>.
    /// </summary>
    private const int PENDING_TIMEOUT = 100;

    /// <summary>
    /// Size of the buffer used to copy a streamed job result into the database.
    /// </summary>
    private const int STREAM_BUFFER_SIZE = 3024;

    /// <summary>
    /// Resolves the session factory, lifecycle manager, job manager and
    /// scheduler via the ServiceLocator and registers the handler for the
    /// server heartbeat at the lifecycle manager.
    /// </summary>
    public ClientCommunicator() {
      factory = ServiceLocator.GetSessionFactory();

      lifecycleManager = ServiceLocator.GetLifecycleManager();
      jobManager = ServiceLocator.GetJobManager() as
        IInternalJobManager;
      scheduler = ServiceLocator.GetScheduler();

      lifecycleManager.RegisterHeartbeat(
        new EventHandler(lifecycleManager_OnServerHeartbeat));
    }

    /// <summary>
    /// Checks if the online clients sent their heartbeats.
    /// If not, they are set offline and the jobs they were calculating are reset.
    /// Afterwards the pending jobs are checked for a timeout.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

        foreach (ClientInfo client in allClients) {
          if (client.State != State.offline && client.State != State.nullState) {
            CheckOnlineClient(client, clientAdapter, jobAdapter);
          } else {
            CleanUpOfflineClient(client, jobAdapter);
          }
        }
        CheckForPendingJobs(jobAdapter);

        tx.Commit();
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        // rethrow without resetting the stack trace
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Sets a client that is marked as online offline if it has no heartbeat
    /// entry or if its last heartbeat is older than HEARTBEAT_MAX_DIF seconds.
    /// The active jobs of such a client are reset.
    /// </summary>
    /// <param name="client">client that is not offline according to the database</param>
    /// <param name="clientAdapter"></param>
    /// <param name="jobAdapter"></param>
    private void CheckOnlineClient(ClientInfo client, IClientAdapter clientAdapter, IJobAdapter jobAdapter) {
      heartbeatLock.EnterUpgradeableReadLock();
      try {
        DateTime lastHbOfClient;
        if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
          // no heartbeat known at all -> the client is treated as offline
          client.State = State.offline;
          clientAdapter.Update(client);
          foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
            jobManager.ResetJobsDependingOnResults(job);
          }
          return;
        }

        TimeSpan dif = DateTime.UtcNow.Subtract(lastHbOfClient);
        // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
        if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
          // if the client calculated jobs, the jobs must be reset
          foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
            jobManager.ResetJobsDependingOnResults(job);
            lock (newAssignedJobs) {
              newAssignedJobs.Remove(job.Id);
            }
          }

          // client must be set offline
          client.State = State.offline;
          clientAdapter.Update(client);

          heartbeatLock.EnterWriteLock();
          try {
            lastHeartbeats.Remove(client.Id);
          }
          finally {
            heartbeatLock.ExitWriteLock();
          }
        }
      }
      finally {
        // always release the lock, otherwise an exception in a DB call
        // would block all following heartbeats
        heartbeatLock.ExitUpgradeableReadLock();
      }
    }

    /// <summary>
    /// Removes a possibly remaining heartbeat entry of a client that is
    /// offline (or in nullState) and resets its active jobs.
    /// </summary>
    /// <param name="client"></param>
    /// <param name="jobAdapter"></param>
    private void CleanUpOfflineClient(ClientInfo client, IJobAdapter jobAdapter) {
      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats.Remove(client.Id);
        foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
          jobManager.ResetJobsDependingOnResults(job);
        }
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Decrements the counter of every pending job that is registered in
    /// <see cref="pendingJobs"/>. A job whose counter is used up is set
    /// offline and its counter is removed.
    /// </summary>
    /// <param name="jobAdapter"></param>
    private void CheckForPendingJobs(IJobAdapter jobAdapter) {
      IList<Job> pendingJobsInDB = new List<Job>(jobAdapter.GetJobsByState(State.pending));

      foreach (Job currJob in pendingJobsInDB) {
        lock (pendingJobs) {
          int remaining;
          if (!pendingJobs.TryGetValue(currJob.Id, out remaining))
            continue;

          if (remaining <= 0) {
            currJob.State = State.offline;
            jobAdapter.Update(currJob);
            // the job is not pending any more, so drop the used up counter
            pendingJobs.Remove(currJob.Id);
          } else {
            pendingJobs[currJob.Id] = remaining - 1;
          }
        }
      }
    }

    /// <summary>
    /// Stores the current time (UTC) as the last heartbeat of the client.
    /// </summary>
    /// <param name="clientId"></param>
    private static void RecordHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats[clientId] = DateTime.UtcNow;
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Removes the heartbeat entry of the client, if there is one.
    /// </summary>
    /// <param name="clientId"></param>
    private static void ForgetHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats.Remove(clientId);
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Registers a job that has just been handed out to a client with a
    /// counter of JOB_TIME_TO_LIVE, unless it is registered already.
    /// </summary>
    /// <param name="jobId"></param>
    private static void RegisterNewlyAssignedJob(Guid jobId) {
      lock (newAssignedJobs) {
        if (!newAssignedJobs.ContainsKey(jobId))
          newAssignedJobs.Add(jobId, ApplicationConstants.JOB_TIME_TO_LIVE);
      }
    }

    #region IClientCommunicator Members

    /// <summary>
    /// Login process for the client.
    /// A heartbeat entry is created as well (login is the first heartbeat)
    /// and the client is stored with the state idle.
    /// </summary>
    /// <param name="clientInfo"></param>
    /// <returns></returns>
    public Response Login(ClientInfo clientInfo) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        tx = session.BeginTransaction();

        Response response = new Response();

        RecordHeartbeat(clientInfo.Id);

        clientInfo.State = State.idle;
        clientAdapter.Update(clientInfo);
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// The client has to send heartbeats regularly.
    /// The time of the heartbeat is stored in the heartbeats dictionary.
    /// It is checked if there is work for the client and the client is told
    /// in the response if it should fetch a job.
    /// </summary>
    /// <param name="hbData"></param>
    /// <returns></returns>
    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        ResponseHB response = new ResponseHB();
        response.ActionRequest = new List<MessageContainer>();

        ClientInfo client = clientAdapter.GetById(hbData.ClientId);

        // check if the client is logged in (an unknown client is not logged in either)
        if (client == null || client.State == State.offline || client.State == State.nullState) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
          tx.Rollback();
          return response;
        }

        client.NrOfFreeCores = hbData.FreeCores;
        client.FreeMemory = hbData.FreeMemory;

        // save timestamp of this heartbeat
        RecordHeartbeat(hbData.ClientId);

        // check if client has a free core for a new job
        // if true, ask scheduler for a new job for this client
        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
        else
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

        processJobProcess(hbData, jobAdapter, clientAdapter, response);
        clientAdapter.Update(client);

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Processes the job progress sent by a client with its heartbeat.
    /// Nothing is done if the heartbeat contains no job progress.
    /// </summary>
    /// <param name="hbData"></param>
    /// <param name="jobAdapter"></param>
    /// <param name="clientAdapter"></param>
    /// <param name="response">response of the heartbeat; action requests are added
    /// and Success/StatusMessage are changed if a reported job is not calculated by the client</param>
    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
      if (hbData.JobProgress == null || hbData.JobProgress.Count == 0)
        return;

      List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
      if (jobsOfClient.Count == 0) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        return;
      }

      foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
        Job curJob = jobAdapter.GetById(jobProgress.Key);
        if (curJob == null) {
          // the client reports a job that does not exist
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
          continue;
        }

        if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        } else if (curJob.State == State.abort) {
          // a request to abort the job has been set
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          curJob.State = State.finished;
        } else {
          // save job progress
          curJob.Percentage = jobProgress.Value;

          if (curJob.State == State.requestSnapshot) {
            // a request for a snapshot has been set
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
            curJob.State = State.requestSnapshotSent;
          }
        }
        jobAdapter.Update(curJob);
      }

      // compare the active jobs of the client with the jobs it reported
      foreach (Job currJob in jobsOfClient) {
        bool reported = hbData.JobProgress.ContainsKey(currJob.Id);

        lock (newAssignedJobs) {
          if (reported) {
            // the client calculates the job, the counter is not needed any more
            newAssignedJobs.Remove(currJob.Id);
            continue;
          }

          int timeToLive;
          if (newAssignedJobs.TryGetValue(currJob.Id, out timeToLive)) {
            // the job has been handed out recently, give the client some heartbeats
            // to report it before the job is set offline
            timeToLive--;
            if (timeToLive <= 0) {
              currJob.State = State.offline;
              jobAdapter.Update(currJob);
              newAssignedJobs.Remove(currJob.Id);
            } else {
              newAssignedJobs[currJob.Id] = timeToLive;
            }
          } else {
            currJob.State = State.offline;
            jobAdapter.Update(currJob);
          }
        } // lock
      }
    }

    /// <summary>
    /// If the client was told to fetch a job it calls this method.
    /// The scheduler selects a job and the job is sent to the client
    /// in its serialized form.
    /// </summary>
    /// <param name="clientId"></param>
    /// <returns></returns>
    public ResponseSerializedJob SendSerializedJob(Guid clientId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        ResponseSerializedJob response = new ResponseSerializedJob();

        Job job2Calculate = scheduler.GetNextJobForClient(clientId);
        if (job2Calculate != null) {
          SerializedJob computableJob =
            jobAdapter.GetSerializedJob(job2Calculate.Id);

          response.Job = computableJob;
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
          RegisterNewlyAssignedJob(job2Calculate.Id);
        } else {
          response.Success = false;
          response.Job = null;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
        }

        tx.Commit();

        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// If the client was told to fetch a job it calls this method.
    /// The scheduler selects a job and the job is sent to the client.
    /// </summary>
    /// <param name="clientId"></param>
    /// <returns></returns>
    public ResponseJob SendJob(Guid clientId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        // the adapter is requested before the transaction is started,
        // like in all other methods of this class
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        ResponseJob response = new ResponseJob();

        Job job2Calculate = scheduler.GetNextJobForClient(clientId);
        if (job2Calculate != null) {
          response.Job = job2Calculate;
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
          RegisterNewlyAssignedJob(job2Calculate.Id);
        } else {
          response.Success = false;
          response.Job = null;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
        }

        tx.Commit();

        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Receives a job result (or snapshot) as a stream. The stream starts with
    /// a JobResult serialized by a BinaryFormatter; the bytes that follow are
    /// written to the serialized job result and to the serialized job in the database.
    /// </summary>
    /// <param name="stream">stream sent by the client</param>
    /// <param name="finished">true if the job has been finished, false for a snapshot</param>
    /// <returns></returns>
    public ResponseResultReceived ProcessJobResult(
      Stream stream,
      bool finished) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;
      Stream jobResultStream = null;
      Stream jobStream = null;

      try {
        // NOTE(review): the BinaryFormatter deserializes data that is sent by a client.
        // BinaryFormatter is not safe for untrusted input - confirm that only
        // trusted clients can reach this method.
        BinaryFormatter formatter =
          new BinaryFormatter();

        //first deserialize the header of the result
        JobResult result =
          (JobResult)formatter.Deserialize(stream);

        //important - repeatable read isolation level is required here,
        //otherwise race conditions could occur when writing the stream into the DB
        tx = session.BeginTransaction(
          TransactionIsolationLevel.RepeatableRead);

        ResponseResultReceived response =
          ProcessJobResult(
          result.ClientId,
          result.JobId,
          new byte[] {},
          result.Percentage,
          result.Exception,
          finished);

        if (response.Success) {
          //second copy the BLOB into the DB
          IJobResultsAdapter jobResultsAdapter =
            session.GetDataAdapter<JobResult, IJobResultsAdapter>();

          IJobAdapter jobAdapter =
            session.GetDataAdapter<Job, IJobAdapter>();

          jobResultStream =
            jobResultsAdapter.GetSerializedJobResultStream(response.JobResultId, true);

          jobStream =
            jobAdapter.GetSerializedJobStream(result.JobId, true);

          byte[] buffer = new byte[STREAM_BUFFER_SIZE];
          int read;
          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
            jobResultStream.Write(buffer, 0, read);
            jobStream.Write(buffer, 0, read);
          }

          // the streams are closed before the transaction is committed
          jobResultStream.Close();
          jobStream.Close();

          tx.Commit();
        }

        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (jobStream != null)
          jobStream.Dispose();

        if (jobResultStream != null)
          jobResultStream.Dispose();

        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Checks if the given client is allowed to deliver a result for the given job
    /// and stores the result as serialized job data and as the only job result of the job.
    /// </summary>
    /// <param name="clientId">client that delivers the result</param>
    /// <param name="jobId">job the result belongs to</param>
    /// <param name="result">serialized result</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception reported by the client</param>
    /// <param name="finished">true if the job has been finished, false for a snapshot</param>
    /// <returns>response with Success = false if the result has been rejected</returns>
    private ResponseResultReceived ProcessJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception,
      bool finished) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();
        IJobResultsAdapter jobResultAdapter =
          session.GetDataAdapter<JobResult, IJobResultsAdapter>();

        tx = session.BeginTransaction();

        ResponseResultReceived response = new ResponseResultReceived();

        SerializedJob job = new SerializedJob();
        job.JobInfo = jobAdapter.GetById(jobId);

        if (job.JobInfo == null) {
          // there is no job with this id
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
          response.JobId = jobId;
          tx.Rollback();
          return response;
        }
        if (job.JobInfo.State == State.abort) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
          tx.Rollback();
          return response;
        }
        if (job.JobInfo.Client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
          response.JobId = jobId;
          tx.Rollback();
          return response;
        }
        if (job.JobInfo.Client.Id != clientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
          response.JobId = jobId;
          tx.Rollback();
          return response;
        }
        if (job.JobInfo.State == State.finished) {
          // the job is finished already, the result is acknowledged but not stored again
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
          response.JobId = jobId;
          tx.Rollback();
          return response;
        }
        if (job.JobInfo.State == State.requestSnapshotSent) {
          job.JobInfo.State = State.calculating;
        }
        if (job.JobInfo.State != State.calculating &&
          job.JobInfo.State != State.pending) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
          response.JobId = jobId;
          tx.Rollback();
          return response;
        }
        job.JobInfo.Percentage = percentage;

        if (finished) {
          job.JobInfo.State = State.finished;
        }

        job.SerializedJobData = result;
        jobAdapter.UpdateSerializedJob(job);

        // older results of the job are replaced by the new result
        List<JobResult> jobResults = new List<JobResult>(
          jobResultAdapter.GetResultsOf(job.JobInfo.Id));
        foreach (JobResult currentResult in jobResults)
          jobResultAdapter.Delete(currentResult);

        SerializedJobResult serializedjobResult =
          new SerializedJobResult();
        JobResult jobResult = new JobResult();
        // the client of the job has been compared with clientId above
        jobResult.ClientId = clientId;
        jobResult.JobId = job.JobInfo.Id;
        jobResult.Percentage = percentage;
        jobResult.Exception = exception;
        jobResult.DateFinished = DateTime.Now;
        serializedjobResult.JobResult = jobResult;
        serializedjobResult.SerializedJobResultData =
          result;

        jobResultAdapter.UpdateSerializedJobResult(serializedjobResult);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
        response.JobId = jobId;
        response.finished = finished;
        response.JobResultId = jobResult.Id;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }


    /// <summary>
    /// The client sends the final job result when it finished calculating.
    /// The job result is stored in the database and the job is set to finished.
    /// </summary>
    /// <param name="clientId"></param>
    /// <param name="jobId"></param>
    /// <param name="result"></param>
    /// <param name="percentage"></param>
    /// <param name="exception"></param>
    /// <returns></returns>
    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception) {

      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
    }


    /// <summary>
    /// The client can send snapshots of a job during calculating.
    /// The snapshot is stored in the database like a job result,
    /// but the job is not set to finished.
    /// </summary>
    /// <param name="clientId"></param>
    /// <param name="jobId"></param>
    /// <param name="result"></param>
    /// <param name="percentage"></param>
    /// <param name="exception"></param>
    /// <returns></returns>
    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
    }

    /// <summary>
    /// When a client logs out its state is set to offline
    /// and its entry in the last heartbeats dictionary is removed.
    /// The unfinished jobs of a calculating client are reset.
    /// </summary>
    /// <param name="clientId"></param>
    /// <returns></returns>
    public Response Logout(Guid clientId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        Response response = new Response();

        ForgetHeartbeat(clientId);

        ClientInfo client = clientAdapter.GetById(clientId);
        if (client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
          tx.Rollback();
          return response;
        }
        if (client.State == State.calculating) {
          // check which jobs the client was calculating and reset them
          ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
          foreach (Job job in jobsOfClient) {
            if (job.State != State.finished)
              jobManager.ResetJobsDependingOnResults(job);
          }
        }

        client.State = State.offline;
        clientAdapter.Update(client);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// If a client goes offline and restores a job it was calculating
    /// it can ask the server if the job result is still needed.
    /// If the job is not finished yet it is set to pending.
    /// </summary>
    /// <param name="jobId"></param>
    /// <returns></returns>
    public Response IsJobStillNeeded(Guid jobId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();
        tx = session.BeginTransaction();

        Response response = new Response();
        Job job = jobAdapter.GetById(jobId);
        if (job == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
          tx.Rollback();
          return response;
        }
        if (job.State == State.finished) {
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
          tx.Rollback();
          return response;
        }
        job.State = State.pending;
        lock (pendingJobs) {
          // (re)start the counter; Add would throw if the job is registered already
          pendingJobs[job.Id] = PENDING_TIMEOUT;
        }

        jobAdapter.Update(job);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Sends the files of the requested plugins to the client.
    /// A requested plugin is matched by name with the active plugins of the server;
    /// requested plugins without a match are skipped.
    /// </summary>
    /// <param name="pluginList">plugins requested by the client</param>
    /// <returns></returns>
    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
      ResponsePlugin response = new ResponsePlugin();
      PluginManager.Manager.Initialize();
      ICollection<PluginInfo> allActivePlugins = PluginManager.Manager.ActivePlugins;

      foreach (HivePluginInfo pluginInfo in pluginList) {
        // TODO: BuildDate deleted, not needed???
        // TODO: Split version to major, minor and revision number
        foreach (PluginInfo currPlugin in allActivePlugins) {
          if (currPlugin.Name != pluginInfo.Name)
            continue;

          CachedHivePluginInfo currCachedPlugin = new CachedHivePluginInfo {
            Name = currPlugin.Name,
            Version = currPlugin.Version.ToString(),
            BuildDate = currPlugin.BuildDate
          };

          foreach (String assemblyPath in currPlugin.Assemblies) {
            currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(assemblyPath));
          }
          response.Plugins.Add(currCachedPlugin);
        }
      }
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;

      return response;
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.