Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 4333

Last change on this file since 4333 was 4333, checked in by cneumuel, 12 years ago

added authorizationManager which checks for permission to specific jobs (#1168)

File size: 29.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Tracing;
35using HeuristicLab.Hive.Contracts.ResponseObjects;
36using System.Security.Permissions;
37using System.Web.Security;
38using System.Security.Principal;
39using System.ServiceModel;
40using System.Security;
41
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// The SlaveCommunicator manages the whole communication with the slaves:
  /// login/logout, heartbeats, job distribution, job results, calendars and plugins.
  /// </summary>
  /// <remarks>
  /// Heartbeat timestamps, newly assigned jobs and pending jobs are kept in static
  /// dictionaries, i.e. they are shared by all instances of this class.
  /// </remarks>
  public class SlaveCommunicator : ISlaveCommunicator,
    IInternalSlaveCommunicator {
    // Time (UTC) of the last heartbeat/login per slave id; guarded by heartbeatLock.
    private static readonly Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
    // Remaining heartbeat "time to live" per job id for jobs handed out by GetJob that the
    // slave has not yet reported in a heartbeat; guarded by lock(newAssignedJobs).
    private static readonly Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
    // Remaining server-heartbeat ticks per job id for jobs set to Pending by IsJobStillNeeded;
    // guarded by lock(pendingJobs).
    private static readonly Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();

    private static readonly ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private ILifecycleManager lifecycleManager;
    private IInternalJobManager jobManager;
    private IScheduler scheduler;

    // Number of server heartbeat ticks a job may stay Pending before it is set back to Offline.
    private const int PENDING_TIMEOUT = 100;

    /// <summary>
    /// Resolves the lifecycle manager, job manager and scheduler from the ServiceLocator
    /// and registers this instance for the server heartbeat of the lifecycle manager.
    /// </summary>
    /// <remarks>
    /// NOTE(review): every instance registers its own heartbeat handler while the state
    /// it works on is static - confirm that only one instance is ever created.
    /// </remarks>
    public SlaveCommunicator() {
      lifecycleManager = ServiceLocator.GetLifecycleManager();
      jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
      scheduler = ServiceLocator.GetScheduler();

      lifecycleManager.RegisterHeartbeat(new EventHandler(lifecycleManager_OnServerHeartbeat));
    }

    /// <summary>
    /// Server heartbeat handler. Slaves that are online but have not sent a heartbeat
    /// within HEARTBEAT_MAX_DIF seconds (or never sent one) are set offline and their
    /// active jobs are reset. Slaves that are already offline get their heartbeat entry
    /// and active jobs cleared. Finally the countdown of Pending jobs is advanced.
    /// </summary>
    /// <param name="sender">The lifecycle manager raising the event.</param>
    /// <param name="e">Unused.</param>
    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
      Logger.Debug("Server Heartbeat ticked");

      // [chn] why is transaction management done here
      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
        List<SlaveDto> allSlaves = new List<SlaveDto>(DaoLocator.SlaveDao.FindAll());

        foreach (SlaveDto slave in allSlaves) {
          if (slave.State != SlaveState.Offline && slave.State != SlaveState.NullState) {
            CheckHeartbeatOfSlave(slave);
          } else {
            ResetOfflineSlave(slave);
          }
        }
        CheckForPendingJobs();
        scope.Complete();
      }
    }

    /// <summary>
    /// Sets an online slave offline (and resets its active jobs) when it has no recorded
    /// heartbeat or its last heartbeat is older than HEARTBEAT_MAX_DIF seconds.
    /// </summary>
    private static void CheckHeartbeatOfSlave(SlaveDto slave) {
      heartbeatLock.EnterUpgradeableReadLock();
      try {
        DateTime lastHbOfSlave;
        if (!lastHeartbeats.TryGetValue(slave.Id, out lastHbOfSlave)) {
          Logger.Info("Slave " + slave.Id +
                          " wasn't offline but hasn't sent heartbeats - setting offline");
          slave.State = SlaveState.Offline;
          DaoLocator.SlaveDao.Update(slave);
          Logger.Info("Slave " + slave.Id +
                          " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
          ResetActiveJobsOfSlave(slave);
        } else {
          // timestamps are recorded in UTC so DST switches cannot produce false timeouts
          TimeSpan dif = DateTime.UtcNow.Subtract(lastHbOfSlave);
          if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
            // if the slave calculated jobs, they must be reset
            Logger.Info("Slave timed out and is on RESET");
            ResetActiveJobsOfSlave(slave);

            Logger.Debug("setting slave offline");
            slave.State = SlaveState.Offline;
            DaoLocator.SlaveDao.Update(slave);

            Logger.Debug("removing it from the heartbeats list");
            heartbeatLock.EnterWriteLock();
            try {
              lastHeartbeats.Remove(slave.Id);
            } finally {
              heartbeatLock.ExitWriteLock();
            }
          }
        }
      } finally {
        heartbeatLock.ExitUpgradeableReadLock();
      }
    }

    /// <summary>
    /// For a slave that is Offline or NullState: drops its heartbeat entry and resets
    /// all jobs the database still considers active for it.
    /// </summary>
    private static void ResetOfflineSlave(SlaveDto slave) {
      heartbeatLock.EnterWriteLock();
      try {
        // Dictionary.Remove is a no-op for missing keys
        lastHeartbeats.Remove(slave.Id);
        ResetActiveJobsOfSlave(slave);
      } finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Resets every job returned by FindActiveJobsOfSlave for the given slave.
    /// </summary>
    private static void ResetActiveJobsOfSlave(SlaveDto slave) {
      foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
        ResetJob(job);
      }
    }

    /// <summary>
    /// Sets a job offline in the database and forgets its assignment TTL, so a later
    /// reassignment of the job starts with a fresh TTL.
    /// </summary>
    private static void ResetJob(JobDto job) {
      DaoLocator.JobDao.SetJobOffline(job);
      lock (newAssignedJobs) {
        newAssignedJobs.Remove(job.Id);
      }
    }

    /// <summary>
    /// Counts down the tick counter of every Pending job that was registered by
    /// IsJobStillNeeded; a job whose counter has reached zero is set back to Offline
    /// and its counter entry is removed.
    /// </summary>
    private void CheckForPendingJobs() {
      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));

      foreach (JobDto currJob in pendingJobsInDB) {
        lock (pendingJobs) {
          int remainingTicks;
          if (pendingJobs.TryGetValue(currJob.Id, out remainingTicks)) {
            if (remainingTicks <= 0) {
              currJob.State = JobState.Offline;
              DaoLocator.JobDao.Update(currJob);
              // forget the entry, otherwise it would leak and be re-processed forever
              pendingJobs.Remove(currJob.Id);
            } else {
              pendingJobs[currJob.Id] = remainingTicks - 1;
            }
          }
        }
      }
    }

    #region ISlaveCommunicator Members

    /// <summary>
    /// Login process for the slave.
    /// A heartbeat entry is created as well (login is the first heartbeat).
    /// The slave is stored as Idle; its calendar sync status is taken over from the
    /// database, or NotAllowedToFetch for a new slave.
    /// </summary>
    /// <param name="slave">The slave that logs in.</param>
    /// <returns>A default (successful) response.</returns>
    public Response Login(SlaveDto slave) {
      Response response = new Response();

      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats[slave.Id] = DateTime.UtcNow;
      } finally {
        heartbeatLock.ExitWriteLock();
      }

      SlaveDto dbSlave = DaoLocator.SlaveDao.FindById(slave.Id);

      slave.CalendarSyncStatus = dbSlave != null ? dbSlave.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
      slave.State = SlaveState.Idle;

      if (dbSlave == null) {
        DaoLocator.SlaveDao.Insert(slave);
      } else {
        DaoLocator.SlaveDao.Update(slave);
      }

      return response;
    }

    /// <summary>
    /// Returns the uptime calendar of a slave and marks its calendar as Fetched.
    /// </summary>
    /// <param name="slaveId">Id of the requesting slave.</param>
    /// <returns>
    /// The appointments, or a status of GetCalendar_ResourceNotFound (unknown slave) /
    /// GetCalendar_NoCalendarFound (no appointments).
    /// </returns>
    public ResponseCalendar GetCalendar(Guid slaveId) {
      ResponseCalendar response = new ResponseCalendar();

      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
      if (slave == null) {
        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
        return response;
      }

      response.ForceFetch = (slave.CalendarSyncStatus == CalendarState.ForceFetch);

      IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForSlave(slave);
      if (!appointments.Any()) {
        response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
      } else {
        response.Appointments = appointments;
      }

      slave.CalendarSyncStatus = CalendarState.Fetched;
      DaoLocator.SlaveDao.Update(slave);
      return response;
    }

    /// <summary>
    /// Stores the calendar sync status reported by a slave.
    /// </summary>
    /// <param name="slaveId">Id of the slave.</param>
    /// <param name="state">New calendar sync status.</param>
    /// <returns>A default response, or GetCalendar_ResourceNotFound for an unknown slave.</returns>
    public Response SetCalendarStatus(Guid slaveId, CalendarState state) {
      Response response = new Response();
      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
      if (slave == null) {
        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
        return response;
      }

      slave.CalendarSyncStatus = state;
      DaoLocator.SlaveDao.Update(slave);

      return response;
    }

    /// <summary>
    /// The slave has to send regular heartbeats.
    /// Each heartbeat is recorded in the heartbeats dictionary, the reported job
    /// progress is processed, and the response tells the slave whether it should fetch
    /// its calendar and/or pull a new job.
    /// </summary>
    /// <param name="heartbeatData">Data sent by the slave.</param>
    /// <returns>
    /// The response with action requests; ProcessHeartBeat_UserNotLoggedIn if the slave
    /// is unknown, Offline or NullState.
    /// </returns>
    public ResponseHeartBeat ProcessHeartBeat(HeartBeatData heartbeatData) {
      Logger.Debug("BEGIN Processing Heartbeat for Slave " + heartbeatData.SlaveId);

      ResponseHeartBeat response = new ResponseHeartBeat();
      response.ActionRequest = new List<MessageContainer>();

      Logger.Debug("BEGIN Started Slave Fetching");
      SlaveDto slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
      Logger.Debug("END Finished Slave Fetching");

      // an unknown slave is treated like a slave that is not logged in
      if (slave == null) {
        response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
        Logger.Error("ProcessHeartBeat: Slave is not registered: " + heartbeatData.SlaveId);
        return response;
      }

      slave.NrOfFreeCores = heartbeatData.FreeCores;
      slave.FreeMemory = heartbeatData.FreeMemory;
      slave.IsAllowedToCalculate = heartbeatData.IsAllowedToCalculate;

      // check if the slave is logged in
      if (slave.State == SlaveState.Offline || slave.State == SlaveState.NullState) {
        response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
        Logger.Error("ProcessHeartBeat: Slave state null or offline: " + slave);
        return response;
      }

      // save timestamp of this heartbeat
      Logger.Debug("BEGIN Locking for Heartbeats");
      heartbeatLock.EnterWriteLock();
      try {
        Logger.Debug("END Locked for Heartbeats");
        lastHeartbeats[heartbeatData.SlaveId] = DateTime.UtcNow;
      } finally {
        heartbeatLock.ExitWriteLock();
      }

      Logger.Debug("BEGIN Processing Heartbeat Jobs");
      ProcessJobProcess(heartbeatData, response);
      Logger.Debug("END Processed Heartbeat Jobs");

      // check if a new calendar must be loaded
      if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
        Logger.Info("fetch or forcefetch sent");
      }

      // check if slave has a free core for a new job
      // if true, ask scheduler for a new job for this slave
      Logger.Debug(" BEGIN Looking for Slave Jobs");
      if (slave.IsAllowedToCalculate && heartbeatData.FreeCores > 0 && scheduler.ExistsJobForSlave(heartbeatData)) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
      } else {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
      }
      Logger.Debug(" END Looked for Slave Jobs");

      DaoLocator.SlaveDao.Update(slave);

      Logger.Debug(" END Processed Heartbeat for Slave " + heartbeatData.SlaveId);
      return response;
    }

    /// <summary>
    /// Process the job progress sent by a slave.
    /// [chn] this method needs to be refactored, because its a performance hog
    ///
    /// what it does:
    /// (1) if the slave reports jobs but the DB has no active job for it, all reported jobs are aborted
    /// (2) for every reported job: unknown jobs and jobs not assigned to this slave are aborted
    ///     (processing continues with the next reported job)
    /// (3) checks if a job should be aborted and issues a message
    /// (4) updates the job progress and writes it to the DB
    /// (5) if a snapshot is requested, issues a message
    ///
    /// (6) for each active job from the DB, checks whether the slave reported it
    /// (7) if reported, the job is removed from newAssignedJobs
    /// (8) if not reported, the job's TTL is reduced by 1
    /// (9) if the TTL reaches 0, the job is set Offline (saved to DB) and the slave is told to abort it;
    ///     a job without TTL entry that is not reported is set Offline directly
    ///
    /// quirks:
    /// (1) response.StatusMessage is overwritten during the loop (only the last assignment counts)
    /// (2) state Aborted results in Finished. This should be: AbortRequested, Aborted.
    /// </summary>
    /// <param name="hbData">The heartbeat data of the slave.</param>
    /// <param name="response">Response whose ActionRequest list receives the messages.</param>
    private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) {
      Logger.Debug("Started for Slave " + hbData.SlaveId);
      List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(hbData.SlaveId)));
      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
        if (jobsOfSlave.Count == 0) {
          foreach (Guid jobId in hbData.JobProgress.Keys) {
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
          }

          Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
          return;
        }

        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
          JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
          if (curJob == null) {
            response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
            Logger.Error("Job does not exist in DB: " + jobProgress.Key);
            // one unknown job must not prevent processing the other reported jobs
            continue;
          }
          curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
          if (curJob.Slave == null || curJob.Slave.Id != hbData.SlaveId) {
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob);
          } else if (curJob.State == JobState.Aborted) {
            // a request to abort the job has been set
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            curJob.State = JobState.Finished;
          } else {
            // save job progress
            curJob.Percentage = jobProgress.Value;

            if (curJob.State == JobState.SnapshotRequested) {
              // a request for a snapshot has been set
              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
              curJob.State = JobState.SnapshotSent;
            }
          }
          DaoLocator.JobDao.Update(curJob);
        }
      }

      foreach (JobDto currJob in jobsOfSlave) {
        bool found = hbData.JobProgress != null && hbData.JobProgress.Keys.Contains(currJob.Id);

        lock (newAssignedJobs) {
          if (found) {
            if (newAssignedJobs.Remove(currJob.Id)) {
              Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
            }
            continue;
          }

          int ttl;
          if (newAssignedJobs.TryGetValue(currJob.Id, out ttl)) {
            ttl--;
            newAssignedJobs[currJob.Id] = ttl;
            Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + ttl + ". User that sucks: " + currJob.Slave);
            if (ttl <= 0) {
              Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);

              currJob.State = JobState.Offline;
              DaoLocator.JobDao.Update(currJob);

              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

              newAssignedJobs.Remove(currJob.Id);
            }
          } else {
            Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
            currJob.State = JobState.Offline;
            DaoLocator.JobDao.Update(currJob);
          }
        }
      }
    }

    /// <summary>
    /// If the slave was told to pull a job it calls this method;
    /// the scheduler selects a job, which is sent to the slave together with its plugin dependencies.
    /// </summary>
    /// <param name="slaveId">Id of the requesting slave.</param>
    /// <returns>The job, or a null object with status GetJob_NoJobsAvailable.</returns>
    public ResponseObject<JobDto> GetJob(Guid slaveId) {
      ResponseObject<JobDto> response = new ResponseObject<JobDto>();

      JobDto job2Calculate = scheduler.GetNextJobForSlave(slaveId);
      if (job2Calculate != null) {
        response.Obj = job2Calculate;
        response.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Obj);

        Logger.Info("Job pulled: " + job2Calculate + " for user " + slaveId);
        lock (newAssignedJobs) {
          // a freshly handed-out job always starts with the full TTL, even if a stale
          // entry from an earlier assignment is still present
          newAssignedJobs[job2Calculate.Id] = ApplicationConstants.JOB_TIME_TO_LIVE;
        }
      } else {
        response.Obj = null;
        response.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
        Logger.Info("No more Jobs left for " + slaveId);
      }
      return response;
    }

    /// <summary>
    /// Receives a job result as a stream: a BinaryFormatter-serialized JobResult header
    /// followed by the raw serialized job. If the result is accepted, the remaining stream
    /// content is stored as the job's binary file. The stream is always disposed.
    /// </summary>
    /// <param name="stream">The incoming result stream.</param>
    /// <param name="finished">True if this is the final result, false for a snapshot.</param>
    /// <returns>The response of the result validation.</returns>
    public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
      Logger.Info("BEGIN Job received for Storage - main method:");
      try {
        // SECURITY NOTE(review): BinaryFormatter deserializes data sent by a slave;
        // deserializing untrusted input with BinaryFormatter is unsafe. Kept because it is
        // the wire format, but slaves must be authenticated before reaching this point.
        BinaryFormatter formatter = new BinaryFormatter();
        JobResult result = (JobResult)formatter.Deserialize(stream);

        // the serialized job data follows in the stream and is stored below, therefore
        // an empty result array is passed here
        ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.JobId, new byte[] { }, result.Percentage, result.Exception, finished);

        if (response.StatusMessage == ResponseStatus.Ok) {
          Logger.Debug("Trying to aquire WCF Job Stream");
          byte[] serializedJob = ReadRemainingBytes(stream);
          Logger.Debug("Done Writing, closing the stream!");

          DaoLocator.JobDao.SetBinaryJobFile(result.JobId, serializedJob);
        }
        Logger.Info("END Job received for Storage:");
        return response;
      } finally {
        stream.Dispose();
      }
    }

    /// <summary>
    /// Reads the stream from its current position to the end and returns the bytes read.
    /// </summary>
    private static byte[] ReadRemainingBytes(Stream stream) {
      using (MemoryStream serializedJob = new MemoryStream()) {
        byte[] buffer = new byte[3024];
        int read;
        int i = 0;
        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
          serializedJob.Write(buffer, 0, read);
          if (i % 100 == 0)
            Logger.Debug("Writing to stream: " + i);
          i++;
        }
        return serializedJob.ToArray();
      }
    }

    /// <summary>
    /// Validates a job result sent by a slave and updates the job's state, progress and
    /// exception in the database. A non-empty <paramref name="result"/> is stored as the
    /// job's binary file.
    /// </summary>
    /// <param name="slaveId">Id of the sending slave.</param>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="result">Serialized job data; may be empty when the caller stores it itself.</param>
    /// <param name="percentage">Reported progress.</param>
    /// <param name="exception">Exception text; a non-empty value marks the job as Failed.</param>
    /// <param name="finished">True to mark the job as Finished.</param>
    /// <returns>
    /// Ok on success (also for an already finished job); otherwise JobDoesNotExist,
    /// JobAborted, JobIsNotBeeingCalculated, WrongSlaveForJob or InvalidJobState.
    /// </returns>
    private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {
      Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);

      ResponseResultReceived response = new ResponseResultReceived();

      JobDto jobInfo = DaoLocator.JobDao.FindById(jobId);
      if (jobInfo == null) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
        response.JobId = jobId;
        Logger.Error("No job with Id " + jobId);
        return response;
      }
      jobInfo.Slave = DaoLocator.SlaveDao.GetSlaveForJob(jobId);

      if (jobInfo.State == JobState.Aborted) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
        Logger.Error("Job was aborted! " + jobInfo);
        return response;
      }
      if (jobInfo.Slave == null) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
        response.JobId = jobId;
        Logger.Error("Job is not being calculated (slave = null)! " + jobInfo);
        return response;
      }
      if (jobInfo.Slave.Id != slaveId) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_WrongSlaveForJob;
        response.JobId = jobId;
        Logger.Error("Wrong Slave for this Job! " + jobInfo + ", Sending Slave is: " + slaveId);
        return response;
      }
      if (jobInfo.State == JobState.Finished) {
        response.StatusMessage = ResponseStatus.Ok;
        response.JobId = jobId;
        Logger.Error("Job already finished! " + jobInfo + ", Sending Slave is: " + slaveId);
        return response;
      }
      //Todo: RequestsnapshotSent => calculating?
      if (jobInfo.State == JobState.SnapshotSent) {
        jobInfo.State = JobState.Calculating;
      }
      if (jobInfo.State != JobState.Calculating && jobInfo.State != JobState.Pending) {
        response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
        response.JobId = jobId;
        Logger.Error("Wrong Job State, job is: " + jobInfo);
        return response;
      }
      jobInfo.Percentage = percentage;

      if (!string.IsNullOrEmpty(exception)) {
        jobInfo.State = JobState.Failed;
        jobInfo.Exception = exception;
        jobInfo.DateFinished = DateTime.Now;
      } else if (finished) {
        jobInfo.State = JobState.Finished;
        jobInfo.DateFinished = DateTime.Now;
      }

      DaoLocator.JobDao.Update(jobInfo);

      // persist the serialized job data sent along with the result (previously it was
      // assigned to a throw-away object and lost); the stream-based overload passes an
      // empty array and stores its payload itself
      if (result != null && result.Length > 0) {
        DaoLocator.JobDao.SetBinaryJobFile(jobId, result);
      }

      response.StatusMessage = ResponseStatus.Ok;
      response.JobId = jobId;
      response.Finished = finished;

      Logger.Info("END Job received for Storage - SUB method: " + jobId);
      return response;
    }

    /// <summary>
    /// The slave can send job results during calculating
    /// and will send a final job result when it finished calculating;
    /// these job results will be stored in the database.
    /// </summary>
    /// <param name="slaveId">Id of the sending slave.</param>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="result">Serialized job data.</param>
    /// <param name="percentage">Reported progress.</param>
    /// <param name="exception">Exception text, if the job failed.</param>
    /// <returns>The response of the result validation.</returns>
    public ResponseResultReceived StoreFinishedJobResult(Guid slaveId,
      Guid jobId,
      byte[] result,
      double percentage,
      string exception) {

      return ProcessJobResult(slaveId, jobId, result, percentage, exception, true);
    }

    /// <summary>
    /// Stores an intermediate (snapshot) result of a job; the job is not marked as finished.
    /// </summary>
    public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, double percentage, string exception) {
      return ProcessJobResult(slaveId, jobId, result, percentage, exception, false);
    }

    /// <summary>
    /// When a slave logs out its state is set to Offline, the entry in the last
    /// heartbeats dictionary is removed and, if it was calculating, its unfinished
    /// active jobs are reset.
    /// </summary>
    /// <param name="slaveId">Id of the slave.</param>
    /// <returns>A default response, or Logout_SlaveNotRegistered for an unknown slave.</returns>
    public Response Logout(Guid slaveId) {
      Logger.Info("Slave logged out " + slaveId);

      Response response = new Response();

      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats.Remove(slaveId);
      } finally {
        heartbeatLock.ExitWriteLock();
      }

      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
      if (slave == null) {
        response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
        return response;
      }
      if (slave.State == SlaveState.Calculating) {
        // check which jobs the slave was calculating and reset them
        foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
          if (job.State != JobState.Finished)
            ResetJob(job);
        }
      }

      slave.State = SlaveState.Offline;
      DaoLocator.SlaveDao.Update(slave);

      return response;
    }

    /// <summary>
    /// If a slave goes offline and restores a job it was calculating,
    /// it can ask the server if the job result is still needed.
    /// An unfinished job is set to Pending and falls back to Offline after
    /// PENDING_TIMEOUT server heartbeats unless its result arrives earlier.
    /// </summary>
    /// <param name="jobId">Id of the job.</param>
    /// <returns>
    /// A default response if the job is still needed; otherwise
    /// IsJobStillNeeded_JobDoesNotExist or IsJobStillNeeded_JobAlreadyFinished.
    /// </returns>
    public Response IsJobStillNeeded(Guid jobId) {
      Response response = new Response();
      JobDto job = DaoLocator.JobDao.FindById(jobId);
      if (job == null) {
        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
        Logger.Error("Job doesn't exist (anymore)! " + jobId);
        return response;
      }
      if (job.State == JobState.Finished) {
        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
        Logger.Error("already finished! " + job);
        return response;
      }
      job.State = JobState.Pending;
      lock (pendingJobs) {
        // indexer instead of Add: asking twice for the same job must not throw
        pendingJobs[job.Id] = PENDING_TIMEOUT;
      }

      DaoLocator.JobDao.Update(job);

      return response;
    }

    /// <summary>
    /// Returns the plugin files requested by a slave. For plugins flagged with Update only
    /// a strictly newer revision (same major/minor) is returned, if one exists; for all
    /// other plugins a revision at least as new as the requested one must exist, otherwise
    /// the response status is GetPlugins_PluginsNotAvailable.
    /// </summary>
    /// <param name="pluginList">The plugins requested by the slave.</param>
    /// <returns>The found plugins.</returns>
    public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
      ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
      response.List = new List<CachedHivePluginInfoDto>();
      foreach (HivePluginInfoDto pluginInfo in pluginList) {
        IPluginDescription ipd = FindNewestPlugin(pluginInfo, pluginInfo.Update);
        if (ipd != null) {
          response.List.Add(ConvertPluginDescriptorToDto(ipd));
        } else if (!pluginInfo.Update) {
          // a required plugin is missing; a missing update is not an error
          response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
          return response;
        }
      }
      return response;
    }

    /// <summary>
    /// Finds the newest installed plugin with the same name, major and minor version as
    /// <paramref name="pluginInfo"/> whose revision is greater than (if
    /// <paramref name="newerRevisionOnly"/>) or at least the requested revision.
    /// Picking the newest avoids the exception SingleOrDefault threw when several
    /// matching revisions were installed.
    /// </summary>
    /// <returns>The plugin description, or null if none matches.</returns>
    private static IPluginDescription FindNewestPlugin(HivePluginInfoDto pluginInfo, bool newerRevisionOnly) {
      return ApplicationManager.Manager.Plugins
        .Where(pd => pd.Name == pluginInfo.Name
                  && pd.Version.Major == pluginInfo.Version.Major
                  && pd.Version.Minor == pluginInfo.Version.Minor
                  && (newerRevisionOnly
                        ? pd.Version.Revision > pluginInfo.Version.Revision
                        : pd.Version.Revision >= pluginInfo.Version.Revision))
        .OrderByDescending(pd => pd.Version)
        .FirstOrDefault();
    }

    /// <summary>
    /// Converts a plugin description into a DTO that carries the contents of all plugin files.
    /// </summary>
    /// <param name="currPlugin">The plugin description.</param>
    /// <returns>The DTO with name, version and file contents.</returns>
    private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
      CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
        Name = currPlugin.Name,
        Version = currPlugin.Version
      };

      foreach (string fileName in currPlugin.Files.Select(file => file.Name)) {
        currCachedPlugin.PluginFiles.Add(new HivePluginFile(File.ReadAllBytes(fileName), fileName));
      }
      return currCachedPlugin;
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.