Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 2688

Last change on this file since 2688 was 2688, checked in by gkronber, 14 years ago

Implemented an enumerable to iterate through all PluginFiles as suggested by swagner, replaced the Assemblies enumerable with an AssemblyName enumerable for internal usage in the plugin infrastructure and replaced Assembly.LoadFrom calls with Assembly.Load() to prevent loading from GAC as far as possible.

#850 (PluginInfrastructure should provide a way to get assemblies associated with a plug-in)

File size: 33.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40using HeuristicLab.Tracing;
41
42namespace HeuristicLab.Hive.Server.Core {
43  /// <summary>
44  /// The ClientCommunicator manages the whole communication with the client
45  /// </summary>
46  public class ClientCommunicator : IClientCommunicator,
47    IInternalClientCommunicator {
48    private static Dictionary<Guid, DateTime> lastHeartbeats =
49      new Dictionary<Guid, DateTime>();
50    private static Dictionary<Guid, int> newAssignedJobs =
51      new Dictionary<Guid, int>();
52    private static Dictionary<Guid, int> pendingJobs =
53      new Dictionary<Guid, int>();
54
55    private static ReaderWriterLockSlim heartbeatLock =
56      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
57
58    private ISessionFactory factory;
59    private ILifecycleManager lifecycleManager;
60    private IInternalJobManager jobManager;
61    private IScheduler scheduler;
62
63    private static int PENDING_TIMEOUT = 100;
64
/// <summary>
/// Resolves the session factory, lifecycle manager, job manager and scheduler
/// through the ServiceLocator and registers
/// <c>lifecycleManager_OnServerHeartbeat</c> as heartbeat handler of the
/// lifecycle manager.
/// </summary>
public ClientCommunicator() {
  this.factory = ServiceLocator.GetSessionFactory();

  this.lifecycleManager = ServiceLocator.GetLifecycleManager();
  // 'as' leaves jobManager null if the located job manager does not
  // implement IInternalJobManager.
  this.jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  this.scheduler = ServiceLocator.GetScheduler();

  EventHandler heartbeatHandler =
    new EventHandler(lifecycleManager_OnServerHeartbeat);
  this.lifecycleManager.RegisterHeartbeat(heartbeatHandler);
}
81
/// <summary>
/// Server heartbeat handler. Walks over all clients known to the database:
/// clients that are not offline but have no heartbeat entry, or whose last
/// heartbeat is older than <c>ApplicationConstants.HEARTBEAT_MAX_DIF</c> seconds,
/// are set offline and their active jobs are reset. For clients that are
/// offline (or in nullState) a stale heartbeat entry is removed and their
/// active jobs are reset. Finally pending jobs are checked.
/// </summary>
/// <param name="sender">Not used.</param>
/// <param name="e">Not used.</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  HiveLogger.Info(this.ToString() + ": Server Heartbeat ticked");

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

    foreach (ClientInfo client in allClients) {
      if (client.State != State.offline && client.State != State.nullState) {
        heartbeatLock.EnterUpgradeableReadLock();
        // try/finally: the lock is static, so a failing DB call must not
        // leave it held and block every later heartbeat.
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            HiveLogger.Info(this.ToString() + ": Client " + client.Id + " wasn't offline but hasn't sent heartbeats - setting offline");
            client.State = State.offline;
            clientAdapter.Update(client);
            HiveLogger.Info(this.ToString() + ": Client " + client.Id + " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if the client calculated jobs, those jobs must be reset
              HiveLogger.Info(this.ToString() + ": Client timed out and is on RESET");
              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
                jobManager.ResetJobsDependingOnResults(job);
                lock (newAssignedJobs) {
                  // Remove is a no-op when the key is absent
                  newAssignedJobs.Remove(job.Id);
                }
              }

              // client must be set offline
              client.State = State.offline;
              clientAdapter.Update(client);

              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              }
              finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        }
        finally {
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        // TODO: is resetting the jobs of offline clients really necessary?
        heartbeatLock.EnterWriteLock();
        try {
          lastHeartbeats.Remove(client.Id);
          foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
            jobManager.ResetJobsDependingOnResults(job);
          }
        }
        finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
    CheckForPendingJobs(jobAdapter);

    tx.Commit();
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
170
/// <summary>
/// Looks at every job that is in <c>State.pending</c> in the database and has
/// a counter in <c>pendingJobs</c>: a counter above zero is decremented, a
/// counter of zero or below causes the job to be set to <c>State.offline</c>
/// and updated. Pending jobs without a counter are left untouched.
/// </summary>
/// <param name="jobAdapter">Adapter used to query and update the jobs.</param>
private void CheckForPendingJobs(IJobAdapter jobAdapter) {
  List<Job> jobsPendingInDb = new List<Job>(jobAdapter.GetJobsByState(State.pending));

  for (int i = 0; i < jobsPendingInDb.Count; i++) {
    Job pendingJob = jobsPendingInDb[i];

    lock (pendingJobs) {
      int remaining;
      if (!pendingJobs.TryGetValue(pendingJob.Id, out remaining))
        continue;

      if (remaining > 0) {
        pendingJobs[pendingJob.Id] = remaining - 1;
      } else {
        pendingJob.State = State.offline;
        jobAdapter.Update(pendingJob);
      }
    }
  }
}
187
188    #region IClientCommunicator Members
189
/// <summary>
/// Login process for the client. The login counts as the first heartbeat:
/// the current time is stored as the client's last heartbeat, the client's
/// state is set to <c>State.idle</c> and the client is updated in the database.
/// </summary>
/// <param name="clientInfo">The client that logs in; its State is overwritten.</param>
/// <returns>A response with Success set and the login-success status message.</returns>
public Response Login(ClientInfo clientInfo) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // the indexer adds the entry or overwrites an existing one
      lastHeartbeats[clientInfo.Id] = DateTime.Now;
    }
    finally {
      // always release the static lock, even if the update above fails
      heartbeatLock.ExitWriteLock();
    }

    clientInfo.State = State.idle;
    clientAdapter.Update(clientInfo);
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
234
/// <summary>
/// Processes a heartbeat sent by a client. The time of the heartbeat is
/// stored in the heartbeats dictionary, the client's free cores and free
/// memory are updated, the scheduler is asked whether there is a job for the
/// client (the response then tells the client to fetch one) and the reported
/// job progress is processed.
/// </summary>
/// <param name="hbData">Heartbeat data (client id, free cores, free memory, job progress).</param>
/// <returns>
/// The response with the action requests for the client. Success is false if
/// the client is unknown, offline or in nullState.
/// </returns>
public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {

  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  HiveLogger.Info(this.ToString() + ": BEGIN Processing Heartbeat for Client " + hbData.ClientId);

  try {
    HiveLogger.Info(this.ToString() + ": BEGIN Fetching Adapters");
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    HiveLogger.Info(this.ToString() + ": END Fetched Adapters");
    HiveLogger.Info(this.ToString() + ": BEGIN Starting Transaction");
    tx = session.BeginTransaction();
    HiveLogger.Info(this.ToString() + ": END Started Transaction");

    ResponseHB response = new ResponseHB();
    response.ActionRequest = new List<MessageContainer>();

    HiveLogger.Info(this.ToString() + ": BEGIN Started Client Fetching");
    ClientInfo client = clientAdapter.GetById(hbData.ClientId);
    HiveLogger.Info(this.ToString() + ": END Finished Client Fetching");

    // check if the client is logged in; an unknown client (null) is treated
    // like a client that is not logged in instead of failing with a
    // NullReferenceException
    if (client == null || client.State == State.offline || client.State == State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

      HiveLogger.Error(this.ToString() + " ProcessHeartBeat: Client state null or offline: " + client);

      // nothing has been written yet - finish the transaction explicitly
      // instead of leaving it open when the session ends
      tx.Rollback();
      return response;
    }

    client.NrOfFreeCores = hbData.FreeCores;
    client.FreeMemory = hbData.FreeMemory;

    // save timestamp of this heartbeat
    HiveLogger.Info(this.ToString() + ": BEGIN Locking for Heartbeats");
    heartbeatLock.EnterWriteLock();
    try {
      HiveLogger.Info(this.ToString() + ": END Locked for Heartbeats");
      // the indexer adds the entry or overwrites an existing one
      lastHeartbeats[hbData.ClientId] = DateTime.Now;
    }
    finally {
      // always release the static lock
      heartbeatLock.ExitWriteLock();
    }

    // check if client has a free core for a new job
    // if true, ask scheduler for a new job for this client
    HiveLogger.Info(this.ToString() + ": BEGIN Looking for Client Jobs");
    if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) {
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
    } else {
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
    }
    HiveLogger.Info(this.ToString() + ": END Looked for Client Jobs");
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

    HiveLogger.Info(this.ToString() + ": BEGIN Processing Heartbeat Jobs");
    processJobProcess(hbData, jobAdapter, clientAdapter, response);
    HiveLogger.Info(this.ToString() + ": END Processed Heartbeat Jobs");

    clientAdapter.Update(client);

    tx.Commit();
    HiveLogger.Info(this.ToString() + ": END Processed Heartbeat for Client " + hbData.ClientId);
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
324
/// <summary>
/// Processes the job progress sent by a client with its heartbeat.
/// For every reported job the progress is stored, and pending abort or
/// snapshot requests are turned into action requests on the response.
/// Active jobs of the client that were NOT reported lose one unit of their
/// time-to-live in <c>newAssignedJobs</c>; when it reaches zero (or the job
/// has no time-to-live entry) the job is set back to <c>State.offline</c>.
/// </summary>
/// <param name="hbData">Heartbeat data containing the job progress.</param>
/// <param name="jobAdapter">Adapter used to load and update jobs.</param>
/// <param name="clientAdapter">Adapter used to load the client.</param>
/// <param name="response">Response that receives status and action requests.</param>
private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
  HiveLogger.Info(this.ToString() + " processJobProcess: Started for Client " + hbData.ClientId);

  // nothing reported - nothing to do
  if (hbData.JobProgress == null || hbData.JobProgress.Count == 0)
    return;

  List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
  if (jobsOfClient.Count == 0) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
    HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId);
    return;
  }

  foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
    Job curJob = jobAdapter.GetById(jobProgress.Key);
    if (curJob == null) {
      // the client reports a job the database does not know: report the
      // failure instead of crashing the whole heartbeat
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId + " Job: " + jobProgress.Key);
      continue;
    }

    if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId + " Job: " + curJob);
    } else if (curJob.State == State.abort) {
      // a request to abort the job has been set
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
      curJob.State = State.finished;
    } else {
      // save job progress
      curJob.Percentage = jobProgress.Value;

      if (curJob.State == State.requestSnapshot) {
        // a request for a snapshot has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
        curJob.State = State.requestSnapshotSent;
      }
    }
    jobAdapter.Update(curJob);
  }

  foreach (Job currJob in jobsOfClient) {
    bool reported = hbData.JobProgress.ContainsKey(currJob.Id);

    lock (newAssignedJobs) {
      if (reported) {
        // the client confirmed the job, it is no longer "newly assigned"
        if (newAssignedJobs.ContainsKey(currJob.Id)) {
          HiveLogger.Info(this.ToString() + " processJobProcess: Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
          newAssignedJobs.Remove(currJob.Id);
        }
        continue;
      }

      int timeToLive;
      if (newAssignedJobs.TryGetValue(currJob.Id, out timeToLive)) {
        timeToLive--;
        newAssignedJobs[currJob.Id] = timeToLive;
        HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL Reduced by one for job: " + currJob + "and is now: " + timeToLive + ". User that sucks: " + currJob.Client);
        if (timeToLive <= 0) {
          HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);

          currJob.State = State.offline;
          jobAdapter.Update(currJob);

          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

          newAssignedJobs.Remove(currJob.Id);
        }
      } else {
        HiveLogger.Error(this.ToString() + " processJobProcess: Job ID wasn't with the heartbeats:  " + currJob);
        currJob.State = State.offline;
        jobAdapter.Update(currJob);
      }
    } // lock
  }
}
407
/// <summary>
/// Called by a client that was told to pull a job. The scheduler selects the
/// next job for the client; its serialized form is loaded and returned, and
/// the job is registered in <c>newAssignedJobs</c> with
/// <c>ApplicationConstants.JOB_TIME_TO_LIVE</c>.
/// </summary>
/// <param name="clientId">Id of the client that pulls the job.</param>
/// <returns>
/// A response holding the serialized job, or Success = false and Job = null
/// if the scheduler has no job for the client.
/// </returns>
public ResponseSerializedJob SendSerializedJob(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseSerializedJob response = new ResponseSerializedJob();

    Job job2Calculate = scheduler.GetNextJobForClient(clientId);
    if (job2Calculate != null) {
      SerializedJob computableJob =
        jobAdapter.GetSerializedJob(job2Calculate.Id);

      response.Job = computableJob;
      response.Success = true;
      HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + computableJob.JobInfo + " for user " + clientId);
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      lock (newAssignedJobs) {
        // keep an already running time-to-live counter untouched
        if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
      }
    } else {
      HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);
      response.Success = false;
      response.Job = null;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
    }

    tx.Commit();

    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
