Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 3222

Last change on this file since 3222 was 3222, checked in by kgrading, 14 years ago

fixed nullpointer (#830)

File size: 31.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40using HeuristicLab.Tracing;
41using Linq = HeuristicLab.Hive.Server.LINQDataAccess;
42using System.Transactions;
43using HeuristicLab.Hive.Server.LINQDataAccess;
44
45namespace HeuristicLab.Hive.Server.Core {
46  /// <summary>
47  /// The ClientCommunicator manages the whole communication with the client
48  /// </summary>
49  public class ClientCommunicator : IClientCommunicator,
50    IInternalClientCommunicator {
51    private static Dictionary<Guid, DateTime> lastHeartbeats =
52      new Dictionary<Guid, DateTime>();
53    private static Dictionary<Guid, int> newAssignedJobs =
54      new Dictionary<Guid, int>();
55    private static Dictionary<Guid, int> pendingJobs =
56      new Dictionary<Guid, int>();
57
58    private static ReaderWriterLockSlim heartbeatLock =
59      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
60
61    //private ISessionFactory factory;
62    private ILifecycleManager lifecycleManager;
63    private IInternalJobManager jobManager;
64    private IScheduler scheduler;
65
66    private static int PENDING_TIMEOUT = 100;
67
68    /// <summary>
69    /// Initialization of the Adapters to the database
70    /// Initialization of Eventhandler for the lifecycle management
71    /// Initialization of lastHearbeats Dictionary
72    /// </summary>
73    public ClientCommunicator() {
74      //factory = ServiceLocator.GetSessionFactory();
75
76      lifecycleManager = ServiceLocator.GetLifecycleManager();
77      jobManager = ServiceLocator.GetJobManager() as
78        IInternalJobManager;
79      scheduler = ServiceLocator.GetScheduler();
80
81      lifecycleManager.RegisterHeartbeat(
82        new EventHandler(lifecycleManager_OnServerHeartbeat));
83    }
84
85    /// <summary>
86    /// Check if online clients send their hearbeats
87    /// if not -> set them offline and check if they where calculating a job
88    /// </summary>
89    /// <param name="sender"></param>
90    /// <param name="e"></param>
91    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
92      HiveLogger.Info(this.ToString() + ": Server Heartbeat ticked");
93
94      using (TransactionScope scope = new TransactionScope()) {
95
96        List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());
97
98        foreach (ClientDto client in allClients) {
99          if (client.State != State.offline && client.State != State.nullState) {
100            heartbeatLock.EnterUpgradeableReadLock();
101
102            if (!lastHeartbeats.ContainsKey(client.Id)) {
103              HiveLogger.Info(this.ToString() + ": Client " + client.Id +
104                              " wasn't offline but hasn't sent heartbeats - setting offline");
105              client.State = State.offline;
106              DaoLocator.ClientDao.Update(client);
107              HiveLogger.Info(this.ToString() + ": Client " + client.Id +
108                              " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
109              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
110                //maybe implementa n additional Watchdog? Till then, just set them offline..
111                DaoLocator.JobDao.SetJobOffline(job);               
112                //jobManager.ResetJobsDependingOnResults(job);
113              }
114            } else {
115              DateTime lastHbOfClient = lastHeartbeats[client.Id];
116
117              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
118              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
119              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
120                // if client calculated jobs, the job must be reset
121                HiveLogger.Info(this.ToString() + ": Client timed out and is on RESET");
122                foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
123                  jobManager.ResetJobsDependingOnResults(job);
124                  lock (newAssignedJobs) {
125                    if (newAssignedJobs.ContainsKey(job.Id))
126                      newAssignedJobs.Remove(job.Id);
127                  }
128                }
129
130                // client must be set offline
131                client.State = State.offline;
132
133                //clientAdapter.Update(client);
134                DaoLocator.ClientDao.Update(client);
135
136                heartbeatLock.EnterWriteLock();
137                lastHeartbeats.Remove(client.Id);
138                heartbeatLock.ExitWriteLock();
139              }
140            }
141
142            heartbeatLock.ExitUpgradeableReadLock();
143          } else {
144            //TODO: RLY neccesary?
145            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Shouldn't have offline or nullstate, has " + client.State);
146            heartbeatLock.EnterWriteLock();
147            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Resetting all his jobs");
148            if (lastHeartbeats.ContainsKey(client.Id))
149              lastHeartbeats.Remove(client.Id);
150            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
151              jobManager.ResetJobsDependingOnResults(job);
152            }
153            heartbeatLock.ExitWriteLock();
154          }
155        }
156        CheckForPendingJobs();
157        DaoLocator.DestroyContext();
158        scope.Complete();
159      }
160    }
161
162    private void CheckForPendingJobs() {
163      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(State.pending));
164
165      foreach (JobDto currJob in pendingJobsInDB) {
166        lock (pendingJobs) {
167          if (pendingJobs.ContainsKey(currJob.Id)) {
168            if (pendingJobs[currJob.Id] <= 0) {
169              currJob.State = State.offline;
170              DaoLocator.JobDao.Update(currJob);             
171            } else {
172              pendingJobs[currJob.Id]--;
173            }
174          }
175        }
176      }
177    }
178
179    #region IClientCommunicator Members
180
181    /// <summary>
182    /// Login process for the client
183    /// A hearbeat entry is created as well (login is the first hearbeat)
184    /// </summary>
185    /// <param name="clientInfo"></param>
186    /// <returns></returns>
187    public Response Login(ClientDto clientInfo) {
188      Response response = new Response();
189
190      heartbeatLock.EnterWriteLock();
191      if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
192        lastHeartbeats[clientInfo.Id] = DateTime.Now;
193      } else {
194        lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
195      }
196      heartbeatLock.ExitWriteLock();
197
198      ClientDto dbClient = DaoLocator.ClientDao.FindById(clientInfo.Id);
199
200      //Really set offline?
201      //Reconnect issues with the currently calculating jobs
202      clientInfo.State = State.idle;
203      clientInfo.CalendarSyncStatus = dbClient != null ? dbClient.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
204
205      if (dbClient == null)
206        DaoLocator.ClientDao.Insert(clientInfo);
207      else
208        DaoLocator.ClientDao.Update(clientInfo);
209      response.Success = true;
210      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
211      return response;
212    }
213
214    public ResponseCalendar GetCalendar(Guid clientId) {
215      ResponseCalendar response = new ResponseCalendar();
216
217      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
218      if(client == null) {
219        response.Success = false;
220        response.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
221        return response;
222      }
223     
224      response.ForceFetch = (client.CalendarSyncStatus == CalendarState.ForceFetch);
225     
226      IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForClient(client);
227      if (appointments.Count() == 0) {
228        response.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_NO_CALENDAR_FOUND;
229        response.Success = false;
230      } else {
231        response.Success = true;
232        response.Appointments = appointments;
233      }
234
235      client.CalendarSyncStatus = CalendarState.Fetched;
236      DaoLocator.ClientDao.Update(client);
237      return response;
238    }
239
240    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
241      Response response = new Response();     
242      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
243      if (client == null) {
244        response.Success = false;
245        response.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
246        return response;
247      }
248     
249      client.CalendarSyncStatus = state;
250      DaoLocator.ClientDao.Update(client);
251     
252      response.Success = true;
253      response.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_STATUS_UPDATED;
254     
255      return response;
256    }
257
258    /// <summary>
259    /// The client has to send regulary heartbeats
260    /// this hearbeats will be stored in the heartbeats dictionary
261    /// check if there is work for the client and send the client a response if he should pull a job
262    /// </summary>
263    /// <param name="hbData"></param>
264    /// <returns></returns>
265    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
266      HiveLogger.Debug(this.ToString() + ": BEGIN Processing Heartbeat for Client " + hbData.ClientId);
267      HiveLogger.Debug(this.ToString() + ": BEGIN Fetching Adapters");
268      HiveLogger.Debug(this.ToString() + ": END Fetched Adapters");
269      HiveLogger.Debug(this.ToString() + ": BEGIN Starting Transaction");
270
271      HiveLogger.Debug(this.ToString() + ": END Started Transaction");
272
273      ResponseHB response = new ResponseHB();
274      response.ActionRequest = new List<MessageContainer>();
275
276      HiveLogger.Debug(this.ToString() + ": BEGIN Started Client Fetching");
277      ClientDto client = DaoLocator.ClientDao.FindById(hbData.ClientId);
278      HiveLogger.Debug(this.ToString() + ": END Finished Client Fetching");
279      // check if the client is logged in
280      if (client.State == State.offline || client.State == State.nullState) {
281        response.Success = false;
282        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
283        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
284
285        HiveLogger.Error(this.ToString() + " ProcessHeartBeat: Client state null or offline: " + client);
286
287        return response;
288      }
289
290      client.NrOfFreeCores = hbData.FreeCores;
291      client.FreeMemory = hbData.FreeMemory;
292
293      // save timestamp of this heartbeat
294      HiveLogger.Debug(this.ToString() + ": BEGIN Locking for Heartbeats");
295      heartbeatLock.EnterWriteLock();
296      HiveLogger.Debug(this.ToString() + ": END Locked for Heartbeats");
297      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
298        lastHeartbeats[hbData.ClientId] = DateTime.Now;
299      } else {
300        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
301      }
302      heartbeatLock.ExitWriteLock();
303
304      HiveLogger.Debug(this.ToString() + ": BEGIN Processing Heartbeat Jobs");
305      processJobProcess(hbData, response);
306      HiveLogger.Debug(this.ToString() + ": END Processed Heartbeat Jobs");
307
308      //check if new Cal must be loaded
309      if (client.CalendarSyncStatus == CalendarState.Fetch || client.CalendarSyncStatus == CalendarState.ForceFetch) {
310        response.Success = true;
311        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_FETCH_OR_FORCEFETCH_CALENDAR;
312        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
313       
314        //client.CalendarSyncStatus = CalendarState.Fetching;
315       
316        HiveLogger.Info(this.ToString() + " fetch or forcefetch sent");       
317      }
318
319      // check if client has a free core for a new job
320      // if true, ask scheduler for a new job for this client
321      HiveLogger.Debug(this.ToString() + ": BEGIN Looking for Client Jobs");
322      if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) {
323        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
324      } else {
325        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
326      }
327      HiveLogger.Debug(this.ToString() + ": END Looked for Client Jobs");
328      response.Success = true;
329      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
330
331      DaoLocator.ClientDao.Update(client);
332
333      //tx.Commit();
334      HiveLogger.Debug(this.ToString() + ": END Processed Heartbeat for Client " + hbData.ClientId);
335      return response;
336    }
337
338    /// <summary>
339    /// Process the Job progress sent by a client
340    /// </summary>
341    /// <param name="hbData"></param>
342    /// <param name="jobAdapter"></param>
343    /// <param name="clientAdapter"></param>
344    /// <param name="response"></param>
345    private void processJobProcess(HeartBeatData hbData, ResponseHB response) {
346      HiveLogger.Info(this.ToString() + " processJobProcess: Started for Client " + hbData.ClientId);
347      List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfClient(DaoLocator.ClientDao.FindById(hbData.ClientId)));
348      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {       
349        if (jobsOfClient == null || jobsOfClient.Count == 0) {
350          response.Success = false;
351          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
352          HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId);
353          return;
354        }
355
356        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
357          JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
358          curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
359          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
360            response.Success = false;
361            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
362            HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId + " Job: " + curJob);
363          } else if (curJob.State == State.abort) {
364            // a request to abort the job has been set
365            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
366            curJob.State = State.finished;
367          } else {
368            // save job progress
369            curJob.Percentage = jobProgress.Value;
370
371            if (curJob.State == State.requestSnapshot) {
372              // a request for a snapshot has been set
373              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
374              curJob.State = State.requestSnapshotSent;
375            }
376          }
377          DaoLocator.JobDao.Update(curJob);
378        }
379       }
380      foreach (JobDto currJob in jobsOfClient) {
381        bool found = false;
382        if(hbData.JobProgress != null) {
383          foreach (Guid jobId in hbData.JobProgress.Keys) {
384            if (jobId == currJob.Id) {
385              found = true;
386              break;
387            }
388          }
389        }
390        if (!found) {
391          lock (newAssignedJobs) {
392            if (newAssignedJobs.ContainsKey(currJob.Id)) {
393              newAssignedJobs[currJob.Id]--;
394              HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Client);
395              if (newAssignedJobs[currJob.Id] <= 0) {
396                HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);
397
398                currJob.State = State.offline;
399                DaoLocator.JobDao.Update(currJob);
400
401                response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
402
403                newAssignedJobs.Remove(currJob.Id);
404              }
405            } else {
406              HiveLogger.Error(this.ToString() + " processJobProcess: Job ID wasn't with the heartbeats:  " + currJob);
407              currJob.State = State.offline;
408              DaoLocator.JobDao.Update(currJob);
409            }
410          } // lock
411        } else {
412          lock (newAssignedJobs) {
413
414            if (newAssignedJobs.ContainsKey(currJob.Id)) {
415              HiveLogger.Info(this.ToString() + " processJobProcess: Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
416              newAssignedJobs.Remove(currJob.Id);
417            }
418          }
419        }
420      }
421    }
422
423    /// <summary>
424    /// if the client was told to pull a job he calls this method
425    /// the server selects a job and sends it to the client
426    /// </summary>
427    /// <param name="clientId"></param>
428    /// <returns></returns>
429    /*public ResponseSerializedJob SendSerializedJob(Guid clientId) {
430      ISession session = factory.GetSessionForCurrentThread();
431      ITransaction tx = null;
432
433      try {
434        IJobAdapter jobAdapter =
435          session.GetDataAdapter<JobDto, IJobAdapter>();
436
437        tx = session.BeginTransaction();
438
439        ResponseSerializedJob response = new ResponseSerializedJob();
440
441        JobDto job2Calculate = scheduler.GetNextJobForClient(clientId);
442        if (job2Calculate != null) {
443          SerializedJob computableJob =
444            jobAdapter.GetSerializedJob(job2Calculate.Id);
445
446          response.Job = computableJob;
447          response.Success = true;
448          HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + computableJob.JobInfo + " for user " + clientId);                     
449          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
450          lock (newAssignedJobs) {
451            if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
452              newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
453          }
454        } else {
455          HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);                     
456          response.Success = false;
457          response.Job = null;
458          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
459        }
460
461        tx.Commit();
462
463        return response;
464      }
465      catch (Exception ex) {
466        if (tx != null)
467          tx.Rollback();
468        throw ex;
469      }
470      finally {
471        if (session != null)
472          session.EndSession();
473      }
474    }     */
475
476    /// <summary>
477    /// if the client was told to pull a job he calls this method
478    /// the server selects a job and sends it to the client
479    /// </summary>
480    /// <param name="clientId"></param>
481    /// <returns></returns>
482    public ResponseJob SendJob(Guid clientId) {
483
484      ResponseJob response = new ResponseJob();
485
486      JobDto job2Calculate = scheduler.GetNextJobForClient(clientId);
487      if (job2Calculate != null) {
488        response.Job = job2Calculate;
489        response.Success = true;
490        HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + job2Calculate + " for user " + clientId);
491        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
492        lock (newAssignedJobs) {
493          if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
494            newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
495        }
496      } else {
497        response.Success = false;
498        response.Job = null;
499        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
500        HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);
501      }
502
503
504
505      return response;
506    }
507
508    public ResponseResultReceived ProcessJobResult(
509      Stream stream,
510      bool finished) {
511
512      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - main method:");
513
514
515      Stream jobResultStream = null;
516      Stream jobStream = null;
517
518      //try {
519      BinaryFormatter formatter =
520        new BinaryFormatter();
521
522      JobResult result =
523        (JobResult)formatter.Deserialize(stream);
524
525      //important - repeatable read isolation level is required here,
526      //otherwise race conditions could occur when writing the stream into the DB
527      //just removed TransactionIsolationLevel.RepeatableRead
528      //tx = session.BeginTransaction();
529
530      ResponseResultReceived response =
531        ProcessJobResult(
532        result.ClientId,
533        result.JobId,
534        new byte[] { },
535        result.Percentage,
536        result.Exception,
537        finished);
538
539      if (response.Success) {
540        jobStream = DaoLocator.JobDao.GetSerializedJobStream(result.JobId);
541
542        byte[] buffer = new byte[3024];
543        int read = 0;
544        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
545          jobStream.Write(buffer, 0, read);
546        }
547        jobStream.Close();
548      }
549      HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage:");
550      return response;
551    }
552
553
554    private ResponseResultReceived ProcessJobResult(Guid clientId,
555      Guid jobId,
556      byte[] result,
557      double percentage,
558      Exception exception,
559      bool finished) {
560
561      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - SUB method: " + jobId);
562
563      ResponseResultReceived response = new ResponseResultReceived();
564      ClientDto client =
565        DaoLocator.ClientDao.FindById(clientId);
566
567      SerializedJob job =
568        new SerializedJob();
569
570      if (job != null) {
571        job.JobInfo =
572          DaoLocator.JobDao.FindById(jobId);
573        job.JobInfo.Client = job.JobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
574      }
575
576      if (job == null && job.JobInfo != null) {
577        response.Success = false;
578        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
579        response.JobId = jobId;
580
581        HiveLogger.Error(this.ToString() + " ProcessJobResult: No job with Id " + jobId);
582
583        //tx.Rollback();
584        return response;
585      }
586      if (job.JobInfo.State == State.abort) {
587        response.Success = false;
588        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
589
590        HiveLogger.Error(this.ToString() + " ProcessJobResult: Job was aborted! " + job.JobInfo);
591
592        //tx.Rollback();
593        return response;
594      }
595      if (job.JobInfo.Client == null) {
596        response.Success = false;
597        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
598        response.JobId = jobId;
599
600        HiveLogger.Error(this.ToString() + " ProcessJobResult: Job is not being calculated (client = null)! " + job.JobInfo);
601
602        //tx.Rollback();
603        return response;
604      }
605      if (job.JobInfo.Client.Id != clientId) {
606        response.Success = false;
607        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
608        response.JobId = jobId;
609
610        HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);
611
612        //tx.Rollback();
613        return response;
614      }
615      if (job.JobInfo.State == State.finished) {
616        response.Success = true;
617        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
618        response.JobId = jobId;
619
620        HiveLogger.Error(this.ToString() + " ProcessJobResult: Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);
621
622        //tx.Rollback();
623        return response;
624      }
625      if (job.JobInfo.State == State.requestSnapshotSent) {
626        job.JobInfo.State = State.calculating;
627      }
628      if (job.JobInfo.State != State.calculating &&
629        job.JobInfo.State != State.pending) {
630        response.Success = false;
631        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
632        response.JobId = jobId;
633
634        HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Job State, job is: " + job.JobInfo);
635
636        //tx.Rollback();
637        return response;
638      }
639      job.JobInfo.Percentage = percentage;
640
641      if (finished) {
642        job.JobInfo.State = State.finished;
643      }
644
645      job.SerializedJobData = result;
646
647      DaoLocator.JobDao.Update(job.JobInfo);
648
649      response.Success = true;
650      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
651      response.JobId = jobId;
652      response.finished = finished;
653
654      HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage - SUB method: " + jobId);
655      return response;
656
657    }
658
659
660    /// <summary>
661    /// the client can send job results during calculating
662    /// and will send a final job result when he finished calculating
663    /// these job results will be stored in the database
664    /// </summary>
665    /// <param name="clientId"></param>
666    /// <param name="jobId"></param>
667    /// <param name="result"></param>
668    /// <param name="exception"></param>
669    /// <param name="finished"></param>
670    /// <returns></returns>
671    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
672      Guid jobId,
673      byte[] result,
674      double percentage,
675      Exception exception) {
676
677      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
678    }
679
680
681    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
682      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
683    }
684
685    /// <summary>
686    /// when a client logs out the state will be set
687    /// and the entry in the last hearbeats dictionary will be removed
688    /// </summary>
689    /// <param name="clientId"></param>
690    /// <returns></returns>                       
691    public Response Logout(Guid clientId) {
692
693      HiveLogger.Info("Client logged out " + clientId);
694     
695      Response response = new Response();
696
697      heartbeatLock.EnterWriteLock();
698      if (lastHeartbeats.ContainsKey(clientId))
699        lastHeartbeats.Remove(clientId);
700      heartbeatLock.ExitWriteLock();
701
702      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
703      if (client == null) {
704        response.Success = false;
705        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
706        return response;
707      }
708      if (client.State == State.calculating) {
709        // check wich job the client was calculating and reset it
710        IEnumerable<JobDto> jobsOfClient = DaoLocator.JobDao.FindActiveJobsOfClient(client);
711        foreach (JobDto job in jobsOfClient) {
712          if (job.State != State.finished)
713            jobManager.ResetJobsDependingOnResults(job);
714        }
715      }
716
717      client.State = State.offline;
718      DaoLocator.ClientDao.Update(client);
719
720      response.Success = true;
721      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
722
723      return response;
724    }
725
726    /// <summary>
727    /// If a client goes offline and restores a job he was calculating
728    /// he can ask the client if he still needs the job result
729    /// </summary>
730    /// <param name="jobId"></param>
731    /// <returns></returns>
732    public Response IsJobStillNeeded(Guid jobId) {
733      Response response = new Response();
734      JobDto job = DaoLocator.JobDao.FindById(jobId);
735      if (job == null) {
736        response.Success = false;
737        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
738        HiveLogger.Error(this.ToString() + " IsJobStillNeeded: Job doesn't exist (anymore)! " + jobId);
739        return response;
740      }
741      if (job.State == State.finished) {
742        response.Success = true;
743        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
744        HiveLogger.Error(this.ToString() + " IsJobStillNeeded: already finished! " + job);
745        return response;
746      }
747      job.State = State.pending;
748      lock (pendingJobs) {
749        pendingJobs.Add(job.Id, PENDING_TIMEOUT);
750      }
751
752      DaoLocator.JobDao.Update(job);
753
754      response.Success = true;
755      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
756      return response;
757    }
758
759    public ResponsePlugin SendPlugins(List<HivePluginInfoDto> pluginList) {
760      ResponsePlugin response = new ResponsePlugin();
761      foreach (HivePluginInfoDto pluginInfo in pluginList) {
762        // TODO: BuildDate deleted, not needed???
763        // TODO: Split version to major, minor and revision number
764        foreach (IPluginDescription currPlugin in ApplicationManager.Manager.Plugins) {
765          if (currPlugin.Name == pluginInfo.Name) {
766
767            CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
768              Name = currPlugin.Name,
769              Version = currPlugin.Version.ToString(),
770              BuildDate = currPlugin.BuildDate
771            };
772
773            foreach (string fileName in from file in currPlugin.Files select file.Name) {
774              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(fileName));
775            }
776            response.Plugins.Add(currCachedPlugin);
777          }
778        }
779      }
780      response.Success = true;
781      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
782
783      return response;
784
785    }
786
787    #endregion
788  }
789}
Note: See TracBrowser for help on using the repository browser.