Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 4305

Last change on this file since 4305 was 4305, checked in by cneumuel, 14 years ago

added streamedHttpEndpoint binding (without message security, for now) (#1168)

File size: 29.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Tracing;
35using HeuristicLab.Hive.Contracts.ResponseObjects;
36using System.Security.Permissions;
37using System.Web.Security;
38using System.Security.Principal;
39using System.ServiceModel;
40using System.Security;
41
42namespace HeuristicLab.Hive.Server.Core {
43  /// <summary>
44  /// The SlaveCommunicator manages the whole communication with the slave
45  /// </summary>
46  public class SlaveCommunicator : ISlaveCommunicator,
47    IInternalSlaveCommunicator {
48    private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
49    private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
50    private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();
51
52    private static ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
53
54    //private ISessionFactory factory;
55    private ILifecycleManager lifecycleManager;
56    private IInternalJobManager jobManager;
57    private IScheduler scheduler;
58
59    private static int PENDING_TIMEOUT = 100;
60
61    /// <summary>
62    /// Initialization of the Adapters to the database
63    /// Initialization of Eventhandler for the lifecycle management
64    /// Initialization of lastHearbeats Dictionary
65    /// </summary>
66    public SlaveCommunicator() {
67      //factory = ServiceLocator.GetSessionFactory();
68
69      lifecycleManager = ServiceLocator.GetLifecycleManager();
70      jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
71      scheduler = ServiceLocator.GetScheduler();
72
73      lifecycleManager.RegisterHeartbeat(new EventHandler(lifecycleManager_OnServerHeartbeat));
74    }
75
76    /// <summary>
77    /// Check if online slaves send their hearbeats
78    /// if not -> set them offline and check if they where calculating a job
79    /// </summary>
80    /// <param name="sender"></param>
81    /// <param name="e"></param>
82    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
83      Logger.Debug("Server Heartbeat ticked");
84
85      // [chn] why is transaction management done here
86      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
87        List<SlaveDto> allSlaves = new List<SlaveDto>(DaoLocator.SlaveDao.FindAll());
88
89        foreach (SlaveDto slave in allSlaves) {
90          if (slave.State != SlaveState.Offline && slave.State != SlaveState.NullState) {
91            heartbeatLock.EnterUpgradeableReadLock();
92
93            if (!lastHeartbeats.ContainsKey(slave.Id)) {
94              Logger.Info("Slave " + slave.Id +
95                              " wasn't offline but hasn't sent heartbeats - setting offline");
96              slave.State = SlaveState.Offline;
97              DaoLocator.SlaveDao.Update(slave);
98              Logger.Info("Slave " + slave.Id +
99                              " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
100              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
101                //maybe implementa n additional Watchdog? Till then, just set them offline..
102                DaoLocator.JobDao.SetJobOffline(job);
103              }
104            } else {
105              DateTime lastHbOfSlave = lastHeartbeats[slave.Id];
106
107              TimeSpan dif = DateTime.Now.Subtract(lastHbOfSlave);
108              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
109              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
110                // if slave calculated jobs, the job must be reset
111                Logger.Info("Slave timed out and is on RESET");
112                foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
113                  DaoLocator.JobDao.SetJobOffline(job);
114                  lock (newAssignedJobs) {
115                    if (newAssignedJobs.ContainsKey(job.Id))
116                      newAssignedJobs.Remove(job.Id);
117                  }
118                }
119                Logger.Debug("setting slave offline");
120                // slave must be set offline
121                slave.State = SlaveState.Offline;
122
123                //slaveAdapter.Update(slave);
124                DaoLocator.SlaveDao.Update(slave);
125
126                Logger.Debug("removing it from the heartbeats list");
127                heartbeatLock.EnterWriteLock();
128                lastHeartbeats.Remove(slave.Id);
129                heartbeatLock.ExitWriteLock();
130              }
131            }
132
133            heartbeatLock.ExitUpgradeableReadLock();
134          } else {
135            //TODO: RLY neccesary?
136            //HiveLogger.Info(this.ToString() + ": Slave " + slave.Id + " has wrong state: Shouldn't have offline or nullstate, has " + slave.State);
137            heartbeatLock.EnterWriteLock();
138            //HiveLogger.Info(this.ToString() + ": Slave " + slave.Id + " has wrong state: Resetting all his jobs");
139            if (lastHeartbeats.ContainsKey(slave.Id))
140              lastHeartbeats.Remove(slave.Id);
141            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
142              DaoLocator.JobDao.SetJobOffline(job);
143            }
144            heartbeatLock.ExitWriteLock();
145          }
146        }
147        CheckForPendingJobs();
148        //        DaoLocator.DestroyContext();
149        scope.Complete();
150      }
151    }
152
153    private void CheckForPendingJobs() {
154      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));
155
156      foreach (JobDto currJob in pendingJobsInDB) {
157        lock (pendingJobs) {
158          if (pendingJobs.ContainsKey(currJob.Id)) {
159            if (pendingJobs[currJob.Id] <= 0) {
160              currJob.State = JobState.Offline;
161              DaoLocator.JobDao.Update(currJob);
162            } else {
163              pendingJobs[currJob.Id]--;
164            }
165          }
166        }
167      }
168    }
169
170    #region ISlaveCommunicator Members
171
172    /// <summary>
173    /// Login process for the slave
174    /// A hearbeat entry is created as well (login is the first hearbeat)
175    /// </summary>
176    /// <param name="slaveInfo"></param>
177    /// <returns></returns>
178    public Response Login(SlaveDto slave) {
179      Response response = new Response();
180
181      heartbeatLock.EnterWriteLock();
182      if (lastHeartbeats.ContainsKey(slave.Id)) {
183        lastHeartbeats[slave.Id] = DateTime.Now;
184      } else {
185        lastHeartbeats.Add(slave.Id, DateTime.Now);
186      }
187      heartbeatLock.ExitWriteLock();
188
189      SlaveDto dbSlave = DaoLocator.SlaveDao.FindById(slave.Id);
190
191      slave.CalendarSyncStatus = dbSlave != null ? dbSlave.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
192      slave.State = SlaveState.Idle;
193
194      if (dbSlave == null)
195        DaoLocator.SlaveDao.Insert(slave);
196      else {
197        DaoLocator.SlaveDao.Update(slave);
198      }
199
200      return response;
201    }
202
203    public ResponseCalendar GetCalendar(Guid slaveId) {
204      ResponseCalendar response = new ResponseCalendar();
205
206      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
207      if (slave == null) {
208        //response.Success = false;
209        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
210        return response;
211      }
212
213      response.ForceFetch = (slave.CalendarSyncStatus == CalendarState.ForceFetch);
214
215      IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForSlave(slave);
216      if (appointments.Count() == 0) {
217        response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
218        //response.Success = false;
219      } else {
220        //response.Success = true;
221        response.Appointments = appointments;
222      }
223
224      slave.CalendarSyncStatus = CalendarState.Fetched;
225      DaoLocator.SlaveDao.Update(slave);
226      return response;
227    }
228
229    public Response SetCalendarStatus(Guid slaveId, CalendarState state) {
230      Response response = new Response();
231      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
232      if (slave == null) {
233        //response.Success = false;
234        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
235        return response;
236      }
237
238      slave.CalendarSyncStatus = state;
239      DaoLocator.SlaveDao.Update(slave);
240
241      return response;
242    }
243
244    /// <summary>
245    /// The slave has to send regulary heartbeats
246    /// this hearbeats will be stored in the heartbeats dictionary
247    /// check if there is work for the slave and send the slave a response if he should pull a job
248    /// </summary>
249    /// <param name="hbData"></param>
250    /// <returns></returns>
251    public ResponseHeartBeat ProcessHeartBeat(HeartBeatData hbData) {
252      Logger.Debug("BEGIN Processing Heartbeat for Slave " + hbData.SlaveId);
253
254      ResponseHeartBeat response = new ResponseHeartBeat();
255      response.ActionRequest = new List<MessageContainer>();
256
257      Logger.Debug("BEGIN Started Slave Fetching");
258      SlaveDto slave = DaoLocator.SlaveDao.FindById(hbData.SlaveId);
259      Logger.Debug("END Finished Slave Fetching");
260
261      slave.NrOfFreeCores = hbData.FreeCores;
262      slave.FreeMemory = hbData.FreeMemory;
263      slave.IsAllowedToCalculate = hbData.IsAllowedToCalculate;
264
265      // check if the slave is logged in
266      if (slave.State == SlaveState.Offline || slave.State == SlaveState.NullState) {
267        response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
268        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
269        Logger.Error("ProcessHeartBeat: Slave state null or offline: " + slave);
270        return response;
271      }
272
273      // save timestamp of this heartbeat
274      Logger.Debug("BEGIN Locking for Heartbeats");
275      heartbeatLock.EnterWriteLock();
276      Logger.Debug("END Locked for Heartbeats");
277      if (lastHeartbeats.ContainsKey(hbData.SlaveId)) {
278        lastHeartbeats[hbData.SlaveId] = DateTime.Now;
279      } else {
280        lastHeartbeats.Add(hbData.SlaveId, DateTime.Now);
281      }
282      heartbeatLock.ExitWriteLock();
283
284      Logger.Debug("BEGIN Processing Heartbeat Jobs");
285      ProcessJobProcess(hbData, response);
286      Logger.Debug("END Processed Heartbeat Jobs");
287
288      //check if new Cal must be loaded
289      if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) {
290        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
291        //slave.CalendarSyncStatus = CalendarState.Fetching;
292        Logger.Info("fetch or forcefetch sent");
293      }
294
295      // check if slave has a free core for a new job
296      // if true, ask scheduler for a new job for this slave
297      Logger.Debug(" BEGIN Looking for Slave Jobs");
298      if (slave.IsAllowedToCalculate && hbData.FreeCores > 0 && scheduler.ExistsJobForSlave(hbData)) {
299        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
300      } else {
301        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
302      }
303      Logger.Debug(" END Looked for Slave Jobs");
304
305      DaoLocator.SlaveDao.Update(slave);
306
307      //tx.Commit();
308      Logger.Debug(" END Processed Heartbeat for Slave " + hbData.SlaveId);
309      return response;
310    }
311
312    /// <summary>
313    /// Process the Job progress sent by a slave
314    /// [chn] this method needs to be refactored, because its a performance hog
315    ///
316    /// what it does:
317    /// (1) find out if the jobs that should be calculated by this slave (from db) and compare if they are consistent with what the joblist the slave sent
318    /// (2) find out if every job from the joblist really should be calculated by this slave
319    /// (3) checks if a job should be aborted and issues Message
320    /// (4) update job-progress and write to db
321    /// (5) if snapshot is requested, issue Message
322    ///
323    /// (6) for each job from DB, check if there is a job from slave (again).
324    /// (7) if job matches, it is removed from newAssigneJobs
325    /// (8) if job !matches, job's TTL is reduced by 1,
326    /// (9) if TTL==0, job is set to Abort (save to DB), and Message to Abort job is issued to slave
327    ///
328    ///
329    ///
330    /// quirks:
331    /// (1) the response-object is modified during the foreach-loop (only last element counts)
332    /// (2) state Abort results in Finished. This should be: AbortRequested, Aborted.
333    /// </summary>
334    /// <param name="hbData"></param>
335    /// <param name="jobAdapter"></param>
336    /// <param name="slaveAdapter"></param>
337    /// <param name="response"></param>
338    private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) {
339      Logger.Debug("Started for Slave " + hbData.SlaveId);
340      List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(hbData.SlaveId)));
341      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
342        if (jobsOfSlave == null || jobsOfSlave.Count == 0) {
343          //response.Success = false;
344          //response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
345
346          foreach (Guid jobId in hbData.JobProgress.Keys) {
347            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
348          }
349
350          Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
351          return;
352        }
353
354        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
355          JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
356          curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
357          if (curJob.Slave == null || curJob.Slave.Id != hbData.SlaveId) {
358            //response.Success = false;
359            //response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
360            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
361            Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob);
362          } else if (curJob.State == JobState.Aborted) {
363            // a request to abort the job has been set
364            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
365            curJob.State = JobState.Finished;
366          } else {
367            // save job progress
368            curJob.Percentage = jobProgress.Value;
369
370            if (curJob.State == JobState.SnapshotRequested) {
371              // a request for a snapshot has been set
372              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
373              curJob.State = JobState.SnapshotSent;
374            }
375          }
376          DaoLocator.JobDao.Update(curJob);
377        }
378      }
379      foreach (JobDto currJob in jobsOfSlave) {
380        bool found = false;
381        if (hbData.JobProgress != null) {
382          foreach (Guid jobId in hbData.JobProgress.Keys) {
383            if (jobId == currJob.Id) {
384              found = true;
385              break;
386            }
387          }
388        }
389        if (!found) {
390          lock (newAssignedJobs) {
391            if (newAssignedJobs.ContainsKey(currJob.Id)) {
392              newAssignedJobs[currJob.Id]--;
393              Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
394              if (newAssignedJobs[currJob.Id] <= 0) {
395                Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);
396
397                currJob.State = JobState.Offline;
398                DaoLocator.JobDao.Update(currJob);
399
400                response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
401
402                newAssignedJobs.Remove(currJob.Id);
403              }
404            } else {
405              Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
406              currJob.State = JobState.Offline;
407              DaoLocator.JobDao.Update(currJob);
408            }
409          } // lock
410        } else {
411          lock (newAssignedJobs) {
412
413            if (newAssignedJobs.ContainsKey(currJob.Id)) {
414              Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
415              newAssignedJobs.Remove(currJob.Id);
416            }
417          }
418        }
419      }
420    }
421
422    /// <summary>
423    /// if the slave was told to pull a job he calls this method
424    /// the server selects a job and sends it to the slave
425    /// </summary>
426    /// <param name="slaveId"></param>
427    /// <returns></returns>
428    public ResponseObject<JobDto> GetJob(Guid slaveId) {
429      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
430
431      JobDto job2Calculate = scheduler.GetNextJobForSlave(slaveId);
432      if (job2Calculate != null) {
433        response.Obj = job2Calculate;
434        response.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Obj);
435
436        Logger.Info("Job pulled: " + job2Calculate + " for user " + slaveId);
437        lock (newAssignedJobs) {
438          if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
439            newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
440        }
441      } else {
442        //response.Success = false;
443        response.Obj = null;
444        response.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
445        Logger.Info("No more Jobs left for " + slaveId);
446      }
447      return response;
448    }
449
450    public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
451      Logger.Info("BEGIN Job received for Storage - main method:");
452
453      //Stream jobResultStream = null;
454      //Stream jobStream = null;
455
456      //try {
457      BinaryFormatter formatter = new BinaryFormatter();
458
459      JobResult result = (JobResult)formatter.Deserialize(stream);
460
461      //important - repeatable read isolation level is required here,
462      //otherwise race conditions could occur when writing the stream into the DB
463      //just removed TransactionIsolationLevel.RepeatableRead
464      //tx = session.BeginTransaction();
465
466      ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.JobId, new byte[] { }, result.Percentage, result.Exception, finished);
467
468      if (response.StatusMessage == ResponseStatus.Ok) {
469        Logger.Debug("Trying to aquire WCF Job Stream");
470        //jobStream = DaoLocator.JobDao.GetSerializedJobStream(result.JobId);
471        //Logger.Debug("Job Stream Aquired");
472        byte[] buffer = new byte[3024];
473        List<byte> serializedJob = new List<byte>();
474        int read = 0;
475        int i = 0;
476        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
477          for (int j = 0; j < read; j++) {
478            serializedJob.Add(buffer[j]);
479          }
480          if (i % 100 == 0)
481            Logger.Debug("Writing to stream: " + i);
482          //jobStream.Write(buffer, 0, read);
483          i++;
484        }
485        Logger.Debug("Done Writing, closing the stream!");
486        //jobStream.Close();
487
488        DaoLocator.JobDao.SetBinaryJobFile(result.JobId, serializedJob.ToArray());
489      }
490      Logger.Info("END Job received for Storage:");
491      stream.Dispose();
492      return response;
493    }
494
495    private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {
496      Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);
497
498      ResponseResultReceived response = new ResponseResultReceived();
499      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
500
501      SerializedJob job = new SerializedJob();
502
503      if (job != null) {
504        job.JobInfo = DaoLocator.JobDao.FindById(jobId);
505        if (job.JobInfo != null) {
506          job.JobInfo.Slave = job.JobInfo.Slave = DaoLocator.SlaveDao.GetSlaveForJob(jobId);
507        }
508      }
509
510      if (job != null && job.JobInfo == null) {
511        //response.Success = false;
512        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
513        response.JobId = jobId;
514        Logger.Error("No job with Id " + jobId);
515
516        //tx.Rollback();
517        return response;
518      }
519      if (job.JobInfo.State == JobState.Aborted) {
520        //response.Success = false;
521        response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
522
523        Logger.Error("Job was aborted! " + job.JobInfo);
524
525        //tx.Rollback();
526        return response;
527      }
528      if (job.JobInfo.Slave == null) {
529        //response.Success = false;
530        response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
531        response.JobId = jobId;
532
533        Logger.Error("Job is not being calculated (slave = null)! " + job.JobInfo);
534
535        //tx.Rollback();
536        return response;
537      }
538      if (job.JobInfo.Slave.Id != slaveId) {
539        //response.Success = false;
540        response.StatusMessage = ResponseStatus.ProcessJobResult_WrongSlaveForJob;
541        response.JobId = jobId;
542
543        Logger.Error("Wrong Slave for this Job! " + job.JobInfo + ", Sending Slave is: " + slaveId);
544
545        //tx.Rollback();
546        return response;
547      }
548      if (job.JobInfo.State == JobState.Finished) {
549        response.StatusMessage = ResponseStatus.Ok;
550        response.JobId = jobId;
551
552        Logger.Error("Job already finished! " + job.JobInfo + ", Sending Slave is: " + slaveId);
553
554        //tx.Rollback();
555        return response;
556      }
557      //Todo: RequestsnapshotSent => calculating?
558      if (job.JobInfo.State == JobState.SnapshotSent) {
559        job.JobInfo.State = JobState.Calculating;
560      }
561      if (job.JobInfo.State != JobState.Calculating && job.JobInfo.State != JobState.Pending) {
562        //response.Success = false;
563        response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
564        response.JobId = jobId;
565
566        Logger.Error("Wrong Job State, job is: " + job.JobInfo);
567
568        //tx.Rollback();
569        return response;
570      }
571      job.JobInfo.Percentage = percentage;
572
573      if (!string.IsNullOrEmpty(exception)) {
574        job.JobInfo.State = JobState.Failed;
575        job.JobInfo.Exception = exception;
576        job.JobInfo.DateFinished = DateTime.Now;
577      } else if (finished) {
578        job.JobInfo.State = JobState.Finished;
579        job.JobInfo.DateFinished = DateTime.Now;
580      }
581
582      job.SerializedJobData = result;
583
584      DaoLocator.JobDao.Update(job.JobInfo);
585
586      response.StatusMessage = ResponseStatus.Ok;
587      response.JobId = jobId;
588      response.Finished = finished;
589
590      Logger.Info("END Job received for Storage - SUB method: " + jobId);
591      return response;
592
593    }
594
595    /// <summary>
596    /// the slave can send job results during calculating
597    /// and will send a final job result when he finished calculating
598    /// these job results will be stored in the database
599    /// </summary>
600    /// <param name="slaveId"></param>
601    /// <param name="jobId"></param>
602    /// <param name="result"></param>
603    /// <param name="exception"></param>
604    /// <param name="finished"></param>
605    /// <returns></returns>
606    public ResponseResultReceived StoreFinishedJobResult(Guid slaveId,
607      Guid jobId,
608      byte[] result,
609      double percentage,
610      string exception) {
611
612      return ProcessJobResult(slaveId, jobId, result, percentage, exception, true);
613    }
614
615    public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, double percentage, string exception) {
616      return ProcessJobResult(slaveId, jobId, result, percentage, exception, false);
617    }
618
619    /// <summary>
620    /// when a slave logs out the state will be set
621    /// and the entry in the last hearbeats dictionary will be removed
622    /// </summary>
623    /// <param name="slaveId"></param>
624    /// <returns></returns>                       
625    public Response Logout(Guid slaveId) {
626      Logger.Info("Slave logged out " + slaveId);
627
628      Response response = new Response();
629
630      heartbeatLock.EnterWriteLock();
631      if (lastHeartbeats.ContainsKey(slaveId))
632        lastHeartbeats.Remove(slaveId);
633      heartbeatLock.ExitWriteLock();
634
635      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
636      if (slave == null) {
637        //response.Success = false;
638        response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
639        return response;
640      }
641      if (slave.State == SlaveState.Calculating) {
642        // check wich job the slave was calculating and reset it
643        IEnumerable<JobDto> jobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(slave);
644        foreach (JobDto job in jobsOfSlave) {
645          if (job.State != JobState.Finished)
646            DaoLocator.JobDao.SetJobOffline(job);
647        }
648      }
649
650      slave.State = SlaveState.Offline;
651      DaoLocator.SlaveDao.Update(slave);
652
653      return response;
654    }
655
656    /// <summary>
657    /// If a slave goes offline and restores a job he was calculating
658    /// he can ask the slave if he still needs the job result
659    /// </summary>
660    /// <param name="jobId"></param>
661    /// <returns></returns>
662    public Response IsJobStillNeeded(Guid jobId) {
663      Response response = new Response();
664      JobDto job = DaoLocator.JobDao.FindById(jobId);
665      if (job == null) {
666        //response.Success = false;
667        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
668        Logger.Error("Job doesn't exist (anymore)! " + jobId);
669        return response;
670      }
671      if (job.State == JobState.Finished) {
672        //response.Success = true;
673        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
674        Logger.Error("already finished! " + job);
675        return response;
676      }
677      job.State = JobState.Pending;
678      lock (pendingJobs) {
679        pendingJobs.Add(job.Id, PENDING_TIMEOUT);
680      }
681
682      DaoLocator.JobDao.Update(job);
683
684      return response;
685    }
686
687    public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
688      ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
689      response.List = new List<CachedHivePluginInfoDto>();
690      foreach (HivePluginInfoDto pluginInfo in pluginList) {
691        if (pluginInfo.Update) {
692          //check if there is a newer version         
693          IPluginDescription ipd = ApplicationManager.Manager.Plugins.Where(pd => pd.Name == pluginInfo.Name && pd.Version.Major == pluginInfo.Version.Major && pd.Version.Minor == pluginInfo.Version.Minor && pd.Version.Revision > pluginInfo.Version.Revision).SingleOrDefault();
694          if (ipd != null) {
695            response.List.Add(ConvertPluginDescriptorToDto(ipd));
696          }
697        } else {
698          IPluginDescription ipd = ApplicationManager.Manager.Plugins.Where(pd => pd.Name == pluginInfo.Name && pd.Version.Major == pluginInfo.Version.Major && pd.Version.Minor == pluginInfo.Version.Minor && pd.Version.Revision >= pluginInfo.Version.Revision).SingleOrDefault();
699          if (ipd != null) {
700            response.List.Add(ConvertPluginDescriptorToDto(ipd));
701          } else {
702            //response.Success = false;
703            response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
704            return response;
705          }
706        }
707      }
708      return response;
709    }
710
711    private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
712      CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
713        Name = currPlugin.Name,
714        Version = currPlugin.Version
715      };
716
717      foreach (string fileName in from file in currPlugin.Files select file.Name) {
718        currCachedPlugin.PluginFiles.Add(new HivePluginFile(File.ReadAllBytes(fileName), fileName));
719      }
720      return currCachedPlugin;
721    }
722
723    #endregion
724  }
725}
Note: See TracBrowser for help on using the repository browser.