Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 2930

Last change on this file since 2930 was 2904, checked in by kgrading, 14 years ago

added functionality (#830)

File size: 34.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40using HeuristicLab.Tracing;
41using Linq = HeuristicLab.Hive.Server.LINQDataAccess;
42
43namespace HeuristicLab.Hive.Server.Core {
44  /// <summary>
45  /// The ClientCommunicator manages the whole communication with the client
46  /// </summary>
47  public class ClientCommunicator : IClientCommunicator,
48    IInternalClientCommunicator {
// Time of the last heartbeat (or login) per client id. Every access is guarded by heartbeatLock.
private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
  new Dictionary<Guid, DateTime>();
// Remaining time-to-live per newly handed out job id. Every access is guarded by lock (newAssignedJobs).
private static readonly Dictionary<Guid, int> newAssignedJobs =
  new Dictionary<Guid, int>();
// Countdown per pending job id, consumed by CheckForPendingJobs. Every access is guarded by lock (pendingJobs).
private static readonly Dictionary<Guid, int> pendingJobs =
  new Dictionary<Guid, int>();

// Recursion is enabled because the server heartbeat handler upgrades from the
// upgradeable read lock to the write lock on the same thread.
private static readonly ReaderWriterLockSlim heartbeatLock =
  new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

// Collaborators resolved through the ServiceLocator in the constructor.
private ISessionFactory factory;
private ILifecycleManager lifecycleManager;
private IInternalJobManager jobManager;
private IScheduler scheduler;

// LINQ based data access object used by Login.
private Linq.ClientDao clientDao = new Linq.ClientDao();

// NOTE(review): not referenced in the visible part of this file - presumably the
// initial countdown value stored in pendingJobs; confirm against the remaining code.
private static int PENDING_TIMEOUT = 100;
67
/// <summary>
/// Resolves the session factory, the lifecycle manager, the job manager and the
/// scheduler through the ServiceLocator and subscribes this instance to the
/// server heartbeat of the lifecycle manager.
/// </summary>
public ClientCommunicator() {
  this.factory = ServiceLocator.GetSessionFactory();
  this.lifecycleManager = ServiceLocator.GetLifecycleManager();
  this.jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  this.scheduler = ServiceLocator.GetScheduler();

  // every server heartbeat runs the client check in lifecycleManager_OnServerHeartbeat
  EventHandler onServerHeartbeat = new EventHandler(lifecycleManager_OnServerHeartbeat);
  this.lifecycleManager.RegisterHeartbeat(onServerHeartbeat);
}
84
/// <summary>
/// Handler for the server heartbeat. Walks over all clients known to the client adapter:
/// a client that is neither offline nor in nullState but has no heartbeat entry, or whose
/// last heartbeat is older than ApplicationConstants.HEARTBEAT_MAX_DIF seconds, is set
/// offline and its active jobs are reset. For clients that are offline or in nullState the
/// heartbeat entry is dropped and their active jobs are reset. Afterwards the pending jobs
/// are checked and the transaction is committed.
/// </summary>
/// <param name="sender">not used</param>
/// <param name="e">not used</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  HiveLogger.Info(this.ToString() + ": Server Heartbeat ticked");

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

    foreach (ClientInfo client in allClients) {
      if (client.State != State.offline && client.State != State.nullState) {
        heartbeatLock.EnterUpgradeableReadLock();
        // try/finally: an exception thrown by an adapter or the job manager must not
        // leave the lock held, otherwise every later heartbeat would block forever
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            HiveLogger.Info(this.ToString() + ": Client " + client.Id + " wasn't offline but hasn't sent heartbeats - setting offline");
            client.State = State.offline;
            clientAdapter.Update(client);
            HiveLogger.Info(this.ToString() + ": Client " + client.Id + " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if the time between the last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if the client calculated jobs, those jobs must be reset
              HiveLogger.Info(this.ToString() + ": Client timed out and is on RESET");
              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
                jobManager.ResetJobsDependingOnResults(job);
                lock (newAssignedJobs) {
                  // Remove does nothing when the key is absent
                  newAssignedJobs.Remove(job.Id);
                }
              }

              // client must be set offline
              client.State = State.offline;
              clientAdapter.Update(client);

              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              }
              finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        }
        finally {
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        // TODO: is this branch really necessary?
        heartbeatLock.EnterWriteLock();
        try {
          // Remove does nothing when the key is absent
          lastHeartbeats.Remove(client.Id);
          foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
            jobManager.ResetJobsDependingOnResults(job);
          }
        }
        finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
    CheckForPendingJobs(jobAdapter);

    tx.Commit();
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
173
/// <summary>
/// Looks at every job the adapter reports in State.pending. A job that has an entry in
/// pendingJobs is set to State.offline and saved once its counter is at or below zero;
/// otherwise the counter is decreased by one. Jobs without an entry are left untouched.
/// </summary>
/// <param name="jobAdapter">adapter used to query and update the jobs</param>
private void CheckForPendingJobs(IJobAdapter jobAdapter) {
  List<Job> stillPending = new List<Job>(jobAdapter.GetJobsByState(State.pending));

  for (int i = 0; i < stillPending.Count; i++) {
    Job candidate = stillPending[i];

    lock (pendingJobs) {
      int remaining;
      if (!pendingJobs.TryGetValue(candidate.Id, out remaining))
        continue;

      if (remaining > 0) {
        pendingJobs[candidate.Id] = remaining - 1;
      } else {
        candidate.State = State.offline;
        jobAdapter.Update(candidate);
      }
    }
  }
}
190
191    #region IClientCommunicator Members
192
/// <summary>
/// Login process for the client. The login counts as the first heartbeat: the current
/// time is stored for the client in lastHeartbeats. The client is then set to State.idle
/// and inserted or updated through the LINQ ClientDao, depending on whether an entry
/// with its id already exists.
/// </summary>
/// <param name="clientInfo">the client that logs in</param>
/// <returns>a response with Success set to true and the login success message</returns>
public Response Login(ClientInfo clientInfo) {
  // read the id before taking the lock so a null clientInfo fails without the lock being held
  Guid clientId = clientInfo.Id;

  heartbeatLock.EnterWriteLock();
  try {
    // the indexer adds the entry or overwrites an existing one
    lastHeartbeats[clientId] = DateTime.Now;
  }
  finally {
    heartbeatLock.ExitWriteLock();
  }

  clientInfo.State = State.idle;

  // persistence goes through clientDao; no session or transaction is opened here
  if (clientDao.FindById(clientId) == null)
    clientDao.Insert(clientInfo);
  else
    clientDao.Update(clientInfo);

  Response response = new Response();
  response.Success = true;
  response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
  return response;
}
242
/// <summary>
/// Processes a heartbeat sent by a client. The heartbeat time is stored in lastHeartbeats,
/// the free cores and free memory of the client are updated, the scheduler is asked whether
/// a job exists for the client (only if it reports free cores) and the reported job
/// progress is processed.
/// </summary>
/// <param name="hbData">heartbeat data sent by the client</param>
/// <returns>
/// a response whose ActionRequest list tells the client what to do (FetchJob, AbortJob,
/// RequestSnapshot or NoMessage). Success is false when the client is unknown, offline or
/// in nullState, or when the job progress could not be matched.
/// </returns>
public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {

  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  HiveLogger.Info(this.ToString() + ": BEGIN Processing Heartbeat for Client " + hbData.ClientId);

  try {
    HiveLogger.Info(this.ToString() + ": BEGIN Fetching Adapters");
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    HiveLogger.Info(this.ToString() + ": END Fetched Adapters");
    HiveLogger.Info(this.ToString() + ": BEGIN Starting Transaction");
    tx = session.BeginTransaction();
    HiveLogger.Info(this.ToString() + ": END Started Transaction");

    ResponseHB response = new ResponseHB();
    response.ActionRequest = new List<MessageContainer>();

    HiveLogger.Info(this.ToString() + ": BEGIN Started Client Fetching");
    ClientInfo client = clientAdapter.GetById(hbData.ClientId);
    HiveLogger.Info(this.ToString() + ": END Finished Client Fetching");
    // check if the client is logged in; an unknown client is treated like a client that is not logged in
    if (client == null || client.State == State.offline || client.State == State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

      HiveLogger.Error(this.ToString() + " ProcessHeartBeat: Client state null or offline: " + client);

      // nothing was changed, close the transaction explicitly instead of leaving it open
      tx.Rollback();
      return response;
    }

    client.NrOfFreeCores = hbData.FreeCores;
    client.FreeMemory = hbData.FreeMemory;

    // save timestamp of this heartbeat
    HiveLogger.Info(this.ToString() + ": BEGIN Locking for Heartbeats");
    heartbeatLock.EnterWriteLock();
    try {
      HiveLogger.Info(this.ToString() + ": END Locked for Heartbeats");
      // the indexer adds the entry or overwrites an existing one
      lastHeartbeats[hbData.ClientId] = DateTime.Now;
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    // check if client has a free core for a new job
    // if true, ask scheduler for a new job for this client
    HiveLogger.Info(this.ToString() + ": BEGIN Looking for Client Jobs");
    if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) {
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
    } else {
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
    }
    HiveLogger.Info(this.ToString() + ": END Looked for Client Jobs");
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

    HiveLogger.Info(this.ToString() + ": BEGIN Processing Heartbeat Jobs");
    // may overwrite Success/StatusMessage when the reported progress cannot be matched
    processJobProcess(hbData, jobAdapter, clientAdapter, response);
    HiveLogger.Info(this.ToString() + ": END Processed Heartbeat Jobs");

    clientAdapter.Update(client);

    tx.Commit();
    HiveLogger.Info(this.ToString() + ": END Processed Heartbeat for Client " + hbData.ClientId);
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
332
/// <summary>
/// Processes the job progress sent by a client within a heartbeat. Does nothing when no
/// progress was reported. Otherwise every reported job is checked and updated (progress,
/// abort request, snapshot request) and every active job of the client that was NOT
/// reported either loses one unit of its time-to-live in newAssignedJobs or, if it has no
/// time-to-live entry, is set to State.offline.
/// </summary>
/// <param name="hbData">heartbeat data containing the job progress per job id</param>
/// <param name="jobAdapter">adapter used to load and update jobs</param>
/// <param name="clientAdapter">adapter used to load the sending client</param>
/// <param name="response">response that receives status and action requests</param>
private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
  HiveLogger.Info(this.ToString() + " processJobProcess: Started for Client " + hbData.ClientId);

  if (hbData.JobProgress == null || hbData.JobProgress.Count == 0)
    return;

  List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
  if (jobsOfClient.Count == 0) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
    HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId);
    return;
  }

  foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
    Job curJob = jobAdapter.GetById(jobProgress.Key);
    // an unknown job id is handled like a job that does not belong to this client
    if (curJob == null || curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId + " Job: " + curJob);
    } else if (curJob.State == State.abort) {
      // a request to abort the job has been set
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
      curJob.State = State.finished;
    } else {
      // save job progress
      curJob.Percentage = jobProgress.Value;

      if (curJob.State == State.requestSnapshot) {
        // a request for a snapshot has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
        curJob.State = State.requestSnapshotSent;
      }
    }
    // there is nothing to save for an unknown job id
    if (curJob != null)
      jobAdapter.Update(curJob);
  }

  foreach (Job currJob in jobsOfClient) {
    bool reported = hbData.JobProgress.ContainsKey(currJob.Id);

    lock (newAssignedJobs) {
      if (reported) {
        if (newAssignedJobs.ContainsKey(currJob.Id)) {
          HiveLogger.Info(this.ToString() + " processJobProcess: Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
          newAssignedJobs.Remove(currJob.Id);
        }
        continue;
      }

      int ttl;
      if (newAssignedJobs.TryGetValue(currJob.Id, out ttl)) {
        // the job was handed out recently but is not reported yet: count down its time-to-live
        ttl--;
        newAssignedJobs[currJob.Id] = ttl;
        HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL Reduced by one for job: " + currJob + " and is now: " + ttl + ". User that sucks: " + currJob.Client);
        if (ttl <= 0) {
          HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);

          currJob.State = State.offline;
          jobAdapter.Update(currJob);

          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

          newAssignedJobs.Remove(currJob.Id);
        }
      } else {
        HiveLogger.Error(this.ToString() + " processJobProcess: Job ID wasn't with the heartbeats:  " + currJob);
        currJob.State = State.offline;
        jobAdapter.Update(currJob);
      }
    }
  }
}
415
/// <summary>
/// Called by a client that was told to pull a job. The scheduler selects the next job for
/// the client; its serialized form is loaded and returned, and the job id is registered in
/// newAssignedJobs with ApplicationConstants.JOB_TIME_TO_LIVE unless it is already there.
/// </summary>
/// <param name="clientId">id of the client that pulls a job</param>
/// <returns>
/// a response carrying the serialized job, or Success = false and Job = null when the
/// scheduler has no job for this client
/// </returns>
public ResponseSerializedJob SendSerializedJob(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseSerializedJob response = new ResponseSerializedJob();

    Job job2Calculate = scheduler.GetNextJobForClient(clientId);
    if (job2Calculate != null) {
      SerializedJob computableJob =
        jobAdapter.GetSerializedJob(job2Calculate.Id);

      response.Job = computableJob;
      response.Success = true;
      HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + computableJob.JobInfo + " for user " + clientId);
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      lock (newAssignedJobs) {
        if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
      }
    } else {
      HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);
      response.Success = false;
      response.Job = null;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
    }

    tx.Commit();

    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
