Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 4267

Last change on this file since 4267 was 4267, checked in by cneumuel, 14 years ago

renamed all database entities from "Client" to "Slave" (#1157)
Made slave heartbeats synchronous; slaves also send heartbeats when their timetable disallows calculation. They will appear on the server as Idle, but IsAllowedToCalculate will be false (#1159)

File size: 29.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Tracing;
35using HeuristicLab.Hive.Contracts.ResponseObjects;
36
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// The SlaveCommunicator manages the whole communication with the slaves:
  /// login/logout, heartbeats, job distribution, job results, uptime calendars
  /// and plugin delivery.
  /// </summary>
  /// <remarks>
  /// Heartbeat and job bookkeeping is held in static dictionaries, so it is shared
  /// by every instance of this class inside the server process.
  /// </remarks>
  public class SlaveCommunicator : ISlaveCommunicator,
    IInternalSlaveCommunicator {
    // slave id -> UTC time of its last heartbeat (a login counts as a heartbeat); guarded by heartbeatLock
    private static readonly Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
    // job id -> number of slave heartbeats left in which a freshly pulled job has to show up in the
    // slave's reported job progress before it is reset to Offline; guarded by lock(newAssignedJobs)
    private static readonly Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
    // job id -> number of server heartbeat ticks left before a Pending job is reset to Offline;
    // guarded by lock(pendingJobs)
    private static readonly Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();

    private static readonly ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly ILifecycleManager lifecycleManager;
    // NOTE(review): assigned in the constructor but not used anywhere in this class
    private readonly IInternalJobManager jobManager;
    private readonly IScheduler scheduler;

    // server heartbeat ticks a job may stay Pending (see IsJobStillNeeded) before it is reset to Offline
    private const int PENDING_TIMEOUT = 100;

    /// <summary>
    /// Resolves the lifecycle manager, job manager and scheduler from the ServiceLocator
    /// and registers <see cref="lifecycleManager_OnServerHeartbeat"/> as server heartbeat handler.
    /// </summary>
    public SlaveCommunicator() {
      lifecycleManager = ServiceLocator.GetLifecycleManager();
      jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
      scheduler = ServiceLocator.GetScheduler();

      lifecycleManager.RegisterHeartbeat(new EventHandler(lifecycleManager_OnServerHeartbeat));
    }

    /// <summary>
    /// Server heartbeat tick. Sets online slaves offline when their heartbeats are missing
    /// or too old (resetting their active jobs), resets the jobs of offline slaves and
    /// times out long-pending jobs. All database work runs in one transaction scope.
    /// </summary>
    /// <param name="sender">event sender (unused)</param>
    /// <param name="e">event arguments (unused)</param>
    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
      Logger.Debug("Server Heartbeat ticked");

      // [chn] why is transaction management done here
      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
        List<SlaveDto> allSlaves = new List<SlaveDto>(DaoLocator.SlaveDao.FindAll());

        foreach (SlaveDto slave in allSlaves) {
          if (slave.State != SlaveState.Offline && slave.State != SlaveState.NullState) {
            CheckHeartbeatOfOnlineSlave(slave);
          } else {
            ResetJobsOfOfflineSlave(slave);
          }
        }
        CheckForPendingJobs();
        scope.Complete();
      }
    }

    /// <summary>
    /// Sets an online slave offline (and its active jobs with it) if no heartbeat is recorded
    /// for it, or if its last heartbeat is older than ApplicationConstants.HEARTBEAT_MAX_DIF seconds.
    /// </summary>
    private void CheckHeartbeatOfOnlineSlave(SlaveDto slave) {
      heartbeatLock.EnterUpgradeableReadLock();
      try {
        DateTime lastHbOfSlave;
        if (!lastHeartbeats.TryGetValue(slave.Id, out lastHbOfSlave)) {
          Logger.Info("Slave " + slave.Id +
                          " wasn't offline but hasn't sent heartbeats - setting offline");
          slave.State = SlaveState.Offline;
          DaoLocator.SlaveDao.Update(slave);
          Logger.Info("Slave " + slave.Id +
                          " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
          foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
            // maybe implement an additional watchdog? Till then, just set them offline.
            DaoLocator.JobDao.SetJobOffline(job);
          }
          return;
        }

        // UTC avoids spurious timeouts (or hidden dead slaves) around DST changes
        TimeSpan dif = DateTime.UtcNow.Subtract(lastHbOfSlave);
        if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
          // if the slave calculated jobs, they must be reset
          Logger.Info("Slave timed out and is on RESET");
          foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
            DaoLocator.JobDao.SetJobOffline(job);
            lock (newAssignedJobs) {
              newAssignedJobs.Remove(job.Id);
            }
          }
          Logger.Debug("setting slave offline");
          slave.State = SlaveState.Offline;
          DaoLocator.SlaveDao.Update(slave);

          Logger.Debug("removing it from the heartbeats list");
          heartbeatLock.EnterWriteLock();
          try {
            lastHeartbeats.Remove(slave.Id);
          } finally {
            heartbeatLock.ExitWriteLock();
          }
        }
      } finally {
        // always released, otherwise one DAO exception would block all future heartbeats
        heartbeatLock.ExitUpgradeableReadLock();
      }
    }

    /// <summary>
    /// Forgets the heartbeat of a slave that is Offline/NullState and sets its active jobs offline.
    /// </summary>
    private void ResetJobsOfOfflineSlave(SlaveDto slave) {
      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats.Remove(slave.Id);
        foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
          DaoLocator.JobDao.SetJobOffline(job);
        }
      } finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Counts down the timeout of every job that is Pending in the database and tracked in
    /// <see cref="pendingJobs"/>; when it reaches zero the job is set Offline and forgotten.
    /// </summary>
    private void CheckForPendingJobs() {
      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));

      foreach (JobDto currJob in pendingJobsInDB) {
        lock (pendingJobs) {
          int ticksLeft;
          if (pendingJobs.TryGetValue(currJob.Id, out ticksLeft)) {
            if (ticksLeft <= 0) {
              currJob.State = JobState.Offline;
              DaoLocator.JobDao.Update(currJob);
              // drop the entry so the job can become Pending again later
              pendingJobs.Remove(currJob.Id);
            } else {
              pendingJobs[currJob.Id] = ticksLeft - 1;
            }
          }
        }
      }
    }

    /// <summary>
    /// Stores "now" (UTC) as the last heartbeat of the given slave.
    /// </summary>
    private static void RecordHeartbeat(Guid slaveId) {
      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats[slaveId] = DateTime.UtcNow;
      } finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    #region ISlaveCommunicator Members

    /// <summary>
    /// Login process for the slave. The login counts as first heartbeat; the slave is
    /// inserted or updated in the database with state Idle, keeping its calendar sync
    /// status if it was already known.
    /// </summary>
    /// <param name="slaveInfo">the slave that logs in</param>
    /// <returns>an (always successful) response</returns>
    public Response Login(SlaveDto slaveInfo) {
      Response response = new Response();

      RecordHeartbeat(slaveInfo.Id);

      SlaveDto dbSlave = DaoLocator.SlaveDao.FindById(slaveInfo.Id);

      //Really set offline?
      //Reconnect issues with the currently calculating jobs
      slaveInfo.State = SlaveState.Idle;
      slaveInfo.CalendarSyncStatus = dbSlave != null ? dbSlave.CalendarSyncStatus : CalendarState.NotAllowedToFetch;

      if (dbSlave == null)
        DaoLocator.SlaveDao.Insert(slaveInfo);
      else
        DaoLocator.SlaveDao.Update(slaveInfo);

      return response;
    }

    /// <summary>
    /// Returns the uptime calendar appointments of a slave and marks its calendar as fetched.
    /// </summary>
    /// <param name="slaveId">id of the requesting slave</param>
    /// <returns>the appointments, or a status of ResourceNotFound / NoCalendarFound</returns>
    public ResponseCalendar GetCalendar(Guid slaveId) {
      ResponseCalendar response = new ResponseCalendar();

      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
      if (slave == null) {
        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
        return response;
      }

      response.ForceFetch = (slave.CalendarSyncStatus == CalendarState.ForceFetch);

      // materialized once so the appointments are not enumerated (queried) twice
      List<AppointmentDto> appointments = new List<AppointmentDto>(DaoLocator.UptimeCalendarDao.GetCalendarForSlave(slave));
      if (appointments.Count == 0) {
        response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
      } else {
        response.Appointments = appointments;
      }

      slave.CalendarSyncStatus = CalendarState.Fetched;
      DaoLocator.SlaveDao.Update(slave);
      return response;
    }

    /// <summary>
    /// Sets the calendar sync status of a slave.
    /// </summary>
    /// <param name="slaveId">id of the slave</param>
    /// <param name="state">new calendar sync status</param>
    /// <returns>a response; status GetCalendar_ResourceNotFound if the slave is unknown</returns>
    public Response SetCalendarStatus(Guid slaveId, CalendarState state) {
      Response response = new Response();
      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
      if (slave == null) {
        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
        return response;
      }

      slave.CalendarSyncStatus = state;
      DaoLocator.SlaveDao.Update(slave);

      return response;
    }

    /// <summary>
    /// The slave has to send regular heartbeats. The heartbeat is recorded, the reported
    /// job progress is processed, and the response tells the slave whether to fetch its
    /// calendar and/or pull a new job.
    /// </summary>
    /// <param name="hbData">the heartbeat sent by the slave</param>
    /// <returns>the action requests for the slave</returns>
    public ResponseHeartBeat ProcessHeartBeat(HeartBeatData hbData) {
      Logger.Debug("BEGIN Processing Heartbeat for Slave " + hbData.SlaveId);

      ResponseHeartBeat response = new ResponseHeartBeat();
      response.ActionRequest = new List<MessageContainer>();

      Logger.Debug("BEGIN Started Slave Fetching");
      SlaveDto slave = DaoLocator.SlaveDao.FindById(hbData.SlaveId);
      Logger.Debug("END Finished Slave Fetching");

      // an unknown slave is treated like a logged-out one instead of failing with an NRE
      if (slave == null || slave.State == SlaveState.Offline || slave.State == SlaveState.NullState) {
        response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
        Logger.Error("ProcessHeartBeat: Slave state null or offline: " + (slave != null ? slave.ToString() : hbData.SlaveId.ToString()));
        return response;
      }

      slave.NrOfFreeCores = hbData.FreeCores;
      slave.FreeMemory = hbData.FreeMemory;
      slave.IsAllowedToCalculate = hbData.IsAllowedToCalculate;

      Logger.Debug("BEGIN Locking for Heartbeats");
      RecordHeartbeat(hbData.SlaveId);
      Logger.Debug("END Locked for Heartbeats");

      Logger.Debug("BEGIN Processing Heartbeat Jobs");
      ProcessJobProcess(hbData, slave, response);
      Logger.Debug("END Processed Heartbeat Jobs");

      // check if a new calendar must be loaded
      if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
        Logger.Info("fetch or forcefetch sent");
      }

      // if the slave may calculate and has a free core, ask the scheduler whether there is a job for it
      Logger.Debug(" BEGIN Looking for Slave Jobs");
      if (slave.IsAllowedToCalculate && hbData.FreeCores > 0 && scheduler.ExistsJobForSlave(hbData)) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
      } else {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
      }
      Logger.Debug(" END Looked for Slave Jobs");

      DaoLocator.SlaveDao.Update(slave);

      Logger.Debug(" END Processed Heartbeat for Slave " + hbData.SlaveId);
      return response;
    }

    /// <summary>
    /// Processes the job progress sent by a slave:
    /// (1) if the slave reports jobs but has no active jobs in the DB, all reported jobs are aborted;
    /// (2) each reported job is checked/updated (see <see cref="ProcessReportedJob"/>);
    /// (3) each active DB job of the slave that was not reported loses one TTL unit if it was
    ///     freshly assigned (reset to Offline and aborted at zero), otherwise it is set Offline;
    ///     reported jobs are removed from the freshly-assigned list.
    /// </summary>
    /// <param name="hbData">the heartbeat sent by the slave</param>
    /// <param name="slave">the slave the heartbeat belongs to (already loaded from the DB)</param>
    /// <param name="response">response to which AbortJob / RequestSnapshot messages are added</param>
    private void ProcessJobProcess(HeartBeatData hbData, SlaveDto slave, ResponseHeartBeat response) {
      Logger.Debug("Started for Slave " + hbData.SlaveId);
      List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(slave));

      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
        if (jobsOfSlave.Count == 0) {
          foreach (Guid jobId in hbData.JobProgress.Keys) {
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
          }

          Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
          return;
        }

        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
          ProcessReportedJob(hbData.SlaveId, jobProgress.Key, jobProgress.Value, response);
        }
      }

      foreach (JobDto currJob in jobsOfSlave) {
        bool reported = hbData.JobProgress != null && hbData.JobProgress.Keys.Contains(currJob.Id);
        lock (newAssignedJobs) {
          int ttl;
          bool isNewlyAssigned = newAssignedJobs.TryGetValue(currJob.Id, out ttl);
          if (reported) {
            if (isNewlyAssigned) {
              Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
              newAssignedJobs.Remove(currJob.Id);
            }
          } else if (isNewlyAssigned) {
            ttl--;
            newAssignedJobs[currJob.Id] = ttl;
            Logger.Error("Job TTL Reduced by one for job: " + currJob + " and is now: " + ttl + ". User that sucks: " + currJob.Slave);
            if (ttl <= 0) {
              Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);

              currJob.State = JobState.Offline;
              DaoLocator.JobDao.Update(currJob);

              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

              newAssignedJobs.Remove(currJob.Id);
            }
          } else {
            Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
            currJob.State = JobState.Offline;
            DaoLocator.JobDao.Update(currJob);
          }
        }
      }
    }

    /// <summary>
    /// Handles one job reported in a slave heartbeat. Unknown jobs and jobs owned by another
    /// slave get an AbortJob message and are left untouched in the DB. Aborted jobs get an
    /// AbortJob message and are set Finished (quirk: should rather be AbortRequested/Aborted).
    /// Otherwise the progress is saved and, if a snapshot was requested, a RequestSnapshot
    /// message is issued and the job state becomes SnapshotSent.
    /// </summary>
    private void ProcessReportedJob(Guid slaveId, Guid jobId, double progress, ResponseHeartBeat response) {
      JobDto curJob = DaoLocator.JobDao.FindById(jobId);
      if (curJob == null) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
        Logger.Error("There is no job calculated by this user " + slaveId + " Job: " + jobId);
        return;
      }

      curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
      if (curJob.Slave == null || curJob.Slave.Id != slaveId) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("There is no job calculated by this user " + slaveId + " Job: " + curJob);
        // not this slave's job: do not write back a possibly stale copy
        return;
      }

      if (curJob.State == JobState.Aborted) {
        // a request to abort the job has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        curJob.State = JobState.Finished;
      } else {
        curJob.Percentage = progress;

        if (curJob.State == JobState.SnapshotRequested) {
          // a request for a snapshot has been set
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
          curJob.State = JobState.SnapshotSent;
        }
      }
      DaoLocator.JobDao.Update(curJob);
    }

    /// <summary>
    /// If the slave was told to pull a job it calls this method; the scheduler selects a job,
    /// its plugin dependencies are attached and the job is tracked as freshly assigned.
    /// </summary>
    /// <param name="slaveId">id of the requesting slave</param>
    /// <returns>the job, or a GetJob_NoJobsAvailable status</returns>
    public ResponseObject<JobDto> GetJob(Guid slaveId) {
      ResponseObject<JobDto> response = new ResponseObject<JobDto>();

      JobDto job2Calculate = scheduler.GetNextJobForSlave(slaveId);
      if (job2Calculate != null) {
        response.Obj = job2Calculate;
        response.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Obj);

        Logger.Info("Job pulled: " + job2Calculate + " for user " + slaveId);
        lock (newAssignedJobs) {
          if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
            newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
        }
      } else {
        response.Obj = null;
        response.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
        Logger.Info("No more Jobs left for " + slaveId);
      }
      return response;
    }

    /// <summary>
    /// Receives a job result as stream: a BinaryFormatter-serialized JobResult header followed
    /// by the serialized job bytes. The header is validated and applied; if that succeeds the
    /// remaining bytes are stored as the job's binary file. The stream is always disposed.
    /// </summary>
    /// <param name="stream">the incoming stream</param>
    /// <param name="finished">true if this is the final result</param>
    /// <returns>the validation response</returns>
    public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
      Logger.Info("BEGIN Job received for Storage - main method:");
      try {
        // SECURITY NOTE(review): BinaryFormatter can instantiate arbitrary types from the wire;
        // this is only acceptable if slaves are trusted. Kept because it is the existing protocol.
        BinaryFormatter formatter = new BinaryFormatter();
        JobResult result = (JobResult)formatter.Deserialize(stream);

        ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.JobId, new byte[] { }, result.Percentage, result.Exception, finished);

        if (response.StatusMessage == ResponseStatus.Ok) {
          Logger.Debug("Trying to aquire WCF Job Stream");
          DaoLocator.JobDao.SetBinaryJobFile(result.JobId, ReadRemainingBytes(stream));
        }
        Logger.Info("END Job received for Storage:");
        return response;
      } finally {
        stream.Dispose();
      }
    }

    /// <summary>
    /// Reads the rest of <paramref name="stream"/> into a byte array.
    /// </summary>
    private static byte[] ReadRemainingBytes(Stream stream) {
      using (MemoryStream serializedJob = new MemoryStream()) {
        byte[] buffer = new byte[3024];
        int read;
        int i = 0;
        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
          serializedJob.Write(buffer, 0, read);
          if (i % 100 == 0)
            Logger.Debug("Writing to stream: " + i);
          i++;
        }
        Logger.Debug("Done Writing, closing the stream!");
        return serializedJob.ToArray();
      }
    }

    /// <summary>
    /// Validates a job result sent by a slave and updates the job's percentage, state,
    /// exception and finish date. A non-empty <paramref name="result"/> is stored as the
    /// job's binary file.
    /// </summary>
    /// <param name="slaveId">id of the sending slave</param>
    /// <param name="jobId">id of the job</param>
    /// <param name="result">serialized job data; may be empty when it is delivered separately</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception text; non-empty marks the job Failed</param>
    /// <param name="finished">true marks the job Finished</param>
    /// <returns>Ok, or the reason why the result was rejected</returns>
    private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {
      Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);

      ResponseResultReceived response = new ResponseResultReceived();
      response.JobId = jobId;

      JobDto jobInfo = DaoLocator.JobDao.FindById(jobId);
      if (jobInfo == null) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
        Logger.Error("No job with Id " + jobId);
        return response;
      }
      jobInfo.Slave = DaoLocator.SlaveDao.GetSlaveForJob(jobId);

      if (jobInfo.State == JobState.Aborted) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
        Logger.Error("Job was aborted! " + jobInfo);
        return response;
      }
      if (jobInfo.Slave == null) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
        Logger.Error("Job is not being calculated (slave = null)! " + jobInfo);
        return response;
      }
      if (jobInfo.Slave.Id != slaveId) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_WrongSlaveForJob;
        Logger.Error("Wrong Slave for this Job! " + jobInfo + ", Sending Slave is: " + slaveId);
        return response;
      }
      if (jobInfo.State == JobState.Finished) {
        response.StatusMessage = ResponseStatus.Ok;
        Logger.Error("Job already finished! " + jobInfo + ", Sending Slave is: " + slaveId);
        return response;
      }
      //Todo: RequestsnapshotSent => calculating?
      if (jobInfo.State == JobState.SnapshotSent) {
        jobInfo.State = JobState.Calculating;
      }
      if (jobInfo.State != JobState.Calculating && jobInfo.State != JobState.Pending) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
        Logger.Error("Wrong Job State, job is: " + jobInfo);
        return response;
      }
      jobInfo.Percentage = percentage;

      if (!string.IsNullOrEmpty(exception)) {
        jobInfo.State = JobState.Failed;
        jobInfo.Exception = exception;
        jobInfo.DateFinished = DateTime.Now;
      } else if (finished) {
        jobInfo.State = JobState.Finished;
        jobInfo.DateFinished = DateTime.Now;
      }

      DaoLocator.JobDao.Update(jobInfo);

      // previously the byte[] result was assigned to a throw-away object and silently lost
      if (result != null && result.Length > 0) {
        DaoLocator.JobDao.SetBinaryJobFile(jobId, result);
      }

      response.StatusMessage = ResponseStatus.Ok;
      response.Finished = finished;

      Logger.Info("END Job received for Storage - SUB method: " + jobId);
      return response;
    }

    /// <summary>
    /// The slave sends the final job result when it finished calculating;
    /// the result is stored in the database.
    /// </summary>
    /// <param name="slaveId">id of the sending slave</param>
    /// <param name="jobId">id of the job</param>
    /// <param name="result">serialized job data</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception text, if the job failed</param>
    /// <returns>Ok, or the reason why the result was rejected</returns>
    public ResponseResultReceived StoreFinishedJobResult(Guid slaveId,
      Guid jobId,
      byte[] result,
      double percentage,
      string exception) {

      return ProcessJobResult(slaveId, jobId, result, percentage, exception, true);
    }

    /// <summary>
    /// Stores an intermediate (snapshot) result of a job that is still being calculated.
    /// </summary>
    /// <returns>Ok, or the reason why the snapshot was rejected</returns>
    public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, double percentage, string exception) {
      return ProcessJobResult(slaveId, jobId, result, percentage, exception, false);
    }

    /// <summary>
    /// When a slave logs out its heartbeat entry is removed, its unfinished active jobs are
    /// set offline (if it was calculating) and its state is set to Offline.
    /// </summary>
    /// <param name="slaveId">id of the slave</param>
    /// <returns>a response; Logout_SlaveNotRegistered if the slave is unknown</returns>
    public Response Logout(Guid slaveId) {
      Logger.Info("Slave logged out " + slaveId);

      Response response = new Response();

      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats.Remove(slaveId);
      } finally {
        heartbeatLock.ExitWriteLock();
      }

      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
      if (slave == null) {
        response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
        return response;
      }
      if (slave.State == SlaveState.Calculating) {
        // check which jobs the slave was calculating and reset them
        foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
          if (job.State != JobState.Finished)
            DaoLocator.JobDao.SetJobOffline(job);
        }
      }

      slave.State = SlaveState.Offline;
      DaoLocator.SlaveDao.Update(slave);

      return response;
    }

    /// <summary>
    /// If a slave goes offline and restores a job it was calculating, it can ask whether the
    /// job result is still needed. If so, the job is set Pending and times out after
    /// PENDING_TIMEOUT server heartbeats unless a result arrives.
    /// </summary>
    /// <param name="jobId">id of the job</param>
    /// <returns>Ok, JobDoesNotExist or JobAlreadyFinished</returns>
    public Response IsJobStillNeeded(Guid jobId) {
      Response response = new Response();
      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
        Logger.Error("Job doesn't exist (anymore)! " + jobId);
        return response;
      }
      if (job.State == JobState.Finished) {
        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
        Logger.Error("already finished! " + job);
        return response;
      }
      job.State = JobState.Pending;
      lock (pendingJobs) {
        // indexer instead of Add: asking twice for the same job must not throw
        pendingJobs[job.Id] = PENDING_TIMEOUT;
      }

      DaoLocator.JobDao.Update(job);

      return response;
    }

    /// <summary>
    /// Resolves the requested plugins against the plugins installed on the server.
    /// For entries with Update set, only a strictly newer revision (same major/minor) is
    /// returned, if any; otherwise an equal or newer revision is required and a missing
    /// plugin aborts with GetPlugins_PluginsNotAvailable.
    /// </summary>
    /// <param name="pluginList">plugins requested by the slave</param>
    /// <returns>the plugins including their file contents</returns>
    public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
      ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
      response.List = new List<CachedHivePluginInfoDto>();
      foreach (HivePluginInfoDto pluginInfo in pluginList) {
        IPluginDescription ipd = FindInstalledPlugin(pluginInfo, pluginInfo.Update);
        if (ipd != null) {
          response.List.Add(ConvertPluginDescriptorToDto(ipd));
        } else if (!pluginInfo.Update) {
          response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
          return response;
        }
      }
      return response;
    }

    /// <summary>
    /// Finds the installed plugin with the same name, major and minor version and a revision
    /// greater than (or, if <paramref name="requireNewerRevision"/> is false, at least) the
    /// requested one. If several match, the highest revision wins instead of throwing.
    /// </summary>
    private static IPluginDescription FindInstalledPlugin(HivePluginInfoDto pluginInfo, bool requireNewerRevision) {
      return ApplicationManager.Manager.Plugins
        .Where(pd => pd.Name == pluginInfo.Name
                  && pd.Version.Major == pluginInfo.Version.Major
                  && pd.Version.Minor == pluginInfo.Version.Minor
                  && (requireNewerRevision
                        ? pd.Version.Revision > pluginInfo.Version.Revision
                        : pd.Version.Revision >= pluginInfo.Version.Revision))
        .OrderByDescending(pd => pd.Version.Revision)
        .FirstOrDefault();
    }

    /// <summary>
    /// Converts a plugin description to a DTO that carries the content of all plugin files.
    /// </summary>
    private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
      CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
        Name = currPlugin.Name,
        Version = currPlugin.Version
      };

      foreach (string fileName in from file in currPlugin.Files select file.Name) {
        currCachedPlugin.PluginFiles.Add(new HivePluginFile(File.ReadAllBytes(fileName), fileName));
      }
      return currCachedPlugin;
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.