Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 2608

Last change on this file since 2608 was 2608, checked in by kgrading, 14 years ago

changed a ton of logging, changed minor job handling and changed the DB IP to localhost (#828)

File size: 34.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40using HeuristicLab.Tracing;
41
42namespace HeuristicLab.Hive.Server.Core {
43  /// <summary>
44  /// The ClientCommunicator manages the whole communication with the client
45  /// </summary>
46  public class ClientCommunicator : IClientCommunicator,
47    IInternalClientCommunicator {
48    private static Dictionary<Guid, DateTime> lastHeartbeats =
49      new Dictionary<Guid, DateTime>();
50    private static Dictionary<Guid, int> newAssignedJobs =
51      new Dictionary<Guid, int>();
52    private static Dictionary<Guid, int> pendingJobs =
53      new Dictionary<Guid, int>();
54
55    private static ReaderWriterLockSlim heartbeatLock =
56      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
57
58    private ISessionFactory factory;
59    private ILifecycleManager lifecycleManager;
60    private IInternalJobManager jobManager;
61    private IScheduler scheduler;
62
63    private static int PENDING_TIMEOUT = 100;
64
65    /// <summary>
66    /// Initialization of the Adapters to the database
67    /// Initialization of Eventhandler for the lifecycle management
68    /// Initialization of lastHearbeats Dictionary
69    /// </summary>
70    public ClientCommunicator() {
71      factory = ServiceLocator.GetSessionFactory();
72
73      lifecycleManager = ServiceLocator.GetLifecycleManager();
74      jobManager = ServiceLocator.GetJobManager() as
75        IInternalJobManager;
76      scheduler = ServiceLocator.GetScheduler();
77
78      lifecycleManager.RegisterHeartbeat(
79        new EventHandler(lifecycleManager_OnServerHeartbeat));
80    }
81
82    /// <summary>
83    /// Check if online clients send their hearbeats
84    /// if not -> set them offline and check if they where calculating a job
85    /// </summary>
86    /// <param name="sender"></param>
87    /// <param name="e"></param>
88    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
89      ISession session = factory.GetSessionForCurrentThread();
90      ITransaction tx = null;
91
92      HiveLogger.Info(this.ToString() + ": Server Heartbeat ticked");
93
94      try {
95        IClientAdapter clientAdapter =
96          session.GetDataAdapter<ClientInfo, IClientAdapter>();
97        IJobAdapter jobAdapter =
98          session.GetDataAdapter<Job, IJobAdapter>();
99
100        tx = session.BeginTransaction();
101
102        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
103
104        foreach (ClientInfo client in allClients) {
105          if (client.State != State.offline && client.State != State.nullState) {
106            heartbeatLock.EnterUpgradeableReadLock();
107
108            if (!lastHeartbeats.ContainsKey(client.Id)) {
109              HiveLogger.Info(this.ToString() + ": Client " + client.Id + " wasn't offline but hasn't sent heartbeats - setting offline");
110              client.State = State.offline;
111              clientAdapter.Update(client);
112              HiveLogger.Info(this.ToString() + ": Client " + client.Id + " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
113              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {               
114                jobManager.ResetJobsDependingOnResults(job);
115              }
116            } else {
117              DateTime lastHbOfClient = lastHeartbeats[client.Id];
118
119              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
120              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
121              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
122                // if client calculated jobs, the job must be reset
123                HiveLogger.Info(this.ToString() + ": Client timed out and is on RESET");
124                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
125                  jobManager.ResetJobsDependingOnResults(job);
126                  lock (newAssignedJobs) {
127                    if (newAssignedJobs.ContainsKey(job.Id))
128                      newAssignedJobs.Remove(job.Id);
129                  }
130                }
131
132                // client must be set offline
133                client.State = State.offline;
134                clientAdapter.Update(client);
135
136                heartbeatLock.EnterWriteLock();
137                lastHeartbeats.Remove(client.Id);
138                heartbeatLock.ExitWriteLock();
139              }
140            }
141
142            heartbeatLock.ExitUpgradeableReadLock();
143          } else {
144            //TODO: RLY neccesary?
145            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Shouldn't have offline or nullstate, has " + client.State);
146            heartbeatLock.EnterWriteLock();
147            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Resetting all his jobs");
148            if (lastHeartbeats.ContainsKey(client.Id))
149              lastHeartbeats.Remove(client.Id);
150            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
151              jobManager.ResetJobsDependingOnResults(job);
152            }
153            heartbeatLock.ExitWriteLock();
154          }
155        }
156        CheckForPendingJobs(jobAdapter);
157
158        tx.Commit();
159      }
160      catch (Exception ex) {
161        if (tx != null)
162          tx.Rollback();
163        throw ex;
164      }
165      finally {
166        if (session != null)
167          session.EndSession();
168      }
169    }
170
171    private void CheckForPendingJobs(IJobAdapter jobAdapter) {
172      IList<Job> pendingJobsInDB = new List<Job>(jobAdapter.GetJobsByState(State.pending));
173
174      foreach (Job currJob in pendingJobsInDB) {
175        lock (pendingJobs) {
176          if (pendingJobs.ContainsKey(currJob.Id)) {
177            if (pendingJobs[currJob.Id] <= 0) {
178              currJob.State = State.offline;
179              jobAdapter.Update(currJob);
180            } else {
181              pendingJobs[currJob.Id]--;
182            }
183          }
184        }
185      }
186    }
187
188    #region IClientCommunicator Members
189
190    /// <summary>
191    /// Login process for the client
192    /// A hearbeat entry is created as well (login is the first hearbeat)
193    /// </summary>
194    /// <param name="clientInfo"></param>
195    /// <returns></returns>
196    public Response Login(ClientInfo clientInfo) {
197      ISession session = factory.GetSessionForCurrentThread();
198      ITransaction tx = null;
199
200      try {
201        IClientAdapter clientAdapter =
202          session.GetDataAdapter<ClientInfo, IClientAdapter>();
203
204        tx = session.BeginTransaction();
205
206        Response response = new Response();
207
208        heartbeatLock.EnterWriteLock();
209        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
210          lastHeartbeats[clientInfo.Id] = DateTime.Now;
211        } else {
212          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
213        }
214        heartbeatLock.ExitWriteLock();
215
216        clientInfo.State = State.idle;
217        clientAdapter.Update(clientInfo);
218        response.Success = true;
219        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
220
221        tx.Commit();
222        return response;
223      }
224      catch (Exception ex) {
225        if (tx != null)
226          tx.Rollback();
227        throw ex;
228      }
229      finally {
230        if (session != null)
231          session.EndSession();
232      }
233    }
234
235    /// <summary>
236    /// The client has to send regulary heartbeats
237    /// this hearbeats will be stored in the heartbeats dictionary
238    /// check if there is work for the client and send the client a response if he should pull a job
239    /// </summary>
240    /// <param name="hbData"></param>
241    /// <returns></returns>
242    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
243     
244      ISession session = factory.GetSessionForCurrentThread();
245      ITransaction tx = null;
246
247      HiveLogger.Info(this.ToString() + ": BEGIN Processing Heartbeat for Client " + hbData.ClientId);
248
249      try {
250        HiveLogger.Info(this.ToString() + ": BEGIN Fetching Adapters");
251        IClientAdapter clientAdapter =
252          session.GetDataAdapter<ClientInfo, IClientAdapter>();
253
254        IJobAdapter jobAdapter =       
255          session.GetDataAdapter<Job, IJobAdapter>();
256        HiveLogger.Info(this.ToString() + ": END Fetched Adapters");
257        HiveLogger.Info(this.ToString() + ": BEGIN Starting Transaction");
258        tx = session.BeginTransaction();
259        HiveLogger.Info(this.ToString() + ": END Started Transaction");
260
261        ResponseHB response = new ResponseHB();
262        response.ActionRequest = new List<MessageContainer>();
263
264        HiveLogger.Info(this.ToString() + ": BEGIN Started Client Fetching");
265        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
266        HiveLogger.Info(this.ToString() + ": END Finished Client Fetching");
267        // check if the client is logged in
268        if (client.State == State.offline || client.State == State.nullState) {
269          response.Success = false;
270          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;         
271          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
272
273          HiveLogger.Error(this.ToString() + " ProcessHeartBeat: Client state null or offline: " + client);
274
275          return response;
276        }
277
278        client.NrOfFreeCores = hbData.FreeCores;
279        client.FreeMemory = hbData.FreeMemory;
280
281        // save timestamp of this heartbeat
282        HiveLogger.Info(this.ToString() + ": BEGIN Locking for Heartbeats");
283        heartbeatLock.EnterWriteLock();
284        HiveLogger.Info(this.ToString() + ": END Locked for Heartbeats");
285        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
286          lastHeartbeats[hbData.ClientId] = DateTime.Now;
287        } else {
288          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
289        }
290        heartbeatLock.ExitWriteLock();
291
292        // check if client has a free core for a new job
293        // if true, ask scheduler for a new job for this client
294        HiveLogger.Info(this.ToString() + ": BEGIN Looking for Client Jobs");
295        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) {
296          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
297        } else {
298          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
299        }
300        HiveLogger.Info(this.ToString() + ": END Looked for Client Jobs");
301        response.Success = true;
302        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
303
304        HiveLogger.Info(this.ToString() + ": BEGIN Processing Heartbeat Jobs");
305        processJobProcess(hbData, jobAdapter, clientAdapter, response);
306        HiveLogger.Info(this.ToString() + ": END Processed Heartbeat Jobs");
307       
308        clientAdapter.Update(client);
309
310        tx.Commit();
311        HiveLogger.Info(this.ToString() + ": END Processed Heartbeat for Client " + hbData.ClientId);
312        return response;
313      }
314      catch (Exception ex) {
315        if (tx != null)
316          tx.Rollback();
317        throw ex;
318      }
319      finally {
320        if (session != null)
321          session.EndSession();
322      }
323    }
324
325    /// <summary>
326    /// Process the Job progress sent by a client
327    /// </summary>
328    /// <param name="hbData"></param>
329    /// <param name="jobAdapter"></param>
330    /// <param name="clientAdapter"></param>
331    /// <param name="response"></param>
332    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
333      HiveLogger.Info(this.ToString() + " processJobProcess: Started for Client " + hbData.ClientId);     
334     
335      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
336        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
337        if (jobsOfClient == null || jobsOfClient.Count == 0) {
338          response.Success = false;
339          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
340          HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId);     
341          return;
342        }
343
344        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
345          Job curJob = jobAdapter.GetById(jobProgress.Key);
346          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
347            response.Success = false;
348            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
349            HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId + " Job: " + curJob);     
350          } else if (curJob.State == State.abort) {
351            // a request to abort the job has been set
352            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
353            curJob.State = State.finished;
354          } else {
355            // save job progress
356            curJob.Percentage = jobProgress.Value;
357
358            if (curJob.State == State.requestSnapshot) {
359              // a request for a snapshot has been set
360              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
361              curJob.State = State.requestSnapshotSent;
362            }
363          }
364          jobAdapter.Update(curJob);
365        }
366        foreach (Job currJob in jobsOfClient) {
367          bool found = false;
368          foreach (Guid jobId in hbData.JobProgress.Keys) {
369            if (jobId == currJob.Id) {
370              found = true;
371              break;
372            }
373          }
374          if (!found) {
375            lock (newAssignedJobs) {
376              if (newAssignedJobs.ContainsKey(currJob.Id)) {
377                newAssignedJobs[currJob.Id]--;
378                HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Client);                     
379                if (newAssignedJobs[currJob.Id] <= 0) {
380                  HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);                 
381
382                  currJob.State = State.offline;
383                  jobAdapter.Update(currJob);
384
385                  response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
386
387                  newAssignedJobs.Remove(currJob.Id);
388                }
389              } else {
390                HiveLogger.Error(this.ToString() + " processJobProcess: Job ID wasn't with the heartbeats:  " + currJob);                     
391                currJob.State = State.offline;
392                jobAdapter.Update(currJob);
393              }
394            } // lock
395          } else {
396            lock (newAssignedJobs) {
397
398              if (newAssignedJobs.ContainsKey(currJob.Id)) {
399                HiveLogger.Info(this.ToString() + " processJobProcess: Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);                     
400                newAssignedJobs.Remove(currJob.Id);
401              }
402            }
403          }
404        }
405      }
406    }
407
408    /// <summary>
409    /// if the client was told to pull a job he calls this method
410    /// the server selects a job and sends it to the client
411    /// </summary>
412    /// <param name="clientId"></param>
413    /// <returns></returns>
414    public ResponseSerializedJob SendSerializedJob(Guid clientId) {
415      ISession session = factory.GetSessionForCurrentThread();
416      ITransaction tx = null;
417
418      try {
419        IJobAdapter jobAdapter =
420          session.GetDataAdapter<Job, IJobAdapter>();
421
422        tx = session.BeginTransaction();
423
424        ResponseSerializedJob response = new ResponseSerializedJob();
425
426        Job job2Calculate = scheduler.GetNextJobForClient(clientId);
427        if (job2Calculate != null) {
428          SerializedJob computableJob =
429            jobAdapter.GetSerializedJob(job2Calculate.Id);
430
431          response.Job = computableJob;
432          response.Success = true;
433          HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + computableJob.JobInfo + " for user " + clientId);                     
434          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
435          lock (newAssignedJobs) {
436            if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
437              newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
438          }
439        } else {
440          HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);                     
441          response.Success = false;
442          response.Job = null;
443          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
444        }
445
446        tx.Commit();
447
448        return response;
449      }
450      catch (Exception ex) {
451        if (tx != null)
452          tx.Rollback();
453        throw ex;
454      }
455      finally {
456        if (session != null)
457          session.EndSession();
458      }
459    }
460
461    /// <summary>
462    /// if the client was told to pull a job he calls this method
463    /// the server selects a job and sends it to the client
464    /// </summary>
465    /// <param name="clientId"></param>
466    /// <returns></returns>
467    public ResponseJob SendJob(Guid clientId) {
468      ISession session = factory.GetSessionForCurrentThread();
469      ITransaction tx = null;
470
471      try {
472        IJobAdapter jobAdapter =
473          session.GetDataAdapter<Job, IJobAdapter>();
474
475        tx = session.BeginTransaction();
476
477        ResponseJob response = new ResponseJob();
478
479        Job job2Calculate = scheduler.GetNextJobForClient(clientId);
480        if (job2Calculate != null) {
481          response.Job = job2Calculate;
482          response.Success = true;
483          HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + job2Calculate + " for user " + clientId);                     
484          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
485          lock (newAssignedJobs) {
486            if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
487              newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
488          }
489        } else {
490          response.Success = false;
491          response.Job = null;
492          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
493          HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);                     
494        }
495
496        tx.Commit();
497
498        return response;
499      }
500      catch (Exception ex) {
501        if (tx != null)
502          tx.Rollback();
503        throw ex;
504      }
505      finally {
506        if (session != null)
507          session.EndSession();
508      }
509    }
510
511    public ResponseResultReceived ProcessJobResult(
512      Stream stream,
513      bool finished) {
514
515      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - main method:");
516     
517      ISession session = factory.GetSessionForCurrentThread();
518      ITransaction tx = null;
519      Stream jobResultStream = null;
520      Stream jobStream = null;
521
522      try {
523        BinaryFormatter formatter =
524          new BinaryFormatter();
525
526        JobResult result =
527          (JobResult)formatter.Deserialize(stream);
528
529        //important - repeatable read isolation level is required here,
530        //otherwise race conditions could occur when writing the stream into the DB
531        //just removed TransactionIsolationLevel.RepeatableRead
532        tx = session.BeginTransaction();
533
534        ResponseResultReceived response =
535          ProcessJobResult(
536          result.ClientId,
537          result.JobId,
538          new byte[] { },
539          result.Percentage,
540          result.Exception,
541          finished);
542
543        if (response.Success) {
544          //second deserialize the BLOB
545          IJobResultsAdapter jobResultsAdapter =
546            session.GetDataAdapter<JobResult, IJobResultsAdapter>();
547
548          IJobAdapter jobAdapter =
549            session.GetDataAdapter<Job, IJobAdapter>();
550
551          jobResultStream =
552            jobResultsAdapter.GetSerializedJobResultStream(response.JobResultId, true);
553
554          jobStream =
555            jobAdapter.GetSerializedJobStream(result.JobId, true);
556
557          byte[] buffer = new byte[3024];
558          int read = 0;
559          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
560            jobResultStream.Write(buffer, 0, read);
561            jobStream.Write(buffer, 0, read);
562          }
563
564          jobResultStream.Close();
565          jobStream.Close();
566
567          tx.Commit();
568        }
569        HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage:");
570        return response;
571      }
572      catch (Exception ex) {
573        HiveLogger.Error(this.ToString() + " ProcessJobResult: Exception raised: " + ex);
574        if (tx != null)
575          tx.Rollback();
576        throw ex;
577      }
578      finally {
579        if (jobStream != null)
580          jobStream.Dispose();
581
582        if (jobResultStream != null)
583          jobResultStream.Dispose();
584
585        if (session != null)
586          session.EndSession();
587      }
588    }
589
590    private ResponseResultReceived ProcessJobResult(Guid clientId,
591      Guid jobId,
592      byte[] result,
593      double percentage,
594      Exception exception,
595      bool finished) {
596     
597      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - SUB method: " + jobId);
598
599      ISession session = factory.GetSessionForCurrentThread();
600           
601      ITransaction tx = null;
602
603      try {
604        IClientAdapter clientAdapter =
605          session.GetDataAdapter<ClientInfo, IClientAdapter>();
606        IJobAdapter jobAdapter =
607          session.GetDataAdapter<Job, IJobAdapter>();
608        IJobResultsAdapter jobResultAdapter =
609          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
610       
611        //should fetch the existing transaction       
612        tx = session.BeginTransaction();
613
614        ResponseResultReceived response = new ResponseResultReceived();
615        ClientInfo client =
616          clientAdapter.GetById(clientId);
617
618        SerializedJob job =
619          new SerializedJob();
620
621        if (job != null) {
622          job.JobInfo =
623            jobAdapter.GetById(jobId);
624        }
625
626        if (job == null && job.JobInfo != null) {
627          response.Success = false;
628          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
629          response.JobId = jobId;
630         
631          HiveLogger.Error(this.ToString() + " ProcessJobResult: No job with Id " + jobId);                     
632         
633          tx.Rollback();
634          return response;
635        }
636        if (job.JobInfo.State == State.abort) {
637          response.Success = false;
638          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
639         
640          HiveLogger.Error(this.ToString() + " ProcessJobResult: Job was aborted! " + job.JobInfo);                     
641         
642          tx.Rollback();
643          return response;
644        }
645        if (job.JobInfo.Client == null) {
646          response.Success = false;
647          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
648          response.JobId = jobId;
649
650          HiveLogger.Error(this.ToString() + " ProcessJobResult: Job is not being calculated (client = null)! " + job.JobInfo);                     
651
652          tx.Rollback();
653          return response;
654        }
655        if (job.JobInfo.Client.Id != clientId) {
656          response.Success = false;
657          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
658          response.JobId = jobId;
659
660          HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);                     
661
662          tx.Rollback();
663          return response;
664        }
665        if (job.JobInfo.State == State.finished) {
666          response.Success = true;
667          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
668          response.JobId = jobId;
669
670          HiveLogger.Error(this.ToString() + " ProcessJobResult: Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);                     
671
672          tx.Rollback();
673          return response;
674        }
675        if (job.JobInfo.State == State.requestSnapshotSent) {
676          job.JobInfo.State = State.calculating;
677        }
678        if (job.JobInfo.State != State.calculating &&
679          job.JobInfo.State != State.pending) {
680          response.Success = false;
681          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
682          response.JobId = jobId;
683
684          HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Job State, job is: " + job.JobInfo);                     
685
686          tx.Rollback();
687          return response;
688        }
689        job.JobInfo.Percentage = percentage;
690
691        if (finished) {
692          job.JobInfo.State = State.finished;
693        }
694
695        job.SerializedJobData = result;
696        jobAdapter.UpdateSerializedJob(job);
697
698        List<JobResult> jobResults = new List<JobResult>(
699          jobResultAdapter.GetResultsOf(job.JobInfo.Id));
700        foreach (JobResult currentResult in jobResults)
701          jobResultAdapter.Delete(currentResult);
702
703        SerializedJobResult serializedjobResult =
704          new SerializedJobResult();
705        JobResult jobResult = new JobResult();
706        jobResult.ClientId = client.Id;
707        jobResult.JobId = job.JobInfo.Id;
708        jobResult.Percentage = percentage;
709        jobResult.Exception = exception;
710        jobResult.DateFinished = DateTime.Now;
711        serializedjobResult.JobResult = jobResult;
712        serializedjobResult.SerializedJobResultData =
713          result;
714
715        jobResultAdapter.UpdateSerializedJobResult(serializedjobResult);
716
717        response.Success = true;
718        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
719        response.JobId = jobId;
720        response.finished = finished;
721        response.JobResultId = jobResult.Id;
722       
723        //Removed for Testing
724        //tx.Commit();
725        HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage - SUB method: " + jobId);       
726        return response;
727      }
728      catch (Exception ex) {
729        if (tx != null)
730          tx.Rollback();
731        throw ex;
732      }
733      finally {
734        if (session != null)
735          session.EndSession();
736      }
737    }
738
739
740    /// <summary>
741    /// the client can send job results during calculating
742    /// and will send a final job result when he finished calculating
743    /// these job results will be stored in the database
744    /// </summary>
745    /// <param name="clientId"></param>
746    /// <param name="jobId"></param>
747    /// <param name="result"></param>
748    /// <param name="exception"></param>
749    /// <param name="finished"></param>
750    /// <returns></returns>
751    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
752      Guid jobId,
753      byte[] result,
754      double percentage,
755      Exception exception) {
756
757      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
758    }
759
760
761    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
762      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
763    }
764
/// <summary>
/// Logs a client out: its entry in the last heartbeats dictionary is removed,
/// the unfinished jobs of a calculating client are handed to the job manager
/// to be reset, and the client state is set to offline.
/// </summary>
/// <param name="clientId">id of the client that logs out</param>
/// <returns>
/// a response with Success = false if no client with the given id is found,
/// otherwise a response with Success = true
/// </returns>
public Response Logout(Guid clientId) {

  HiveLogger.Info("Client logged out " + clientId);

  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    // always release the write lock, even if the removal throws;
    // a lock that is never exited would block every later user of it
    heartbeatLock.EnterWriteLock();
    try {
      // Remove simply does nothing for an unknown key, so no ContainsKey check is needed
      lastHeartbeats.Remove(clientId);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    ClientInfo client = clientAdapter.GetById(clientId);
    if (client == null) {
      // nothing has been written at this point, the transaction is neither
      // committed nor rolled back here
      // NOTE(review): confirm that session.EndSession() cleans up an open transaction
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
      return response;
    }
    if (client.State == State.calculating) {
      // check which jobs the client was calculating and reset the unfinished ones
      ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
      foreach (Job job in jobsOfClient) {
        if (job.State != State.finished)
          jobManager.ResetJobsDependingOnResults(job);
      }
    }

    client.State = State.offline;
    clientAdapter.Update(client);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow with "throw;" so the original stack trace is preserved
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
827
/// <summary>
/// If a client goes offline and restores a job it was calculating,
/// it can ask the server whether the job result is still needed.
/// If the job exists and is not finished yet, it is set to pending,
/// registered in the pending jobs with PENDING_TIMEOUT and updated in the database.
/// </summary>
/// <param name="jobId">id of the job the client asks about</param>
/// <returns>
/// Success = false if the job doesn't exist;
/// Success = true with the "already finished" status message if the job is finished;
/// Success = true with the "send job result" status message otherwise
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    tx = session.BeginTransaction();

    Response response = new Response();
    Job job = jobAdapter.GetById(jobId);
    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
      HiveLogger.Error(this.ToString() + " IsJobStillNeeded: Job doesn't exist (anymore)! " + jobId);
      return response;
    }
    if (job.State == State.finished) {
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
      HiveLogger.Error(this.ToString() + " IsJobStillNeeded: already finished! " + job);
      return response;
    }
    job.State = State.pending;
    lock (pendingJobs) {
      // use the indexer instead of Add: a client that asks twice for the same job
      // must not fail with a duplicate key exception, the timeout is just reset
      // NOTE(review): assumes pendingJobs is a dictionary-like map with an indexer - confirm
      pendingJobs[job.Id] = PENDING_TIMEOUT;
    }

    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // rethrow with "throw;" so the original stack trace is preserved
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
879
/// <summary>
/// Looks up, for every entry of the given plugin list, the plugins known to the
/// application manager that have the same name and adds them, together with
/// the contents of their files, to the response.
/// </summary>
/// <param name="pluginList">the plugins that are requested</param>
/// <returns>a response containing the matching plugins and their file contents</returns>
public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
  ResponsePlugin result = new ResponsePlugin();

  foreach (HivePluginInfo requested in pluginList) {
    // TODO: BuildDate deleted, not needed???
    // TODO: Split version to major, minor and revision number
    foreach (IPluginDescription installed in ApplicationManager.Manager.Plugins) {
      bool nameMatches = installed.Name == requested.Name;
      if (!nameMatches)
        continue;

      CachedHivePluginInfo cached = new CachedHivePluginInfo();
      cached.Name = installed.Name;
      cached.Version = installed.Version.ToString();
      cached.BuildDate = installed.BuildDate;

      // ship the raw bytes of every file that belongs to the plugin
      foreach (String filePath in installed.Files) {
        byte[] fileContent = File.ReadAllBytes(filePath);
        cached.PluginFiles.Add(fileContent);
      }

      result.Plugins.Add(cached);
    }
  }

  result.Success = true;
  result.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
  return result;
}
907
908    #endregion
909  }
910}
Note: See TracBrowser for help on using the repository browser.