468
/// <summary>
/// Called by a client that was told to pull a job. The scheduler selects the next job for
/// the client; the job object itself (not its serialized form) is returned, and the job id
/// is registered in newAssignedJobs with ApplicationConstants.JOB_TIME_TO_LIVE unless it
/// is already there.
/// </summary>
/// <param name="clientId">id of the client that pulls a job</param>
/// <returns>
/// a response carrying the job, or Success = false and Job = null when the scheduler has
/// no job for this client
/// </returns>
public ResponseJob SendJob(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    // fetched like in SendSerializedJob although this method does not use the adapter afterwards
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseJob response = new ResponseJob();

    Job job2Calculate = scheduler.GetNextJobForClient(clientId);
    if (job2Calculate != null) {
      response.Job = job2Calculate;
      response.Success = true;
      HiveLogger.Info(this.ToString() + " SendJob: Job pulled: " + job2Calculate + " for user " + clientId);
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      lock (newAssignedJobs) {
        if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
      }
    } else {
      response.Success = false;
      response.Job = null;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
      HiveLogger.Info(this.ToString() + " SendJob: No more Jobs left for " + clientId);
    }

    tx.Commit();

    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
518
/// <summary>
/// Receives a job result as a stream. The stream starts with a binary-serialized JobResult
/// (meta data); whatever follows it is copied in chunks into both the serialized job
/// result and the serialized job in the database.
/// </summary>
/// <param name="stream">stream sent by the client: JobResult header followed by the raw data</param>
/// <param name="finished">true for a final result, false for a snapshot</param>
/// <returns>the response produced while storing the meta data of the result</returns>
public ResponseResultReceived ProcessJobResult(
  Stream stream,
  bool finished) {

  HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - main method:");

  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;
  Stream jobResultStream = null;
  Stream jobStream = null;

  try {
    // SECURITY NOTE(review): BinaryFormatter deserializes data supplied by the client.
    // It is unsafe for untrusted input; confirm that only trusted clients can reach
    // this endpoint or replace the wire format.
    BinaryFormatter formatter =
      new BinaryFormatter();

    JobResult result =
      (JobResult)formatter.Deserialize(stream);

    //important - repeatable read isolation level is required here,
    //otherwise race conditions could occur when writing the stream into the DB
    //just removed TransactionIsolationLevel.RepeatableRead
    tx = session.BeginTransaction();

    // first store the meta data with an empty payload
    // NOTE(review): the called overload begins, commits/rolls back and ends on the same
    // session - presumably BeginTransaction returns the already running transaction; confirm.
    ResponseResultReceived response =
      ProcessJobResult(
      result.ClientId,
      result.JobId,
      new byte[] { },
      result.Percentage,
      result.Exception,
      finished);

    if (response.Success) {
      // second: copy the rest of the stream (the BLOB) into the database
      IJobResultsAdapter jobResultsAdapter =
        session.GetDataAdapter<JobResult, IJobResultsAdapter>();

      IJobAdapter jobAdapter =
        session.GetDataAdapter<Job, IJobAdapter>();

      jobResultStream =
        jobResultsAdapter.GetSerializedJobResultStream(response.JobResultId, true);

      jobStream =
        jobAdapter.GetSerializedJobStream(result.JobId, true);

      byte[] buffer = new byte[3024];
      int read = 0;
      while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
        jobResultStream.Write(buffer, 0, read);
        jobStream.Write(buffer, 0, read);
      }

      // close both streams before the commit
      jobResultStream.Close();
      jobStream.Close();

      tx.Commit();
    }
    HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage:");
    return response;
  }
  catch (Exception ex) {
    HiveLogger.Error(this.ToString() + " ProcessJobResult: Exception raised: " + ex);
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace
    throw;
  }
  finally {
    if (jobStream != null)
      jobStream.Dispose();

    if (jobResultStream != null)
      jobResultStream.Dispose();

    if (session != null)
      session.EndSession();
  }
}
597
/// <summary>
/// Validates and stores a job result (final result or snapshot). The result is rejected
/// when the job does not exist, was aborted, is not assigned to a client, is assigned to
/// another client or is in a state other than calculating/pending. A job that is already
/// finished is acknowledged without storing anything. Otherwise the job's percentage,
/// state and serialized data are updated, all previous results of the job are deleted and
/// a new result is stored.
/// </summary>
/// <param name="clientId">id of the client that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">serialized job data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception reported by the client, stored with the result</param>
/// <param name="finished">true to mark the job as finished, false for a snapshot</param>
/// <returns>a response describing whether the result was accepted</returns>
private ResponseResultReceived ProcessJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception,
  bool finished) {

  HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - SUB method: " + jobId);

  ISession session = factory.GetSessionForCurrentThread();

  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    IJobResultsAdapter jobResultAdapter =
      session.GetDataAdapter<JobResult, IJobResultsAdapter>();

    //should fetch the existing transaction
    tx = session.BeginTransaction();

    ResponseResultReceived response = new ResponseResultReceived();
    ClientInfo client =
      clientAdapter.GetById(clientId);

    SerializedJob job =
      new SerializedJob();
    job.JobInfo =
      jobAdapter.GetById(jobId);

    // the container was just created, so only the looked-up job info can be missing
    if (job.JobInfo == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: No job with Id " + jobId);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.abort) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Job was aborted! " + job.JobInfo);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.Client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Job is not being calculated (client = null)! " + job.JobInfo);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.Client.Id != clientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.finished) {
      // acknowledged as success, but nothing is stored
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.requestSnapshotSent) {
      // the requested snapshot has arrived, the job continues calculating
      job.JobInfo.State = State.calculating;
    }
    if (job.JobInfo.State != State.calculating &&
      job.JobInfo.State != State.pending) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Job State, job is: " + job.JobInfo);

      tx.Rollback();
      return response;
    }
    job.JobInfo.Percentage = percentage;

    if (finished) {
      job.JobInfo.State = State.finished;
    }

    job.SerializedJobData = result;
    jobAdapter.UpdateSerializedJob(job);

    // only the newest result is kept: delete all results stored so far
    List<JobResult> jobResults = new List<JobResult>(
      jobResultAdapter.GetResultsOf(job.JobInfo.Id));
    foreach (JobResult currentResult in jobResults)
      jobResultAdapter.Delete(currentResult);

    SerializedJobResult serializedjobResult =
      new SerializedJobResult();
    JobResult jobResult = new JobResult();
    jobResult.ClientId = client.Id;
    jobResult.JobId = job.JobInfo.Id;
    jobResult.Percentage = percentage;
    jobResult.Exception = exception;
    jobResult.DateFinished = DateTime.Now;
    serializedjobResult.JobResult = jobResult;
    serializedjobResult.SerializedJobResultData =
      result;

    jobResultAdapter.UpdateSerializedJobResult(serializedjobResult);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
    response.JobId = jobId;
    response.finished = finished;
    response.JobResultId = jobResult.Id;

    tx.Commit();
    HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage - SUB method: " + jobId);
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain rethrow keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
745
746
/// <summary>
/// Stores the final result of a job: forwards all arguments to ProcessJobResult with the
/// finished flag set to true.
/// </summary>
/// <param name="clientId">id of the client that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">serialized job data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception reported by the client</param>
/// <returns>the response produced by ProcessJobResult</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception) {

  const bool isFinalResult = true;
  ResponseResultReceived outcome =
    ProcessJobResult(clientId, jobId, result, percentage, exception, isFinalResult);
  return outcome;
}
766
767
/// <summary>
/// Stores an intermediate result (snapshot) of a job: forwards all arguments to
/// ProcessJobResult with the finished flag set to false.
/// </summary>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
  const bool isFinalResult = false;
  ResponseResultReceived outcome =
    ProcessJobResult(clientId, jobId, result, percentage, exception, isFinalResult);
  return outcome;
}
771
/// <summary>
/// When a client logs out its state is set to offline
/// and its entry in the last heartbeats dictionary is removed.
/// Unfinished jobs of a client that was calculating are handed to
/// jobManager.ResetJobsDependingOnResults.
/// </summary>
/// <param name="clientId">id of the client that logs out</param>
/// <returns>
/// a response with Success = false if no client with the given id is found,
/// otherwise a response with Success = true
/// </returns>
public Response Logout(Guid clientId) {

  HiveLogger.Info("Client logged out " + clientId);

  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    // always release the write lock, even if the removal throws,
    // so that other users of heartbeatLock cannot be blocked forever
    heartbeatLock.EnterWriteLock();
    try {
      // Remove is a no-op for unknown keys, no ContainsKey check needed
      // (assumes lastHeartbeats is a dictionary keyed by client id)
      lastHeartbeats.Remove(clientId);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    ClientInfo client = clientAdapter.GetById(clientId);
    if (client == null) {
      // NOTE(review): the transaction is neither committed nor rolled back on
      // this path; presumably session.EndSession() cleans up - confirm
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
      return response;
    }
    if (client.State == State.calculating) {
      // check which jobs the client was calculating and reset them
      ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
      foreach (Job job in jobsOfClient) {
        if (job.State != State.finished)
          jobManager.ResetJobsDependingOnResults(job);
      }
    }

    client.State = State.offline;
    clientAdapter.Update(client);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace of the original exception
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
834
/// <summary>
/// If a client goes offline and restores a job he was calculating
/// he can ask the server if the job result is still needed.
/// A job that is still needed is set to pending and registered in pendingJobs.
/// </summary>
/// <param name="jobId">id of the job the client asks about</param>
/// <returns>
/// Success = false if the job does not exist;
/// Success = true with the "already finished" status message if the job is finished;
/// Success = true with the "send job result" status message otherwise
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    tx = session.BeginTransaction();

    Response response = new Response();
    Job job = jobAdapter.GetById(jobId);
    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
      HiveLogger.Error(this.ToString() + " IsJobStillNeeded: Job doesn't exist (anymore)! " + jobId);
      return response;
    }
    if (job.State == State.finished) {
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
      HiveLogger.Error(this.ToString() + " IsJobStillNeeded: already finished! " + job);
      return response;
    }
    job.State = State.pending;
    lock (pendingJobs) {
      // indexer instead of Add: a repeated request for the same job refreshes
      // the timeout instead of failing with an ArgumentException on the
      // duplicate key (assumes pendingJobs is a dictionary keyed by job id - TODO confirm)
      pendingJobs[job.Id] = PENDING_TIMEOUT;
    }

    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow without resetting the stack trace of the original exception
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
886
/// <summary>
/// Looks up, for every entry of the given list, the plugins known to the
/// ApplicationManager that have the same name and returns them together
/// with the content of their files.
/// </summary>
/// <param name="pluginList">the requested plugins; only their Name is used for matching</param>
/// <returns>a response that contains one CachedHivePluginInfo per match</returns>
public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
  ResponsePlugin pluginResponse = new ResponsePlugin();

  foreach (HivePluginInfo requested in pluginList) {
    // TODO: BuildDate deleted, not needed???
    // TODO: Split version to major, minor and revision number
    foreach (IPluginDescription installed in ApplicationManager.Manager.Plugins) {
      // only plugins whose name matches the requested one are sent
      if (installed.Name != requested.Name)
        continue;

      CachedHivePluginInfo cached = new CachedHivePluginInfo();
      cached.Name = installed.Name;
      cached.Version = installed.Version.ToString();
      cached.BuildDate = installed.BuildDate;

      // attach the raw bytes of every file that belongs to the plugin
      foreach (string path in installed.Files.Select(file => file.Name)) {
        cached.PluginFiles.Add(File.ReadAllBytes(path));
      }

      pluginResponse.Plugins.Add(cached);
    }
  }

  pluginResponse.Success = true;
  pluginResponse.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
  return pluginResponse;
}
914
915    #endregion
916  }
917}
Note: See TracBrowser for help on using the repository browser.