Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 4254

Last change on this file since 4254 was 4254, checked in by cneumuel, 14 years ago

some small refactorings (#1159)

File size: 30.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Tracing;
35
36namespace HeuristicLab.Hive.Server.Core {
37  /// <summary>
38  /// The SlaveCommunicator manages the whole communication between the Hive server and its slaves (clients)
39  /// </summary>
40  public class SlaveCommunicator : ISlaveCommunicator,
41    IInternalSlaveCommunicator {
42    private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
43    private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
44    private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();
45
46    private static ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
47
48    //private ISessionFactory factory;
49    private ILifecycleManager lifecycleManager;
50    private IInternalJobManager jobManager;
51    private IScheduler scheduler;
52
53    private static int PENDING_TIMEOUT = 100;
54
/// <summary>
/// Resolves the lifecycle manager, the job manager and the scheduler through the
/// ServiceLocator and subscribes this instance to the server heartbeat.
/// </summary>
public SlaveCommunicator() {
  //factory = ServiceLocator.GetSessionFactory();

  this.lifecycleManager = ServiceLocator.GetLifecycleManager();
  this.jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  this.scheduler = ServiceLocator.GetScheduler();

  // from now on every server heartbeat tick runs the slave/job timeout checks
  EventHandler onServerHeartbeat = new EventHandler(this.lifecycleManager_OnServerHeartbeat);
  this.lifecycleManager.RegisterHeartbeat(onServerHeartbeat);
}
69
/// <summary>
/// Runs on every server heartbeat tick (registered in the constructor).
/// Slaves that are not offline but have no recorded heartbeat, or whose last heartbeat
/// is older than ApplicationConstants.HEARTBEAT_MAX_DIF seconds, are set offline and
/// their active jobs are set offline as well. Slaves that already are offline (or in
/// NullState) get their heartbeat entry and active jobs cleaned up. Finally the
/// pending jobs are checked for timeouts. Everything runs inside one TransactionScope.
/// </summary>
/// <param name="sender">event source (not used)</param>
/// <param name="e">event arguments (not used)</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  Logger.Debug("Server Heartbeat ticked");

  // [chn] why is transaction management done here
  using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
    List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());

    foreach (ClientDto client in allClients) {
      if (client.State != State.Offline && client.State != State.NullState) {
        heartbeatLock.EnterUpgradeableReadLock();
        // the DAO calls below may throw; the lock has to be released in any case,
        // otherwise every later Login/Logout/heartbeat would block forever
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            Logger.Info("Client " + client.Id +
                            " wasn't offline but hasn't sent heartbeats - setting offline");
            client.State = State.Offline;
            DaoLocator.ClientDao.Update(client);
            Logger.Info("Client " + client.Id +
                            " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
              // maybe implement an additional watchdog? Till then, just set them offline..
              DaoLocator.JobDao.SetJobOffline(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if the time between the last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if the client calculated jobs, those jobs must be reset
              Logger.Info("Client timed out and is on RESET");
              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
                DaoLocator.JobDao.SetJobOffline(job);
                lock (newAssignedJobs) {
                  // Remove is a no-op when the key is absent
                  newAssignedJobs.Remove(job.Id);
                }
              }
              Logger.Debug("setting client offline");
              // client must be set offline
              client.State = State.Offline;

              //clientAdapter.Update(client);
              DaoLocator.ClientDao.Update(client);

              Logger.Debug("removing it from the heartbeats list");
              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              }
              finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        }
        finally {
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        //TODO: RLY neccesary?
        //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Shouldn't have offline or nullstate, has " + client.State);
        heartbeatLock.EnterWriteLock();
        try {
          //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Resetting all his jobs");
          lastHeartbeats.Remove(client.Id);
          foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
            DaoLocator.JobDao.SetJobOffline(job);
          }
        }
        finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
    CheckForPendingJobs();
    //        DaoLocator.DestroyContext();
    scope.Complete();
  }
}
146
/// <summary>
/// Counts down the timeout of every job that is in state Pending in the database and
/// is registered in pendingJobs. A job whose counter has reached zero is set to
/// Offline and its counter entry is removed; otherwise the counter is decremented.
/// Pending jobs without a counter entry are left untouched.
/// </summary>
private void CheckForPendingJobs() {
  IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(State.Pending));

  foreach (JobDto currJob in pendingJobsInDB) {
    lock (pendingJobs) {
      int remainingTicks;
      if (!pendingJobs.TryGetValue(currJob.Id, out remainingTicks)) {
        continue;
      }

      if (remainingTicks <= 0) {
        currJob.State = State.Offline;
        DaoLocator.JobDao.Update(currJob);
        // drop the expired counter: it would otherwise stay in the dictionary forever
        // and make a later pendingJobs.Add for the same job id throw
        pendingJobs.Remove(currJob.Id);
      } else {
        pendingJobs[currJob.Id] = remainingTicks - 1;
      }
    }
  }
}
163
164    #region IClientCommunicator Members
165
/// <summary>
/// Login of a slave. The login counts as the first heartbeat, so a heartbeat
/// timestamp is recorded. The slave is then stored with state Idle: inserted when it
/// is unknown, updated otherwise. A known slave keeps its stored calendar sync
/// status, a new one starts with CalendarState.NotAllowedToFetch.
/// </summary>
/// <param name="clientInfo">description of the slave that logs in</param>
/// <returns>a response with Success set to true</returns>
public Response Login(ClientDto clientInfo) {
  Response response = new Response();

  heartbeatLock.EnterWriteLock();
  // release the lock even if the body throws (e.g. clientInfo is null); a leaked
  // write lock would block every other heartbeat, login and logout
  try {
    // the indexer adds the entry or overwrites an existing one
    lastHeartbeats[clientInfo.Id] = DateTime.Now;
  }
  finally {
    heartbeatLock.ExitWriteLock();
  }

  ClientDto dbClient = DaoLocator.ClientDao.FindById(clientInfo.Id);

  //Really set offline?
  //Reconnect issues with the currently calculating jobs
  clientInfo.State = State.Idle;
  clientInfo.CalendarSyncStatus = dbClient != null ? dbClient.CalendarSyncStatus : CalendarState.NotAllowedToFetch;

  if (dbClient == null)
    DaoLocator.ClientDao.Insert(clientInfo);
  else
    DaoLocator.ClientDao.Update(clientInfo);
  response.Success = true;
  response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
  return response;
}
198
/// <summary>
/// Returns the uptime calendar appointments of a slave and marks the slave's
/// calendar as fetched.
/// </summary>
/// <param name="clientId">id of the slave asking for its calendar</param>
/// <returns>
/// a failed response if the slave is unknown or has no appointments, otherwise a
/// successful response carrying the appointments; ForceFetch reflects whether the
/// stored sync status was CalendarState.ForceFetch
/// </returns>
public ResponseCalendar GetCalendar(Guid clientId) {
  ResponseCalendar calendarResponse = new ResponseCalendar();

  ClientDto slave = DaoLocator.ClientDao.FindById(clientId);
  if (slave == null) {
    calendarResponse.Success = false;
    calendarResponse.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
    return calendarResponse;
  }

  bool forced = slave.CalendarSyncStatus == CalendarState.ForceFetch;
  calendarResponse.ForceFetch = forced;

  IEnumerable<AppointmentDto> slaveAppointments = DaoLocator.UptimeCalendarDao.GetCalendarForClient(slave);
  bool calendarIsEmpty = slaveAppointments.Count() == 0;
  if (!calendarIsEmpty) {
    calendarResponse.Success = true;
    calendarResponse.Appointments = slaveAppointments;
  } else {
    calendarResponse.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_NO_CALENDAR_FOUND;
    calendarResponse.Success = false;
  }

  // the status is set to Fetched in both cases, even when no calendar was found
  slave.CalendarSyncStatus = CalendarState.Fetched;
  DaoLocator.ClientDao.Update(slave);
  return calendarResponse;
}
224
/// <summary>
/// Stores a new calendar sync status for the given slave.
/// </summary>
/// <param name="clientId">id of the slave</param>
/// <param name="state">calendar sync status to store</param>
/// <returns>a failed response if the slave is unknown, a successful one otherwise</returns>
public Response SetCalendarStatus(Guid clientId, CalendarState state) {
  Response result = new Response();

  ClientDto slave = DaoLocator.ClientDao.FindById(clientId);
  if (slave != null) {
    slave.CalendarSyncStatus = state;
    DaoLocator.ClientDao.Update(slave);

    result.Success = true;
    result.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_STATUS_UPDATED;
  } else {
    result.Success = false;
    result.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND;
  }

  return result;
}
242
/// <summary>
/// Processes a heartbeat sent by a slave. The heartbeat timestamp is recorded, the
/// reported job progress is processed, and the response tells the slave whether to
/// fetch its calendar and whether to fetch a new job. The slave's free cores and
/// free memory are written back to the database.
/// </summary>
/// <param name="hbData">heartbeat data sent by the slave</param>
/// <returns>
/// a failed response (with a NoMessage action) if the slave is unknown, offline or in
/// NullState; otherwise a successful response with the actions for the slave
/// </returns>
public ResponseHeartBeat ProcessHeartBeat(HeartBeatData hbData) {
  Logger.Debug("BEGIN Processing Heartbeat for Client " + hbData.SlaveId);

  ResponseHeartBeat response = new ResponseHeartBeat();
  response.ActionRequest = new List<MessageContainer>();

  Logger.Debug("BEGIN Started Client Fetching");
  ClientDto client = DaoLocator.ClientDao.FindById(hbData.SlaveId);
  Logger.Debug("END Finished Client Fetching");
  // check if the client is logged in; a slave that is not in the database at all
  // (FindById returned null) is treated like one that is not logged in
  if (client == null || client.State == State.Offline || client.State == State.NullState) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

    Logger.Error("ProcessHeartBeat: Client state null or offline: " + client);

    return response;
  }

  client.NrOfFreeCores = hbData.FreeCores;
  client.FreeMemory = hbData.FreeMemory;

  // save timestamp of this heartbeat
  Logger.Debug("BEGIN Locking for Heartbeats");
  heartbeatLock.EnterWriteLock();
  try {
    Logger.Debug("END Locked for Heartbeats");
    // the indexer adds the entry or overwrites an existing one
    lastHeartbeats[hbData.SlaveId] = DateTime.Now;
  }
  finally {
    heartbeatLock.ExitWriteLock();
  }

  Logger.Debug("BEGIN Processing Heartbeat Jobs");
  ProcessJobProcess(hbData, response);
  Logger.Debug("END Processed Heartbeat Jobs");

  //check if new Cal must be loaded
  if (client.CalendarSyncStatus == CalendarState.Fetch || client.CalendarSyncStatus == CalendarState.ForceFetch) {
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_FETCH_OR_FORCEFETCH_CALENDAR;
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));

    //client.CalendarSyncStatus = CalendarState.Fetching;

    Logger.Info("fetch or forcefetch sent");
  }

  // check if client has a free core for a new job
  // if true, ask scheduler for a new job for this client
  Logger.Debug(" BEGIN Looking for Client Jobs");
  if (hbData.FreeCores > 0 && scheduler.ExistsJobForSlave(hbData)) {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
  } else {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
  }
  Logger.Debug(" END Looked for Client Jobs");
  // NOTE: Success/StatusMessage set by the steps above are overwritten here;
  // only the collected ActionRequest entries survive
  response.Success = true;
  response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

  DaoLocator.ClientDao.Update(client);

  //tx.Commit();
  Logger.Debug(" END Processed Heartbeat for Client " + hbData.SlaveId);
  return response;
}
317
/// <summary>
/// Processes the job progress sent by a slave within a heartbeat.
/// [chn] this method needs to be refactored, because its a performance hog
///
/// what it does:
/// (1) loads the active jobs of the slave from the db; if the slave reports progress
///     but has no active jobs, all reported jobs are answered with AbortJob
/// (2) for every reported job: checks that the job exists and is assigned to this
///     slave, otherwise an AbortJob message is issued
/// (3) if the job is in state Abort, an AbortJob message is issued and the job is set to Finished
/// (4) otherwise the progress is stored and, if a snapshot was requested, a
///     RequestSnapshot message is issued
/// (5) for every active job from the db: if the slave reported it, it is removed from
///     newAssignedJobs; if not, its TTL in newAssignedJobs is reduced by one and at
///     TTL &lt;= 0 the job is set Offline and an AbortJob message is issued. A job that
///     was not reported and has no TTL entry is set Offline right away.
///
/// quirks:
/// (1) response.Success/StatusMessage are overwritten during the loop (only last element counts)
/// (2) state Abort results in Finished. This should be: AbortRequested, Aborted.
/// </summary>
/// <param name="hbData">heartbeat data sent by the slave</param>
/// <param name="response">heartbeat response that collects the action messages</param>
private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) {
  Logger.Debug("Started for Client " + hbData.SlaveId);
  List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.ClientDao.FindById(hbData.SlaveId)));
  if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
    if (jobsOfClient.Count == 0) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;

      foreach (Guid jobId in hbData.JobProgress.Keys) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
      }

      Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
      return;
    }

    foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
      JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
      if (curJob == null) {
        // the slave reports a job the server does not know (anymore): handle it like
        // a job that is not assigned to this slave instead of dereferencing null
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + jobProgress.Key);
        continue;
      }

      curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
      if (curJob.Client == null || curJob.Client.Id != hbData.SlaveId) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob);
      } else if (curJob.State == State.Abort) {
        // a request to abort the job has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        curJob.State = State.Finished;
      } else {
        // save job progress
        curJob.Percentage = jobProgress.Value;

        if (curJob.State == State.RequestSnapshot) {
          // a request for a snapshot has been set
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
          curJob.State = State.RequestSnapshotSent;
        }
      }
      DaoLocator.JobDao.Update(curJob);
    }
  }

  foreach (JobDto currJob in jobsOfClient) {
    // was this active job part of the progress list the slave sent?
    bool found = hbData.JobProgress != null && hbData.JobProgress.ContainsKey(currJob.Id);

    lock (newAssignedJobs) {
      if (found) {
        if (newAssignedJobs.ContainsKey(currJob.Id)) {
          Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
          newAssignedJobs.Remove(currJob.Id);
        }
      } else if (newAssignedJobs.ContainsKey(currJob.Id)) {
        // freshly assigned job that has not shown up in a heartbeat yet: count down its TTL
        newAssignedJobs[currJob.Id]--;
        Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Client);
        if (newAssignedJobs[currJob.Id] <= 0) {
          Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);

          currJob.State = State.Offline;
          DaoLocator.JobDao.Update(currJob);

          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

          newAssignedJobs.Remove(currJob.Id);
        }
      } else {
        Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
        currJob.State = State.Offline;
        DaoLocator.JobDao.Update(currJob);
      }
    } // lock
  }
}
427
/// <summary>
/// Called by a slave that was told to pull a job. The scheduler picks the next job
/// for the slave; if there is one, it is returned together with its plugin
/// dependencies and registered in newAssignedJobs with the configured time-to-live.
/// </summary>
/// <param name="clientId">id of the slave that pulls a job</param>
/// <returns>a successful response carrying the job, or a failed one if no job is left</returns>
public ResponseObject<JobDto> GetJob(Guid clientId) {
  ResponseObject<JobDto> jobResponse = new ResponseObject<JobDto>();

  JobDto nextJob = scheduler.GetNextJobForSlave(clientId);
  if (nextJob == null) {
    jobResponse.Success = false;
    jobResponse.Obj = null;
    jobResponse.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
    Logger.Info("No more Jobs left for " + clientId);
    return jobResponse;
  }

  jobResponse.Obj = nextJob;
  jobResponse.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(jobResponse.Obj);
  jobResponse.Success = true;
  Logger.Info("Job pulled: " + nextJob + " for user " + clientId);
  jobResponse.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;

  lock (newAssignedJobs) {
    bool alreadyTracked = newAssignedJobs.ContainsKey(nextJob.Id);
    if (!alreadyTracked) {
      newAssignedJobs.Add(nextJob.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
    }
  }
  return jobResponse;
}
456
/// <summary>
/// Receives a job result as a stream. The stream starts with a BinaryFormatter
/// serialized JobResult header; the remaining bytes are the serialized job. The header
/// is validated and stored first; only if that succeeds, the remaining bytes are read
/// and written to the database as the binary job file. The stream is always disposed.
/// </summary>
/// <param name="stream">stream holding the JobResult header followed by the serialized job</param>
/// <param name="finished">true if this is the final result, false for a snapshot</param>
/// <returns>the response produced while storing the result header</returns>
public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
  Logger.Info("BEGIN Job received for Storage - main method:");

  try {
    // NOTE(review): BinaryFormatter deserializes data sent by a remote slave; this is
    // unsafe for untrusted input and should be replaced by a safe serializer
    BinaryFormatter formatter = new BinaryFormatter();

    JobResult result = (JobResult)formatter.Deserialize(stream);

    //important - repeatable read isolation level is required here,
    //otherwise race conditions could occur when writing the stream into the DB
    //just removed TransactionIsolationLevel.RepeatableRead

    ResponseResultReceived response = ProcessJobResult(result.ClientId, result.JobId, new byte[] { }, result.Percentage, result.Exception, finished);

    if (response.Success) {
      Logger.Debug("Trying to aquire WCF Job Stream");
      byte[] buffer = new byte[3024];
      // collect the rest of the stream chunk-wise instead of byte by byte
      using (MemoryStream serializedJob = new MemoryStream()) {
        int read;
        int i = 0;
        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
          serializedJob.Write(buffer, 0, read);
          if (i % 100 == 0)
            Logger.Debug("Writing to stream: " + i);
          i++;
        }
        Logger.Debug("Done Writing, closing the stream!");

        DaoLocator.JobDao.SetBinaryJobFile(result.JobId, serializedJob.ToArray());
      }
    }
    Logger.Info("END Job received for Storage:");
    return response;
  }
  finally {
    // dispose the incoming stream on every path, including failures while
    // deserializing or storing
    if (stream != null)
      stream.Dispose();
  }
}
501
502
/// <summary>
/// Validates a job result sent by a slave and stores the job's new progress and state.
/// The result is rejected if the job does not exist, is in state Abort, is not
/// assigned to any slave, is assigned to a different slave, or is in a state other
/// than Calculating, Pending or RequestSnapshotSent. A result for an already finished
/// job is acknowledged without changing anything.
/// </summary>
/// <param name="clientId">id of the slave that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">
/// serialized job data. NOTE(review): this data is not persisted by this method; only
/// the stream based overload stores job data (via SetBinaryJobFile) - confirm intended.
/// </param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception text; a non-empty value marks the job as Failed</param>
/// <param name="finished">true if this is the final result, false for a snapshot</param>
/// <returns>a response with Success, StatusMessage and JobId describing the outcome</returns>
private ResponseResultReceived ProcessJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double? percentage,
  string exception,
  bool finished) {

  Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);

  ResponseResultReceived response = new ResponseResultReceived();

  JobDto jobInfo = DaoLocator.JobDao.FindById(jobId);
  if (jobInfo == null) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
    response.JobId = jobId;
    Logger.Error("No job with Id " + jobId);

    return response;
  }
  jobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);

  if (jobInfo.State == State.Abort) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
    // report the job id like every other branch does
    response.JobId = jobId;

    Logger.Error("Job was aborted! " + jobInfo);

    return response;
  }
  if (jobInfo.Client == null) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
    response.JobId = jobId;

    Logger.Error("Job is not being calculated (client = null)! " + jobInfo);

    return response;
  }
  if (jobInfo.Client.Id != clientId) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
    response.JobId = jobId;

    Logger.Error("Wrong Client for this Job! " + jobInfo + ", Sending Client is: " + clientId);

    return response;
  }
  if (jobInfo.State == State.Finished) {
    // a repeated delivery for a finished job is acknowledged, nothing is changed
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
    response.JobId = jobId;

    Logger.Error("Job already finished! " + jobInfo + ", Sending Client is: " + clientId);

    return response;
  }
  //Todo: RequestsnapshotSent => calculating?
  if (jobInfo.State == State.RequestSnapshotSent) {
    jobInfo.State = State.Calculating;
  }
  if (jobInfo.State != State.Calculating &&
    jobInfo.State != State.Pending) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
    response.JobId = jobId;

    Logger.Error("Wrong Job State, job is: " + jobInfo);

    return response;
  }
  jobInfo.Percentage = percentage;

  if (!string.IsNullOrEmpty(exception)) {
    // a reported exception wins over the finished flag
    jobInfo.State = State.Failed;
    jobInfo.Exception = exception;
    jobInfo.DateFinished = DateTime.Now;
  } else if (finished) {
    jobInfo.State = State.Finished;
    jobInfo.DateFinished = DateTime.Now;
  }

  DaoLocator.JobDao.Update(jobInfo);

  response.Success = true;
  response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
  response.JobId = jobId;
  response.Finished = finished;

  Logger.Info("END Job received for Storage - SUB method: " + jobId);
  return response;
}
611
/// <summary>
/// Stores the final result of a job that a slave has finished calculating.
/// Delegates to ProcessJobResult with the finished flag set.
/// </summary>
/// <param name="clientId">id of the slave that sends the result</param>
/// <param name="jobId">id of the finished job</param>
/// <param name="result">serialized job data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception text, if the job failed</param>
/// <returns>the response of ProcessJobResult</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  string exception) {

  const bool isFinalResult = true;
  ResponseResultReceived stored = ProcessJobResult(clientId, jobId, result, percentage, exception, isFinalResult);
  return stored;
}
631
/// <summary>
/// Stores an intermediate result (snapshot) of a job that is still being calculated.
/// Delegates to ProcessJobResult with the finished flag cleared.
/// </summary>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, string exception) {
  const bool isFinalResult = false;
  ResponseResultReceived stored = ProcessJobResult(clientId, jobId, result, percentage, exception, isFinalResult);
  return stored;
}
635
/// <summary>
/// Logs a slave out: its heartbeat entry is dropped, jobs it was still calculating
/// are set offline and the slave itself is stored with state Offline.
/// </summary>
/// <param name="clientId">id of the slave that logs out</param>
/// <returns>a failed response if the slave is not registered, a successful one otherwise</returns>
public Response Logout(Guid clientId) {
  Logger.Info("Client logged out " + clientId);

  Response logoutResponse = new Response();

  heartbeatLock.EnterWriteLock();
  // Remove simply returns false when there is no entry for this slave
  lastHeartbeats.Remove(clientId);
  heartbeatLock.ExitWriteLock();

  ClientDto slave = DaoLocator.ClientDao.FindById(clientId);
  if (slave == null) {
    logoutResponse.Success = false;
    logoutResponse.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
    return logoutResponse;
  }

  bool wasCalculating = slave.State == State.Calculating;
  if (wasCalculating) {
    // reset every job of the slave that has not been finished yet
    foreach (JobDto activeJob in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
      if (activeJob.State == State.Finished) {
        continue;
      }
      DaoLocator.JobDao.SetJobOffline(activeJob);
    }
  }

  slave.State = State.Offline;
  DaoLocator.ClientDao.Update(slave);

  logoutResponse.Success = true;
  logoutResponse.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

  return logoutResponse;
}
675
/// <summary>
/// If a slave went offline and restored a job it was calculating, it can ask the
/// server whether the job result is still needed. An unfinished job is set to state
/// Pending and registered in pendingJobs with the PENDING_TIMEOUT countdown.
/// </summary>
/// <param name="jobId">id of the restored job</param>
/// <returns>
/// a failed response if the job does not exist; a successful response stating that the
/// job is already finished; otherwise a successful response asking for the job result
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  Response response = new Response();
  JobDto job = DaoLocator.JobDao.FindById(jobId);
  if (job == null) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
    Logger.Error("Job doesn't exist (anymore)! " + jobId);
    return response;
  }
  if (job.State == State.Finished) {
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
    Logger.Error("already finished! " + job);
    return response;
  }
  job.State = State.Pending;
  lock (pendingJobs) {
    // the indexer (re)starts the countdown; Add would throw an ArgumentException
    // when the slave asks a second time for a job that is already registered
    pendingJobs[job.Id] = PENDING_TIMEOUT;
  }

  DaoLocator.JobDao.Update(job);

  response.Success = true;
  response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
  return response;
}
708
/// <summary>
/// Looks up the requested plugins among the plugins known to the ApplicationManager
/// and returns them including their files. A plugin matches on name, major and minor
/// version. For an update request only a strictly newer revision is returned and a
/// missing match is ignored; for a regular request the same or a newer revision is
/// returned and a missing match makes the whole call fail.
/// </summary>
/// <param name="pluginList">plugins requested by the slave</param>
/// <returns>the matching plugins, or a failed response if a required plugin is not available</returns>
public ResponseList<CachedHivePluginInfoDto> SendPlugins(List<HivePluginInfoDto> pluginList) {
  ResponseList<CachedHivePluginInfoDto> pluginResponse = new ResponseList<CachedHivePluginInfoDto>();

  foreach (HivePluginInfoDto requested in pluginList) {
    bool updateOnly = requested.Update;

    // updates need a strictly newer revision, regular requests accept the same revision too
    IPluginDescription match = ApplicationManager.Manager.Plugins.SingleOrDefault(pd =>
      pd.Name == requested.Name
      && pd.Version.Major == requested.Version.Major
      && pd.Version.Minor == requested.Version.Minor
      && (updateOnly
            ? pd.Version.Revision > requested.Version.Revision
            : pd.Version.Revision >= requested.Version.Revision));

    if (match != null) {
      pluginResponse.List.Add(convertPluginDescriptorToDto(match));
    } else if (!updateOnly) {
      pluginResponse.Success = false;
      pluginResponse.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_NOT_AVAIL;
      return pluginResponse;
    }
  }

  pluginResponse.Success = true;
  pluginResponse.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
  return pluginResponse;
}
736
/// <summary>
/// Builds the transfer object for a plugin: name, version and the content of every
/// file of the plugin, read from disk.
/// </summary>
/// <param name="currPlugin">description of the plugin to convert</param>
/// <returns>the plugin DTO including the bytes of all plugin files</returns>
private CachedHivePluginInfoDto convertPluginDescriptorToDto(IPluginDescription currPlugin) {
  CachedHivePluginInfoDto pluginDto = new CachedHivePluginInfoDto();
  pluginDto.Name = currPlugin.Name;
  pluginDto.Version = currPlugin.Version;

  foreach (var pluginFile in currPlugin.Files) {
    string path = pluginFile.Name;
    byte[] content = File.ReadAllBytes(path);
    pluginDto.PluginFiles.Add(new HivePluginFile(content, path));
  }
  return pluginDto;
}
748
749    #endregion
750  }
751}
Note: See TracBrowser for help on using the repository browser.