460
/// <summary>
/// Called by a client that was told to pull a job. The scheduler selects the
/// next job for the client; the job object itself (not its serialized form)
/// is returned and registered in <c>newAssignedJobs</c> with
/// <c>ApplicationConstants.JOB_TIME_TO_LIVE</c>.
/// </summary>
/// <param name="clientId">Id of the client that pulls the job.</param>
/// <returns>
/// A response holding the job, or Success = false and Job = null if the
/// scheduler has no job for the client.
/// </returns>
public ResponseJob SendJob(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    // NOTE(review): the adapter is fetched but not used below; the call is
    // kept in case GetDataAdapter has side effects on the session - confirm.
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseJob response = new ResponseJob();

    Job job2Calculate = scheduler.GetNextJobForClient(clientId);
    if (job2Calculate != null) {
      response.Job = job2Calculate;
      response.Success = true;
      // log prefix names this method (it used to say "SendSerializedJob")
      HiveLogger.Info(this.ToString() + " SendJob: Job pulled: " + job2Calculate + " for user " + clientId);
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      lock (newAssignedJobs) {
        // keep an already running time-to-live counter untouched
        if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
      }
    } else {
      response.Success = false;
      response.Job = null;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
      HiveLogger.Info(this.ToString() + " SendJob: No more Jobs left for " + clientId);
    }

    tx.Commit();

    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
510
/// <summary>
/// Receives a job result as a stream. The stream starts with a
/// BinaryFormatter-serialized <c>JobResult</c> header which is processed
/// first (with an empty payload); if that succeeds, the remaining bytes of
/// the stream are copied into both the stored job result and the stored job.
/// </summary>
/// <param name="stream">Stream containing the JobResult header followed by the raw result data.</param>
/// <param name="finished">True if this is the final result of the job, false for a snapshot.</param>
/// <returns>The response produced while processing the JobResult header.</returns>
public ResponseResultReceived ProcessJobResult(
  Stream stream,
  bool finished) {

  HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - main method:");

  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;
  Stream jobResultStream = null;
  Stream jobStream = null;

  try {
    // SECURITY NOTE(review): BinaryFormatter deserializes whatever type the
    // sender put into the stream; this is unsafe if clients are not trusted.
    // Not replaced here because the wire format is shared with the clients.
    BinaryFormatter formatter =
      new BinaryFormatter();

    JobResult result =
      (JobResult)formatter.Deserialize(stream);

    //important - repeatable read isolation level is required here,
    //otherwise race conditions could occur when writing the stream into the DB
    //just removed TransactionIsolationLevel.RepeatableRead
    tx = session.BeginTransaction();

    // first store the meta data with an empty payload ...
    ResponseResultReceived response =
      ProcessJobResult(
      result.ClientId,
      result.JobId,
      new byte[] { },
      result.Percentage,
      result.Exception,
      finished);

    if (response.Success) {
      // ... then stream the BLOB that follows the header into the database
      IJobResultsAdapter jobResultsAdapter =
        session.GetDataAdapter<JobResult, IJobResultsAdapter>();

      IJobAdapter jobAdapter =
        session.GetDataAdapter<Job, IJobAdapter>();

      jobResultStream =
        jobResultsAdapter.GetSerializedJobResultStream(response.JobResultId, true);

      jobStream =
        jobAdapter.GetSerializedJobStream(result.JobId, true);

      byte[] buffer = new byte[3024];
      int read = 0;
      while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
        jobResultStream.Write(buffer, 0, read);
        jobStream.Write(buffer, 0, read);
      }

      // close both streams before the commit
      jobResultStream.Close();
      jobStream.Close();

      tx.Commit();
    }
    HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage:");
    return response;
  }
  catch (Exception ex) {
    HiveLogger.Error(this.ToString() + " ProcessJobResult: Exception raised: " + ex);
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    if (jobStream != null)
      jobStream.Dispose();

    if (jobResultStream != null)
      jobResultStream.Dispose();

    if (session != null)
      session.EndSession();
  }
}
589
/// <summary>
/// Validates and stores a job result (snapshot or final result) sent by a
/// client. The result is rejected if the job does not exist, was aborted,
/// is not assigned to any client, is assigned to a different client or is
/// in a state other than calculating/pending. On success the job is updated,
/// all previous results of the job are deleted and a new result is stored.
/// The transaction is not committed here; on the rejecting paths it is
/// rolled back.
/// </summary>
/// <param name="clientId">Id of the client that sends the result.</param>
/// <param name="jobId">Id of the job the result belongs to.</param>
/// <param name="result">Serialized result data.</param>
/// <param name="percentage">Progress of the job.</param>
/// <param name="exception">Exception reported by the client, stored with the result.</param>
/// <param name="finished">True if the job is finished, false for a snapshot.</param>
/// <returns>The response describing whether the result was accepted.</returns>
private ResponseResultReceived ProcessJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception,
  bool finished) {

  HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - SUB method: " + jobId);

  ISession session = factory.GetSessionForCurrentThread();

  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    IJobResultsAdapter jobResultAdapter =
      session.GetDataAdapter<JobResult, IJobResultsAdapter>();

    //should fetch the existing transaction
    tx = session.BeginTransaction();

    ResponseResultReceived response = new ResponseResultReceived();
    ClientInfo client =
      clientAdapter.GetById(clientId);

    SerializedJob job =
      new SerializedJob();
    job.JobInfo =
      jobAdapter.GetById(jobId);

    // The job does not exist. (The former check 'job == null && job.JobInfo != null'
    // could never be true, so an unknown id ended in a NullReferenceException.)
    if (job.JobInfo == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: No job with Id " + jobId);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.abort) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Job was aborted! " + job.JobInfo);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.Client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Job is not being calculated (client = null)! " + job.JobInfo);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.Client.Id != clientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.finished) {
      // NOTE(review): Success is true here although the transaction is rolled
      // back and JobResultId is not set; the Stream overload continues to
      // write the payload when Success is true - confirm this is intended.
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);

      tx.Rollback();
      return response;
    }
    if (job.JobInfo.State == State.requestSnapshotSent) {
      // the requested snapshot has arrived, the job keeps calculating
      job.JobInfo.State = State.calculating;
    }
    if (job.JobInfo.State != State.calculating &&
      job.JobInfo.State != State.pending) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
      response.JobId = jobId;

      HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Job State, job is: " + job.JobInfo);

      tx.Rollback();
      return response;
    }
    job.JobInfo.Percentage = percentage;

    if (finished) {
      job.JobInfo.State = State.finished;
    }

    job.SerializedJobData = result;
    jobAdapter.UpdateSerializedJob(job);

    // only the latest result of a job is kept
    List<JobResult> jobResults = new List<JobResult>(
      jobResultAdapter.GetResultsOf(job.JobInfo.Id));
    foreach (JobResult currentResult in jobResults)
      jobResultAdapter.Delete(currentResult);

    SerializedJobResult serializedjobResult =
      new SerializedJobResult();
    JobResult jobResult = new JobResult();
    jobResult.ClientId = client.Id;
    jobResult.JobId = job.JobInfo.Id;
    jobResult.Percentage = percentage;
    jobResult.Exception = exception;
    jobResult.DateFinished = DateTime.Now;
    serializedjobResult.JobResult = jobResult;
    serializedjobResult.SerializedJobResultData =
      result;

    jobResultAdapter.UpdateSerializedJobResult(serializedjobResult);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
    response.JobId = jobId;
    response.finished = finished;
    response.JobResultId = jobResult.Id;

    //Removed for Testing
    //tx.Commit();
    HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage - SUB method: " + jobId);
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    // NOTE(review): the Stream overload calls this method and keeps using the
    // same session afterwards - confirm EndSession tolerates nested use.
    if (session != null)
      session.EndSession();
  }
}
738
739
/// <summary>
/// Stores the final result of a job: forwards all arguments to
/// ProcessJobResult with the finished flag set to true.
/// </summary>
/// <param name="clientId">Id of the client that sends the result.</param>
/// <param name="jobId">Id of the job the result belongs to.</param>
/// <param name="result">Serialized result data.</param>
/// <param name="percentage">Progress of the job.</param>
/// <param name="exception">Exception reported by the client.</param>
/// <returns>The response returned by ProcessJobResult.</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception) {
  const bool jobIsFinished = true;

  ResponseResultReceived response = ProcessJobResult(
    clientId, jobId, result, percentage, exception, jobIsFinished);
  return response;
}
759
760
/// <summary>
/// Stores an intermediate result (snapshot) of a job: forwards all arguments
/// to ProcessJobResult with the finished flag set to false.
/// </summary>
/// <param name="clientId">Id of the client that sends the snapshot.</param>
/// <param name="jobId">Id of the job the snapshot belongs to.</param>
/// <param name="result">Serialized snapshot data.</param>
/// <param name="percentage">Progress of the job.</param>
/// <param name="exception">Exception reported by the client.</param>
/// <returns>The response returned by ProcessJobResult.</returns>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
  const bool jobIsFinished = false;

  ResponseResultReceived response = ProcessJobResult(
    clientId, jobId, result, percentage, exception, jobIsFinished);
  return response;
}
764
/// <summary>
/// Logs a client out: its entry in the last-heartbeats dictionary is removed,
/// unfinished jobs of a calculating client are reset and the client's state
/// is set to <c>State.offline</c>.
/// </summary>
/// <param name="clientId">Id of the client that logs out.</param>
/// <returns>
/// A response with Success = true, or Success = false and the
/// client-not-registered status message if no client with this id exists.
/// </returns>
public Response Logout(Guid clientId) {

  HiveLogger.Info("Client logged out " + clientId);

  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // Remove is a no-op when the client has no heartbeat entry
      lastHeartbeats.Remove(clientId);
    }
    finally {
      // always release the static lock
      heartbeatLock.ExitWriteLock();
    }

    ClientInfo client = clientAdapter.GetById(clientId);
    if (client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;

      // nothing has been written - finish the transaction explicitly instead
      // of leaving it open when the session ends
      tx.Rollback();
      return response;
    }
    if (client.State == State.calculating) {
      // check which jobs the client was calculating and reset them
      ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
      foreach (Job job in jobsOfClient) {
        if (job.State != State.finished)
          jobManager.ResetJobsDependingOnResults(job);
      }
    }

    client.State = State.offline;
    clientAdapter.Update(client);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' resets it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
