Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 3203

Last change on this file since 3203 was 3203, checked in by kgrading, 14 years ago

implemented the server on the client, using push & force push, added refresh buttons, added auto calender methods that traverse the tree... (#908)

File size: 30.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40using HeuristicLab.Tracing;
41using Linq = HeuristicLab.Hive.Server.LINQDataAccess;
42using System.Transactions;
43
44namespace HeuristicLab.Hive.Server.Core {
45  /// <summary>
46  /// The ClientCommunicator manages the whole communication with the client
47  /// </summary>
48  public class ClientCommunicator : IClientCommunicator,
49    IInternalClientCommunicator {
50    private static Dictionary<Guid, DateTime> lastHeartbeats =
51      new Dictionary<Guid, DateTime>();
52    private static Dictionary<Guid, int> newAssignedJobs =
53      new Dictionary<Guid, int>();
54    private static Dictionary<Guid, int> pendingJobs =
55      new Dictionary<Guid, int>();
56
57    private static ReaderWriterLockSlim heartbeatLock =
58      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
59
60    //private ISessionFactory factory;
61    private ILifecycleManager lifecycleManager;
62    private IInternalJobManager jobManager;
63    private IScheduler scheduler;
64
65    private static int PENDING_TIMEOUT = 100;
66
67    /// <summary>
68    /// Initialization of the Adapters to the database
69    /// Initialization of Eventhandler for the lifecycle management
70    /// Initialization of lastHearbeats Dictionary
71    /// </summary>
72    public ClientCommunicator() {
73      //factory = ServiceLocator.GetSessionFactory();
74
75      lifecycleManager = ServiceLocator.GetLifecycleManager();
76      jobManager = ServiceLocator.GetJobManager() as
77        IInternalJobManager;
78      scheduler = ServiceLocator.GetScheduler();
79
80      lifecycleManager.RegisterHeartbeat(
81        new EventHandler(lifecycleManager_OnServerHeartbeat));
82    }
83
84    /// <summary>
85    /// Check if online clients send their hearbeats
86    /// if not -> set them offline and check if they where calculating a job
87    /// </summary>
88    /// <param name="sender"></param>
89    /// <param name="e"></param>
90    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
91      HiveLogger.Info(this.ToString() + ": Server Heartbeat ticked");
92
93      using (TransactionScope scope = new TransactionScope()) {
94
95        List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());
96
97        foreach (ClientDto client in allClients) {
98          if (client.State != State.offline && client.State != State.nullState) {
99            heartbeatLock.EnterUpgradeableReadLock();
100
101            if (!lastHeartbeats.ContainsKey(client.Id)) {
102              HiveLogger.Info(this.ToString() + ": Client " + client.Id +
103                              " wasn't offline but hasn't sent heartbeats - setting offline");
104              client.State = State.offline;
105              DaoLocator.ClientDao.Update(client);
106              HiveLogger.Info(this.ToString() + ": Client " + client.Id +
107                              " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
108              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
109                jobManager.ResetJobsDependingOnResults(job);
110              }
111            } else {
112              DateTime lastHbOfClient = lastHeartbeats[client.Id];
113
114              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
115              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
116              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
117                // if client calculated jobs, the job must be reset
118                HiveLogger.Info(this.ToString() + ": Client timed out and is on RESET");
119                foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
120                  jobManager.ResetJobsDependingOnResults(job);
121                  lock (newAssignedJobs) {
122                    if (newAssignedJobs.ContainsKey(job.Id))
123                      newAssignedJobs.Remove(job.Id);
124                  }
125                }
126
127                // client must be set offline
128                client.State = State.offline;
129
130                //clientAdapter.Update(client);
131                DaoLocator.ClientDao.Update(client);
132
133                heartbeatLock.EnterWriteLock();
134                lastHeartbeats.Remove(client.Id);
135                heartbeatLock.ExitWriteLock();
136              }
137            }
138
139            heartbeatLock.ExitUpgradeableReadLock();
140          } else {
141            //TODO: RLY neccesary?
142            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Shouldn't have offline or nullstate, has " + client.State);
143            heartbeatLock.EnterWriteLock();
144            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Resetting all his jobs");
145            if (lastHeartbeats.ContainsKey(client.Id))
146              lastHeartbeats.Remove(client.Id);
147            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
148              jobManager.ResetJobsDependingOnResults(job);
149            }
150            heartbeatLock.ExitWriteLock();
151          }
152        }
153        CheckForPendingJobs();
154        DaoLocator.DestroyContext();
155        scope.Complete();
156      }
157    }
158
159    private void CheckForPendingJobs() {
160      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(State.pending));
161
162      foreach (JobDto currJob in pendingJobsInDB) {
163        lock (pendingJobs) {
164          if (pendingJobs.ContainsKey(currJob.Id)) {
165            if (pendingJobs[currJob.Id] <= 0) {
166              currJob.State = State.offline;
167              DaoLocator.JobDao.Update(currJob);             
168            } else {
169              pendingJobs[currJob.Id]--;
170            }
171          }
172        }
173      }
174    }
175
176    #region IClientCommunicator Members
177
178    /// <summary>
179    /// Login process for the client
180    /// A hearbeat entry is created as well (login is the first hearbeat)
181    /// </summary>
182    /// <param name="clientInfo"></param>
183    /// <returns></returns>
184    public Response Login(ClientDto clientInfo) {
185      Response response = new Response();
186
187      heartbeatLock.EnterWriteLock();
188      if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
189        lastHeartbeats[clientInfo.Id] = DateTime.Now;
190      } else {
191        lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
192      }
193      heartbeatLock.ExitWriteLock();
194
195      clientInfo.State = State.idle;
196
197      if (DaoLocator.ClientDao.FindById(clientInfo.Id) == null)
198        DaoLocator.ClientDao.Insert(clientInfo);
199      else
200        DaoLocator.ClientDao.Update(clientInfo);
201      response.Success = true;
202      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
203      return response;
204    }
205
206    public ResponseCalendar GetCalendar(Guid clientId) {
207      ResponseCalendar response = new ResponseCalendar();
208
209      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
210      if(client == null) {
211        response.Success = false;
212        response.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
213        return response;
214      }
215     
216      response.ForceFetch = (client.CalendarSyncStatus == CalendarState.ForceFetch);
217     
218      IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForClient(client);
219      if (appointments.Count() == 0) {
220        response.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_NO_CALENDAR_FOUND;
221        response.Success = false;
222      } else {
223        response.Success = true;
224        response.Appointments = appointments;
225      }
226
227      client.CalendarSyncStatus = CalendarState.Fetched;
228      DaoLocator.ClientDao.Update(client);
229      return response;
230    }
231
232    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
233      Response response = new Response();     
234      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
235      if (client == null) {
236        response.Success = false;
237        response.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
238      }
239     
240      client.CalendarSyncStatus = state;
241      DaoLocator.ClientDao.Update(client);
242     
243      response.Success = true;
244      response.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_STATUS_UPDATED;
245     
246      return response;
247    }
248
249    /// <summary>
250    /// The client has to send regulary heartbeats
251    /// this hearbeats will be stored in the heartbeats dictionary
252    /// check if there is work for the client and send the client a response if he should pull a job
253    /// </summary>
254    /// <param name="hbData"></param>
255    /// <returns></returns>
256    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
257      HiveLogger.Debug(this.ToString() + ": BEGIN Processing Heartbeat for Client " + hbData.ClientId);
258      HiveLogger.Debug(this.ToString() + ": BEGIN Fetching Adapters");
259      HiveLogger.Debug(this.ToString() + ": END Fetched Adapters");
260      HiveLogger.Debug(this.ToString() + ": BEGIN Starting Transaction");
261
262      HiveLogger.Debug(this.ToString() + ": END Started Transaction");
263
264      ResponseHB response = new ResponseHB();
265      response.ActionRequest = new List<MessageContainer>();
266
267      HiveLogger.Debug(this.ToString() + ": BEGIN Started Client Fetching");
268      ClientDto client = DaoLocator.ClientDao.FindById(hbData.ClientId);
269      HiveLogger.Debug(this.ToString() + ": END Finished Client Fetching");
270      // check if the client is logged in
271      if (client.State == State.offline || client.State == State.nullState) {
272        response.Success = false;
273        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
274        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
275
276        HiveLogger.Error(this.ToString() + " ProcessHeartBeat: Client state null or offline: " + client);
277
278        return response;
279      }
280
281      client.NrOfFreeCores = hbData.FreeCores;
282      client.FreeMemory = hbData.FreeMemory;
283
284      // save timestamp of this heartbeat
285      HiveLogger.Debug(this.ToString() + ": BEGIN Locking for Heartbeats");
286      heartbeatLock.EnterWriteLock();
287      HiveLogger.Debug(this.ToString() + ": END Locked for Heartbeats");
288      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
289        lastHeartbeats[hbData.ClientId] = DateTime.Now;
290      } else {
291        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
292      }
293      heartbeatLock.ExitWriteLock();
294
295      HiveLogger.Debug(this.ToString() + ": BEGIN Processing Heartbeat Jobs");
296      processJobProcess(hbData, response);
297      HiveLogger.Debug(this.ToString() + ": END Processed Heartbeat Jobs");
298
299      //check if new Cal must be loaded
300      if (client.CalendarSyncStatus == CalendarState.Fetch || client.CalendarSyncStatus == CalendarState.ForceFetch) {
301        response.Success = true;
302        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_FETCH_OR_FORCEFETCH_CALENDAR;
303        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
304       
305        //client.CalendarSyncStatus = CalendarState.Fetching;
306       
307        HiveLogger.Info(this.ToString() + " fetch or forcefetch sent");       
308      }
309
310      // check if client has a free core for a new job
311      // if true, ask scheduler for a new job for this client
312      HiveLogger.Debug(this.ToString() + ": BEGIN Looking for Client Jobs");
313      if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) {
314        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
315      } else {
316        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
317      }
318      HiveLogger.Debug(this.ToString() + ": END Looked for Client Jobs");
319      response.Success = true;
320      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
321
322      DaoLocator.ClientDao.Update(client);
323
324      //tx.Commit();
325      HiveLogger.Debug(this.ToString() + ": END Processed Heartbeat for Client " + hbData.ClientId);
326      return response;
327    }
328
329    /// <summary>
330    /// Process the Job progress sent by a client
331    /// </summary>
332    /// <param name="hbData"></param>
333    /// <param name="jobAdapter"></param>
334    /// <param name="clientAdapter"></param>
335    /// <param name="response"></param>
336    private void processJobProcess(HeartBeatData hbData, ResponseHB response) {
337      HiveLogger.Info(this.ToString() + " processJobProcess: Started for Client " + hbData.ClientId);
338
339      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
340        List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfClient(DaoLocator.ClientDao.FindById(hbData.ClientId)));
341        if (jobsOfClient == null || jobsOfClient.Count == 0) {
342          response.Success = false;
343          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
344          HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId);
345          return;
346        }
347
348        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
349          JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
350          curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
351          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
352            response.Success = false;
353            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
354            HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId + " Job: " + curJob);
355          } else if (curJob.State == State.abort) {
356            // a request to abort the job has been set
357            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
358            curJob.State = State.finished;
359          } else {
360            // save job progress
361            curJob.Percentage = jobProgress.Value;
362
363            if (curJob.State == State.requestSnapshot) {
364              // a request for a snapshot has been set
365              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
366              curJob.State = State.requestSnapshotSent;
367            }
368          }
369          DaoLocator.JobDao.Update(curJob);
370        }
371        foreach (JobDto currJob in jobsOfClient) {
372          bool found = false;
373          foreach (Guid jobId in hbData.JobProgress.Keys) {
374            if (jobId == currJob.Id) {
375              found = true;
376              break;
377            }
378          }
379          if (!found) {
380            lock (newAssignedJobs) {
381              if (newAssignedJobs.ContainsKey(currJob.Id)) {
382                newAssignedJobs[currJob.Id]--;
383                HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Client);
384                if (newAssignedJobs[currJob.Id] <= 0) {
385                  HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);
386
387                  currJob.State = State.offline;
388                  DaoLocator.JobDao.Update(currJob);
389
390                  response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
391
392                  newAssignedJobs.Remove(currJob.Id);
393                }
394              } else {
395                HiveLogger.Error(this.ToString() + " processJobProcess: Job ID wasn't with the heartbeats:  " + currJob);
396                currJob.State = State.offline;
397                DaoLocator.JobDao.Update(currJob);
398              }
399            } // lock
400          } else {
401            lock (newAssignedJobs) {
402
403              if (newAssignedJobs.ContainsKey(currJob.Id)) {
404                HiveLogger.Info(this.ToString() + " processJobProcess: Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
405                newAssignedJobs.Remove(currJob.Id);
406              }
407            }
408          }
409        }
410      }
411    }
412
413    /// <summary>
414    /// if the client was told to pull a job he calls this method
415    /// the server selects a job and sends it to the client
416    /// </summary>
417    /// <param name="clientId"></param>
418    /// <returns></returns>
419    /*public ResponseSerializedJob SendSerializedJob(Guid clientId) {
420      ISession session = factory.GetSessionForCurrentThread();
421      ITransaction tx = null;
422
423      try {
424        IJobAdapter jobAdapter =
425          session.GetDataAdapter<JobDto, IJobAdapter>();
426
427        tx = session.BeginTransaction();
428
429        ResponseSerializedJob response = new ResponseSerializedJob();
430
431        JobDto job2Calculate = scheduler.GetNextJobForClient(clientId);
432        if (job2Calculate != null) {
433          SerializedJob computableJob =
434            jobAdapter.GetSerializedJob(job2Calculate.Id);
435
436          response.Job = computableJob;
437          response.Success = true;
438          HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + computableJob.JobInfo + " for user " + clientId);                     
439          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
440          lock (newAssignedJobs) {
441            if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
442              newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
443          }
444        } else {
445          HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);                     
446          response.Success = false;
447          response.Job = null;
448          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
449        }
450
451        tx.Commit();
452
453        return response;
454      }
455      catch (Exception ex) {
456        if (tx != null)
457          tx.Rollback();
458        throw ex;
459      }
460      finally {
461        if (session != null)
462          session.EndSession();
463      }
464    }     */
465
466    /// <summary>
467    /// if the client was told to pull a job he calls this method
468    /// the server selects a job and sends it to the client
469    /// </summary>
470    /// <param name="clientId"></param>
471    /// <returns></returns>
472    public ResponseJob SendJob(Guid clientId) {
473
474      ResponseJob response = new ResponseJob();
475
476      JobDto job2Calculate = scheduler.GetNextJobForClient(clientId);
477      if (job2Calculate != null) {
478        response.Job = job2Calculate;
479        response.Success = true;
480        HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + job2Calculate + " for user " + clientId);
481        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
482        lock (newAssignedJobs) {
483          if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
484            newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
485        }
486      } else {
487        response.Success = false;
488        response.Job = null;
489        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
490        HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);
491      }
492
493
494
495      return response;
496    }
497
498    public ResponseResultReceived ProcessJobResult(
499      Stream stream,
500      bool finished) {
501
502      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - main method:");
503
504
505      Stream jobResultStream = null;
506      Stream jobStream = null;
507
508      //try {
509      BinaryFormatter formatter =
510        new BinaryFormatter();
511
512      JobResult result =
513        (JobResult)formatter.Deserialize(stream);
514
515      //important - repeatable read isolation level is required here,
516      //otherwise race conditions could occur when writing the stream into the DB
517      //just removed TransactionIsolationLevel.RepeatableRead
518      //tx = session.BeginTransaction();
519
520      ResponseResultReceived response =
521        ProcessJobResult(
522        result.ClientId,
523        result.JobId,
524        new byte[] { },
525        result.Percentage,
526        result.Exception,
527        finished);
528
529      if (response.Success) {
530        jobStream = DaoLocator.JobDao.GetSerializedJobStream(result.JobId);
531
532        byte[] buffer = new byte[3024];
533        int read = 0;
534        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
535          jobStream.Write(buffer, 0, read);
536        }
537        jobStream.Close();
538      }
539      HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage:");
540      return response;
541    }
542
543
544    private ResponseResultReceived ProcessJobResult(Guid clientId,
545      Guid jobId,
546      byte[] result,
547      double percentage,
548      Exception exception,
549      bool finished) {
550
551      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - SUB method: " + jobId);
552
553      ResponseResultReceived response = new ResponseResultReceived();
554      ClientDto client =
555        DaoLocator.ClientDao.FindById(clientId);
556
557      SerializedJob job =
558        new SerializedJob();
559
560      if (job != null) {
561        job.JobInfo =
562          DaoLocator.JobDao.FindById(jobId);
563        job.JobInfo.Client = job.JobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
564      }
565
566      if (job == null && job.JobInfo != null) {
567        response.Success = false;
568        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
569        response.JobId = jobId;
570
571        HiveLogger.Error(this.ToString() + " ProcessJobResult: No job with Id " + jobId);
572
573        //tx.Rollback();
574        return response;
575      }
576      if (job.JobInfo.State == State.abort) {
577        response.Success = false;
578        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
579
580        HiveLogger.Error(this.ToString() + " ProcessJobResult: Job was aborted! " + job.JobInfo);
581
582        //tx.Rollback();
583        return response;
584      }
585      if (job.JobInfo.Client == null) {
586        response.Success = false;
587        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
588        response.JobId = jobId;
589
590        HiveLogger.Error(this.ToString() + " ProcessJobResult: Job is not being calculated (client = null)! " + job.JobInfo);
591
592        //tx.Rollback();
593        return response;
594      }
595      if (job.JobInfo.Client.Id != clientId) {
596        response.Success = false;
597        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
598        response.JobId = jobId;
599
600        HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);
601
602        //tx.Rollback();
603        return response;
604      }
605      if (job.JobInfo.State == State.finished) {
606        response.Success = true;
607        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
608        response.JobId = jobId;
609
610        HiveLogger.Error(this.ToString() + " ProcessJobResult: Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);
611
612        //tx.Rollback();
613        return response;
614      }
615      if (job.JobInfo.State == State.requestSnapshotSent) {
616        job.JobInfo.State = State.calculating;
617      }
618      if (job.JobInfo.State != State.calculating &&
619        job.JobInfo.State != State.pending) {
620        response.Success = false;
621        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
622        response.JobId = jobId;
623
624        HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Job State, job is: " + job.JobInfo);
625
626        //tx.Rollback();
627        return response;
628      }
629      job.JobInfo.Percentage = percentage;
630
631      if (finished) {
632        job.JobInfo.State = State.finished;
633      }
634
635      job.SerializedJobData = result;
636
637      DaoLocator.JobDao.Update(job.JobInfo);
638
639      response.Success = true;
640      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
641      response.JobId = jobId;
642      response.finished = finished;
643
644      HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage - SUB method: " + jobId);
645      return response;
646
647    }
648
649
650    /// <summary>
651    /// the client can send job results during calculating
652    /// and will send a final job result when he finished calculating
653    /// these job results will be stored in the database
654    /// </summary>
655    /// <param name="clientId"></param>
656    /// <param name="jobId"></param>
657    /// <param name="result"></param>
658    /// <param name="exception"></param>
659    /// <param name="finished"></param>
660    /// <returns></returns>
661    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
662      Guid jobId,
663      byte[] result,
664      double percentage,
665      Exception exception) {
666
667      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
668    }
669
670
671    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
672      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
673    }
674
675    /// <summary>
676    /// when a client logs out the state will be set
677    /// and the entry in the last hearbeats dictionary will be removed
678    /// </summary>
679    /// <param name="clientId"></param>
680    /// <returns></returns>                       
681    public Response Logout(Guid clientId) {
682
683      HiveLogger.Info("Client logged out " + clientId);
684     
685      Response response = new Response();
686
687      heartbeatLock.EnterWriteLock();
688      if (lastHeartbeats.ContainsKey(clientId))
689        lastHeartbeats.Remove(clientId);
690      heartbeatLock.ExitWriteLock();
691
692      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
693      if (client == null) {
694        response.Success = false;
695        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
696        return response;
697      }
698      if (client.State == State.calculating) {
699        // check wich job the client was calculating and reset it
700        IEnumerable<JobDto> jobsOfClient = DaoLocator.JobDao.FindActiveJobsOfClient(client);
701        foreach (JobDto job in jobsOfClient) {
702          if (job.State != State.finished)
703            jobManager.ResetJobsDependingOnResults(job);
704        }
705      }
706
707      client.State = State.offline;
708      DaoLocator.ClientDao.Update(client);
709
710      response.Success = true;
711      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
712
713      return response;
714    }
715
716    /// <summary>
717    /// If a client goes offline and restores a job he was calculating
718    /// he can ask the client if he still needs the job result
719    /// </summary>
720    /// <param name="jobId"></param>
721    /// <returns></returns>
722    public Response IsJobStillNeeded(Guid jobId) {
723      Response response = new Response();
724      JobDto job = DaoLocator.JobDao.FindById(jobId);
725      if (job == null) {
726        response.Success = false;
727        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
728        HiveLogger.Error(this.ToString() + " IsJobStillNeeded: Job doesn't exist (anymore)! " + jobId);
729        return response;
730      }
731      if (job.State == State.finished) {
732        response.Success = true;
733        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
734        HiveLogger.Error(this.ToString() + " IsJobStillNeeded: already finished! " + job);
735        return response;
736      }
737      job.State = State.pending;
738      lock (pendingJobs) {
739        pendingJobs.Add(job.Id, PENDING_TIMEOUT);
740      }
741
742      DaoLocator.JobDao.Update(job);
743
744      response.Success = true;
745      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
746      return response;
747    }
748
749    public ResponsePlugin SendPlugins(List<HivePluginInfoDto> pluginList) {
750      ResponsePlugin response = new ResponsePlugin();
751      foreach (HivePluginInfoDto pluginInfo in pluginList) {
752        // TODO: BuildDate deleted, not needed???
753        // TODO: Split version to major, minor and revision number
754        foreach (IPluginDescription currPlugin in ApplicationManager.Manager.Plugins) {
755          if (currPlugin.Name == pluginInfo.Name) {
756
757            CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
758              Name = currPlugin.Name,
759              Version = currPlugin.Version.ToString(),
760              BuildDate = currPlugin.BuildDate
761            };
762
763            foreach (string fileName in from file in currPlugin.Files select file.Name) {
764              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(fileName));
765            }
766            response.Plugins.Add(currCachedPlugin);
767          }
768        }
769      }
770      response.Success = true;
771      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
772
773      return response;
774
775    }
776
777    #endregion
778  }
779}
Note: See TracBrowser for help on using the repository browser.