Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.2/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 3931

Last change on this file since 3931 was 3931, checked in by kgrading, 14 years ago

added minor speedups and better transaction handling to the server (#828)

File size: 28.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40using HeuristicLab.Tracing;
41using Linq = HeuristicLab.Hive.Server.LINQDataAccess;
42using System.Transactions;
43using HeuristicLab.Hive.Server.LINQDataAccess;
44
45namespace HeuristicLab.Hive.Server.Core {
46  /// <summary>
47  /// The ClientCommunicator manages the whole communication with the client
48  /// </summary>
49  public class ClientCommunicator : IClientCommunicator,
50    IInternalClientCommunicator {
51    private static Dictionary<Guid, DateTime> lastHeartbeats =
52      new Dictionary<Guid, DateTime>();
53    private static Dictionary<Guid, int> newAssignedJobs =
54      new Dictionary<Guid, int>();
55    private static Dictionary<Guid, int> pendingJobs =
56      new Dictionary<Guid, int>();
57
58    private static ReaderWriterLockSlim heartbeatLock =
59      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
60
61    //private ISessionFactory factory;
62    private ILifecycleManager lifecycleManager;
63    private IInternalJobManager jobManager;
64    private IScheduler scheduler;
65
66    private static int PENDING_TIMEOUT = 100;
67
68    /// <summary>
69    /// Initialization of the Adapters to the database
70    /// Initialization of Eventhandler for the lifecycle management
71    /// Initialization of lastHearbeats Dictionary
72    /// </summary>
73    public ClientCommunicator() {
74      //factory = ServiceLocator.GetSessionFactory();
75
76      lifecycleManager = ServiceLocator.GetLifecycleManager();
77      jobManager = ServiceLocator.GetJobManager() as
78        IInternalJobManager;
79      scheduler = ServiceLocator.GetScheduler();
80
81      lifecycleManager.RegisterHeartbeat(
82        new EventHandler(lifecycleManager_OnServerHeartbeat));
83    }
84
85    /// <summary>
86    /// Check if online clients send their hearbeats
87    /// if not -> set them offline and check if they where calculating a job
88    /// </summary>
89    /// <param name="sender"></param>
90    /// <param name="e"></param>
91    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
92      Logger.Debug("Server Heartbeat ticked");
93
94      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
95
96        List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());
97
98        foreach (ClientDto client in allClients) {
99          if (client.State != State.offline && client.State != State.nullState) {
100            heartbeatLock.EnterUpgradeableReadLock();
101
102            if (!lastHeartbeats.ContainsKey(client.Id)) {
103              Logger.Info("Client " + client.Id +
104                              " wasn't offline but hasn't sent heartbeats - setting offline");
105              client.State = State.offline;
106              DaoLocator.ClientDao.Update(client);
107              Logger.Info("Client " + client.Id +
108                              " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
109              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
110                //maybe implementa n additional Watchdog? Till then, just set them offline..
111                DaoLocator.JobDao.SetJobOffline(job);                               
112              }
113            } else {
114              DateTime lastHbOfClient = lastHeartbeats[client.Id];
115
116              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
117              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
118              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
119                // if client calculated jobs, the job must be reset
120                Logger.Info("Client timed out and is on RESET");
121                foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
122                  DaoLocator.JobDao.SetJobOffline(job);
123                  lock (newAssignedJobs) {
124                    if (newAssignedJobs.ContainsKey(job.Id))
125                      newAssignedJobs.Remove(job.Id);
126                  }
127                }
128                Logger.Debug("setting client offline");
129                // client must be set offline
130                client.State = State.offline;
131
132                //clientAdapter.Update(client);
133                DaoLocator.ClientDao.Update(client);
134
135                Logger.Debug("removing it from the heartbeats list");
136                heartbeatLock.EnterWriteLock();
137                lastHeartbeats.Remove(client.Id);
138                heartbeatLock.ExitWriteLock();
139              }
140            }
141
142            heartbeatLock.ExitUpgradeableReadLock();
143          } else {
144            //TODO: RLY neccesary?
145            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Shouldn't have offline or nullstate, has " + client.State);
146            heartbeatLock.EnterWriteLock();
147            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Resetting all his jobs");
148            if (lastHeartbeats.ContainsKey(client.Id))
149              lastHeartbeats.Remove(client.Id);
150            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
151              DaoLocator.JobDao.SetJobOffline(job);
152            }
153            heartbeatLock.ExitWriteLock();
154          }
155        }
156        CheckForPendingJobs();
157        DaoLocator.DestroyContext();
158        scope.Complete();
159      }
160    }
161
162    private void CheckForPendingJobs() {
163      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(State.pending));
164
165      foreach (JobDto currJob in pendingJobsInDB) {
166        lock (pendingJobs) {
167          if (pendingJobs.ContainsKey(currJob.Id)) {
168            if (pendingJobs[currJob.Id] <= 0) {
169              currJob.State = State.offline;
170              DaoLocator.JobDao.Update(currJob);             
171            } else {
172              pendingJobs[currJob.Id]--;
173            }
174          }
175        }
176      }
177    }
178
179    #region IClientCommunicator Members
180
181    /// <summary>
182    /// Login process for the client
183    /// A hearbeat entry is created as well (login is the first hearbeat)
184    /// </summary>
185    /// <param name="clientInfo"></param>
186    /// <returns></returns>
187    public Response Login(ClientDto clientInfo) {
188      Response response = new Response();
189
190      heartbeatLock.EnterWriteLock();
191      if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
192        lastHeartbeats[clientInfo.Id] = DateTime.Now;
193      } else {
194        lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
195      }
196      heartbeatLock.ExitWriteLock();
197
198      ClientDto dbClient = DaoLocator.ClientDao.FindById(clientInfo.Id);
199
200      //Really set offline?
201      //Reconnect issues with the currently calculating jobs
202      clientInfo.State = State.idle;
203      clientInfo.CalendarSyncStatus = dbClient != null ? dbClient.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
204
205      if (dbClient == null)
206        DaoLocator.ClientDao.Insert(clientInfo);
207      else
208        DaoLocator.ClientDao.Update(clientInfo);
209      response.Success = true;
210      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
211      return response;
212    }
213
214    public ResponseCalendar GetCalendar(Guid clientId) {
215      ResponseCalendar response = new ResponseCalendar();
216
217      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
218      if(client == null) {
219        response.Success = false;
220        response.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
221        return response;
222      }
223     
224      response.ForceFetch = (client.CalendarSyncStatus == CalendarState.ForceFetch);
225     
226      IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForClient(client);
227      if (appointments.Count() == 0) {
228        response.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_NO_CALENDAR_FOUND;
229        response.Success = false;
230      } else {
231        response.Success = true;
232        response.Appointments = appointments;
233      }
234
235      client.CalendarSyncStatus = CalendarState.Fetched;
236      DaoLocator.ClientDao.Update(client);
237      return response;
238    }
239
240    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
241      Response response = new Response();     
242      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
243      if (client == null) {
244        response.Success = false;
245        response.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
246        return response;
247      }
248     
249      client.CalendarSyncStatus = state;
250      DaoLocator.ClientDao.Update(client);
251     
252      response.Success = true;
253      response.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_STATUS_UPDATED;
254     
255      return response;
256    }
257
258    /// <summary>
259    /// The client has to send regulary heartbeats
260    /// this hearbeats will be stored in the heartbeats dictionary
261    /// check if there is work for the client and send the client a response if he should pull a job
262    /// </summary>
263    /// <param name="hbData"></param>
264    /// <returns></returns>
265    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
266      Logger.Debug("BEGIN Processing Heartbeat for Client " + hbData.ClientId);
267
268      ResponseHB response = new ResponseHB();
269      response.ActionRequest = new List<MessageContainer>();
270
271      Logger.Debug("BEGIN Started Client Fetching");
272      ClientDto client = DaoLocator.ClientDao.FindById(hbData.ClientId);
273      Logger.Debug("END Finished Client Fetching");
274      // check if the client is logged in
275      if (client.State == State.offline || client.State == State.nullState) {
276        response.Success = false;
277        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
278        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
279
280        Logger.Error("ProcessHeartBeat: Client state null or offline: " + client);
281
282        return response;
283      }
284
285      client.NrOfFreeCores = hbData.FreeCores;
286      client.FreeMemory = hbData.FreeMemory;
287
288      // save timestamp of this heartbeat
289      Logger.Debug("BEGIN Locking for Heartbeats");
290      heartbeatLock.EnterWriteLock();
291      Logger.Debug("END Locked for Heartbeats");
292      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
293        lastHeartbeats[hbData.ClientId] = DateTime.Now;
294      } else {
295        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
296      }
297      heartbeatLock.ExitWriteLock();
298
299      Logger.Debug("BEGIN Processing Heartbeat Jobs");
300      processJobProcess(hbData, response);
301      Logger.Debug("END Processed Heartbeat Jobs");
302
303      //check if new Cal must be loaded
304      if (client.CalendarSyncStatus == CalendarState.Fetch || client.CalendarSyncStatus == CalendarState.ForceFetch) {
305        response.Success = true;
306        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_FETCH_OR_FORCEFETCH_CALENDAR;
307        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
308       
309        //client.CalendarSyncStatus = CalendarState.Fetching;
310       
311        Logger.Info("fetch or forcefetch sent");       
312      }
313
314      // check if client has a free core for a new job
315      // if true, ask scheduler for a new job for this client
316      Logger.Debug(" BEGIN Looking for Client Jobs");
317      if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) {
318        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
319      } else {
320        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
321      }
322      Logger.Debug(" END Looked for Client Jobs");
323      response.Success = true;
324      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
325
326      DaoLocator.ClientDao.Update(client);
327
328      //tx.Commit();
329      Logger.Debug(" END Processed Heartbeat for Client " + hbData.ClientId);
330      return response;
331    }
332
333    /// <summary>
334    /// Process the Job progress sent by a client
335    /// </summary>
336    /// <param name="hbData"></param>
337    /// <param name="jobAdapter"></param>
338    /// <param name="clientAdapter"></param>
339    /// <param name="response"></param>
340    private void processJobProcess(HeartBeatData hbData, ResponseHB response) {
341      Logger.Debug("Started for Client " + hbData.ClientId);
342      List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfClient(DaoLocator.ClientDao.FindById(hbData.ClientId)));
343      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {       
344        if (jobsOfClient == null || jobsOfClient.Count == 0) {
345          response.Success = false;
346          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
347          Logger.Error("There is no job calculated by this user " + hbData.ClientId);
348          return;
349        }
350
351        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
352          JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
353          curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
354          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
355            response.Success = false;
356            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
357            Logger.Error("There is no job calculated by this user " + hbData.ClientId + " Job: " + curJob);
358          } else if (curJob.State == State.abort) {
359            // a request to abort the job has been set
360            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
361            curJob.State = State.finished;
362          } else {
363            // save job progress
364            curJob.Percentage = jobProgress.Value;
365
366            if (curJob.State == State.requestSnapshot) {
367              // a request for a snapshot has been set
368              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
369              curJob.State = State.requestSnapshotSent;
370            }
371          }
372          DaoLocator.JobDao.Update(curJob);
373        }
374       }
375      foreach (JobDto currJob in jobsOfClient) {
376        bool found = false;
377        if(hbData.JobProgress != null) {
378          foreach (Guid jobId in hbData.JobProgress.Keys) {
379            if (jobId == currJob.Id) {
380              found = true;
381              break;
382            }
383          }
384        }
385        if (!found) {
386          lock (newAssignedJobs) {
387            if (newAssignedJobs.ContainsKey(currJob.Id)) {
388              newAssignedJobs[currJob.Id]--;
389              Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Client);
390              if (newAssignedJobs[currJob.Id] <= 0) {
391                Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);
392
393                currJob.State = State.offline;
394                DaoLocator.JobDao.Update(currJob);
395
396                response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
397
398                newAssignedJobs.Remove(currJob.Id);
399              }
400            } else {
401              Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
402              currJob.State = State.offline;
403              DaoLocator.JobDao.Update(currJob);
404            }
405          } // lock
406        } else {
407          lock (newAssignedJobs) {
408
409            if (newAssignedJobs.ContainsKey(currJob.Id)) {
410              Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
411              newAssignedJobs.Remove(currJob.Id);
412            }
413          }
414        }
415      }
416    }
417
418    /// <summary>
419    /// if the client was told to pull a job he calls this method
420    /// the server selects a job and sends it to the client
421    /// </summary>
422    /// <param name="clientId"></param>
423    /// <returns></returns>
424    public ResponseJob SendJob(Guid clientId) {
425
426      ResponseJob response = new ResponseJob();
427
428      JobDto job2Calculate = scheduler.GetNextJobForClient(clientId);
429      if (job2Calculate != null) {
430        response.Job = job2Calculate;       
431        response.Job.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Job);
432        response.Success = true;
433        Logger.Info("Job pulled: " + job2Calculate + " for user " + clientId);
434        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
435        lock (newAssignedJobs) {
436          if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
437            newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
438        }
439      } else {                 
440        response.Success = false;
441        response.Job = null;
442        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
443        Logger.Info("No more Jobs left for " + clientId);
444      }
445
446
447
448      return response;
449    }
450
451    public ResponseResultReceived ProcessJobResult(
452      Stream stream,
453      bool finished) {
454
455      Logger.Info("BEGIN Job received for Storage - main method:");
456
457
458      Stream jobResultStream = null;
459      Stream jobStream = null;
460
461      //try {
462      BinaryFormatter formatter =
463        new BinaryFormatter();
464
465      JobResult result =
466        (JobResult)formatter.Deserialize(stream);
467
468      //important - repeatable read isolation level is required here,
469      //otherwise race conditions could occur when writing the stream into the DB
470      //just removed TransactionIsolationLevel.RepeatableRead
471      //tx = session.BeginTransaction();
472
473      ResponseResultReceived response =
474        ProcessJobResult(
475        result.ClientId,
476        result.JobId,
477        new byte[] { },
478        result.Percentage,
479        result.Exception,
480        finished);
481
482      if (response.Success) {
483        Logger.Debug("Trying to aquire WCF Job Stream");
484        //jobStream = DaoLocator.JobDao.GetSerializedJobStream(result.JobId);
485        //Logger.Debug("Job Stream Aquired");
486        byte[] buffer = new byte[3024];
487        List<byte> serializedJob = new List<byte>();
488        int read = 0;
489        int i = 0;       
490        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {         
491          for (int j = 0; j < read; j++) {
492            serializedJob.Add(buffer[j]);
493          }
494          if (i% 100 == 0)
495            Logger.Debug("Writing to stream: " + i);         
496          //jobStream.Write(buffer, 0, read);
497          i++;
498        }
499        Logger.Debug("Done Writing, closing the stream!");               
500        //jobStream.Close();
501
502        DaoLocator.JobDao.SetBinaryJobFile(result.JobId, serializedJob.ToArray());
503      }
504      Logger.Info("END Job received for Storage:");
505      stream.Dispose();
506      return response;
507    }
508
509
510    private ResponseResultReceived ProcessJobResult(Guid clientId,
511      Guid jobId,
512      byte[] result,
513      double percentage,
514      Exception exception,
515      bool finished) {
516
517      Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);
518
519      ResponseResultReceived response = new ResponseResultReceived();
520      ClientDto client =
521        DaoLocator.ClientDao.FindById(clientId);
522
523      SerializedJob job =
524        new SerializedJob();
525
526      if (job != null) {
527        job.JobInfo =
528          DaoLocator.JobDao.FindById(jobId);
529        job.JobInfo.Client = job.JobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
530      }
531
532      if (job == null && job.JobInfo != null) {
533        response.Success = false;
534        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
535        response.JobId = jobId;
536
537        Logger.Error("No job with Id " + jobId);
538
539        //tx.Rollback();
540        return response;
541      }
542      if (job.JobInfo.State == State.abort) {
543        response.Success = false;
544        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
545
546        Logger.Error("Job was aborted! " + job.JobInfo);
547
548        //tx.Rollback();
549        return response;
550      }
551      if (job.JobInfo.Client == null) {
552        response.Success = false;
553        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
554        response.JobId = jobId;
555
556        Logger.Error("Job is not being calculated (client = null)! " + job.JobInfo);
557
558        //tx.Rollback();
559        return response;
560      }
561      if (job.JobInfo.Client.Id != clientId) {
562        response.Success = false;
563        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
564        response.JobId = jobId;
565
566        Logger.Error("Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);
567
568        //tx.Rollback();
569        return response;
570      }
571      if (job.JobInfo.State == State.finished) {
572        response.Success = true;
573        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
574        response.JobId = jobId;
575
576        Logger.Error("Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);
577
578        //tx.Rollback();
579        return response;
580      }
581      //Todo: RequestsnapshotSent => calculating?
582      if (job.JobInfo.State == State.requestSnapshotSent) {
583        job.JobInfo.State = State.calculating;
584      }
585      if (job.JobInfo.State != State.calculating &&
586        job.JobInfo.State != State.pending) {
587        response.Success = false;
588        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
589        response.JobId = jobId;
590
591        Logger.Error("Wrong Job State, job is: " + job.JobInfo);
592
593        //tx.Rollback();
594        return response;
595      }
596      job.JobInfo.Percentage = percentage;
597
598      if (finished) {
599        job.JobInfo.State = State.finished;
600        job.JobInfo.DateFinished = DateTime.Now;
601      }
602
603      job.SerializedJobData = result;
604
605      DaoLocator.JobDao.Update(job.JobInfo);
606
607      response.Success = true;
608      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
609      response.JobId = jobId;
610      response.finished = finished;
611
612      Logger.Info("END Job received for Storage - SUB method: " + jobId);
613      return response;
614
615    }
616
617
618    /// <summary>
619    /// the client can send job results during calculating
620    /// and will send a final job result when he finished calculating
621    /// these job results will be stored in the database
622    /// </summary>
623    /// <param name="clientId"></param>
624    /// <param name="jobId"></param>
625    /// <param name="result"></param>
626    /// <param name="exception"></param>
627    /// <param name="finished"></param>
628    /// <returns></returns>
629    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
630      Guid jobId,
631      byte[] result,
632      double percentage,
633      Exception exception) {
634
635      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
636    }
637
638
639    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
640      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
641    }
642
643    /// <summary>
644    /// when a client logs out the state will be set
645    /// and the entry in the last hearbeats dictionary will be removed
646    /// </summary>
647    /// <param name="clientId"></param>
648    /// <returns></returns>                       
649    public Response Logout(Guid clientId) {
650
651      Logger.Info("Client logged out " + clientId);
652     
653      Response response = new Response();
654
655      heartbeatLock.EnterWriteLock();
656      if (lastHeartbeats.ContainsKey(clientId))
657        lastHeartbeats.Remove(clientId);
658      heartbeatLock.ExitWriteLock();
659
660      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
661      if (client == null) {
662        response.Success = false;
663        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
664        return response;
665      }
666      if (client.State == State.calculating) {
667        // check wich job the client was calculating and reset it
668        IEnumerable<JobDto> jobsOfClient = DaoLocator.JobDao.FindActiveJobsOfClient(client);
669        foreach (JobDto job in jobsOfClient) {
670          if (job.State != State.finished)
671            DaoLocator.JobDao.SetJobOffline(job);
672        }
673      }
674
675      client.State = State.offline;
676      DaoLocator.ClientDao.Update(client);
677
678      response.Success = true;
679      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
680
681      return response;
682    }
683
684    /// <summary>
685    /// If a client goes offline and restores a job he was calculating
686    /// he can ask the client if he still needs the job result
687    /// </summary>
688    /// <param name="jobId"></param>
689    /// <returns></returns>
690    public Response IsJobStillNeeded(Guid jobId) {
691      Response response = new Response();
692      JobDto job = DaoLocator.JobDao.FindById(jobId);
693      if (job == null) {
694        response.Success = false;
695        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
696        Logger.Error("Job doesn't exist (anymore)! " + jobId);
697        return response;
698      }
699      if (job.State == State.finished) {
700        response.Success = true;
701        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
702        Logger.Error("already finished! " + job);
703        return response;
704      }
705      job.State = State.pending;
706      lock (pendingJobs) {
707        pendingJobs.Add(job.Id, PENDING_TIMEOUT);
708      }
709
710      DaoLocator.JobDao.Update(job);
711
712      response.Success = true;
713      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
714      return response;
715    }
716
717    public ResponsePlugin SendPlugins(List<HivePluginInfoDto> pluginList) {
718      ResponsePlugin response = new ResponsePlugin();
719      foreach (HivePluginInfoDto pluginInfo in pluginList) {
720        // TODO: BuildDate deleted, not needed???
721        // TODO: Split version to major, minor and revision number
722        foreach (IPluginDescription currPlugin in ApplicationManager.Manager.Plugins) {
723          if (currPlugin.Name == pluginInfo.Name) {
724
725            CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
726              Name = currPlugin.Name,
727              Version = currPlugin.Version.ToString(),
728              BuildDate = currPlugin.BuildDate
729            };
730
731            foreach (string fileName in from file in currPlugin.Files select file.Name) {
732              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(fileName));
733            }
734            response.Plugins.Add(currCachedPlugin);
735          }
736        }
737      }
738      response.Success = true;
739      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
740
741      return response;
742
743    }
744
745    #endregion
746  }
747}
Note: See TracBrowser for help on using the repository browser.