827
828    /// <summary>
829    /// If a client goes offline and restores a job he was calculating
830    /// he can ask the client if he still needs the job result
831    /// </summary>
832    /// <param name="jobId"></param>
833    /// <returns></returns>
834    public Response IsJobStillNeeded(Guid jobId) {
835      ISession session = factory.GetSessionForCurrentThread();
836      ITransaction tx = null;
837
838      try {
839        IJobAdapter jobAdapter =
840          session.GetDataAdapter<Job, IJobAdapter>();
841        tx = session.BeginTransaction();
842
843        Response response = new Response();
844        Job job = jobAdapter.GetById(jobId);
845        if (job == null) {
846          response.Success = false;
847          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
848          HiveLogger.Error(this.ToString() + " IsJobStillNeeded: Job doesn't exist (anymore)! " + jobId);
849          return response;
850        }
851        if (job.State == State.finished) {
852          response.Success = true;
853          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
854          HiveLogger.Error(this.ToString() + " IsJobStillNeeded: already finished! " + job);
855          return response;
856        }
857        job.State = State.pending;
858        lock (pendingJobs) {
859          pendingJobs.Add(job.Id, PENDING_TIMEOUT);
860        }
861
862        jobAdapter.Update(job);
863
864        response.Success = true;
865        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
866        tx.Commit();
867        return response;
868      }
869      catch (Exception ex) {
870        if (tx != null)
871          tx.Rollback();
872        throw ex;
873      }
874      finally {
875        if (session != null)
876          session.EndSession();
877      }
878    }
879
880    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
881      ResponsePlugin response = new ResponsePlugin();
882      foreach (HivePluginInfo pluginInfo in pluginList) {
883        // TODO: BuildDate deleted, not needed???
884        // TODO: Split version to major, minor and revision number
885        foreach (IPluginDescription currPlugin in ApplicationManager.Manager.Plugins) {
886          if (currPlugin.Name == pluginInfo.Name) {
887
888            CachedHivePluginInfo currCachedPlugin = new CachedHivePluginInfo {
889              Name = currPlugin.Name,
890              Version = currPlugin.Version.ToString(),
891              BuildDate = currPlugin.BuildDate
892            };
893
894            foreach (string fileName in from file in currPlugin.Files select file.Name) {
895              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(fileName));
896            }
897            response.Plugins.Add(currCachedPlugin);
898          }
899        }
900      }
901      response.Success = true;
902      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
903
904      return response;
905
906    }
907
908    #endregion
909  }
910}
Note: See TracBrowser for help on using the repository browser.