Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 5213

Last change on this file since 5213 was 5213, checked in by cneumuel, 13 years ago

#1260

  • changed HiveEngine to be compatible with recent changes of engine
File size: 29.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Contracts.ResponseObjects;
33using HeuristicLab.Hive.Server.Core.InternalInterfaces;
34using HeuristicLab.PluginInfrastructure;
35using System.Diagnostics;
36using HeuristicLab.Hive.Tracing;
37using HeuristicLab.PluginInfrastructure.Manager;
38using System.Web;
39using System.Web.SessionState;
40
41namespace HeuristicLab.Hive.Server.Core {
42  /// <summary>
43  /// The SlaveCommunicator manages the whole communication with the slave
44  /// </summary>
45  public class SlaveCommunicator : ISlaveCommunicator, IInternalSlaveCommunicator {
46    private ILifecycleManager lifecycleManager;
47    private IInternalJobManager jobManager;
48    private IScheduler scheduler;
49
50    private static object slaveLocker = new object();
51
52    /// <summary>
53    /// Initialization of the Adapters to the database
54    /// Initialization of Eventhandler for the lifecycle management
55    /// Initialization of lastHearbeats Dictionary
56    /// </summary>
57    public SlaveCommunicator() {
58      Logger.Debug("ServiceCommunicator instantiated");
59      lifecycleManager = ServiceLocator.GetLifecycleManager();
60      jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
61      scheduler = ServiceLocator.GetScheduler();
62
63      lifecycleManager.ServerHeartbeat += new EventHandler(lifecycleManager_OnServerHeartbeat);
64    }
65
66    /// <summary>
67    /// Check if online slaves send their hearbeats
68    /// if not -> set them offline and check if they where calculating a job
69    /// </summary>
70    /// <param name="sender"></param>
71    /// <param name="e"></param>
72    public void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
73      // this block can conflict with the heartbeats from a slave (which also updates the slave-records)
74      lock (slaveLocker) {
75        Logger.Debug("Server Heartbeat ticked");
76        List<Guid> slaveIds = DaoLocator.SlaveDao.FindAll().Select(s => s.Id).ToList();
77        foreach (Guid slaveId in slaveIds) {
78          using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
79            SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
80
81            TimeSpan diff = TimeSpan.MaxValue;
82            if(slave.LastHeartbeat != null )
83              diff = DateTime.Now.Subtract(slave.LastHeartbeat.Value);
84
85            if (slave.State != SlaveState.Offline && diff.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIFF_SECONDS) {
86              // if slave calculated jobs, the job must be reset
87              Logger.Info("Slave timed out and is on RESET (no message for " + diff.TotalSeconds + " seconds.)");
88              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
89                DaoLocator.JobDao.SetJobOffline(job);
90              }
91              Logger.Debug("setting slave offline");
92              slave.State = SlaveState.Offline;
93
94              DaoLocator.SlaveDao.Update(slave);
95
96              Logger.Debug("removing it from the heartbeats list");
97            }
98
99            scope.Complete();
100          } // using
101        } // foreach
102
103        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
104          var timeoutJobs = DaoLocator.JobDao.FindTimeoutJobs();
105          foreach (var job in timeoutJobs) {
106            DaoLocator.JobDao.SetJobOffline(job);
107          }
108          scope.Complete();
109        }
110
111        Logger.Debug("Server Heartbeat ended");
112      } // lock
113    }
114
115    #region ISlaveCommunicator Members
116
117    /// <summary>
118    /// Login process for the slave
119    /// A hearbeat entry is created as well (login is the first hearbeat)
120    /// </summary>
121    /// <param name="slaveInfo"></param>
122    /// <returns></returns>
123    public Response Login(SlaveDto slave) {
124      Response response = new Response();
125
126      SlaveDto dbSlave = DaoLocator.SlaveDao.FindById(slave.Id);
127
128      slave.CalendarSyncStatus = dbSlave != null ? dbSlave.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
129      slave.State = SlaveState.Idle;
130      slave.Login = DateTime.Now;
131
132      if (dbSlave == null) {
133        DaoLocator.SlaveDao.Insert(slave);
134      } else {
135        DaoLocator.SlaveDao.Update(slave);
136      }
137
138      return response;
139    }
140
141    public ResponseCalendar GetCalendar(Guid slaveId) {
142      ResponseCalendar response = new ResponseCalendar();
143
144      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
145      if (slave == null) {
146        response.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist;
147        return response;
148      }
149
150      response.ForceFetch = (slave.CalendarSyncStatus == CalendarState.ForceFetch);
151
152      IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForSlave(slave);
153      if (appointments.Count() == 0) {
154        response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
155      } else {
156        response.Appointments = appointments;
157      }
158
159      slave.CalendarSyncStatus = CalendarState.Fetched;
160      DaoLocator.SlaveDao.Update(slave);
161      return response;
162    }
163
164    public Response SetCalendarStatus(Guid slaveId, CalendarState state) {
165      Response response = new Response();
166      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
167      if (slave == null) {
168        response.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist;
169        return response;
170      }
171
172      slave.CalendarSyncStatus = state;
173      DaoLocator.SlaveDao.Update(slave);
174
175      return response;
176    }
177
178    /// <summary>
179    /// The slave has to send regulary heartbeats
180    /// this hearbeats will be stored in the heartbeats dictionary
181    /// check if there is work for the slave and send the slave a response if he should pull a job
182    /// </summary>
183    /// <param name="heartbeatData"></param>
184    /// <returns></returns>
185    public ResponseHeartBeat ProcessHeartBeat(HeartBeatData heartbeatData) {
186      lock (slaveLocker) {
187        Logger.Debug("BEGIN Processing Heartbeat for Slave " + heartbeatData.SlaveId);
188        ResponseHeartBeat response = new ResponseHeartBeat();
189        response.ActionRequest = new List<MessageContainer>();
190
191        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
192          SlaveDto slave = UpdateSlaveData(heartbeatData);
193          lock (DefaultScheduler.locker) {
194            DaoLocator.SlaveDao.Update(slave);
195          }
196
197          //ProcessJobProgress(heartbeatData, response);
198          response.ActionRequest = ProcessJobProgress(heartbeatData);
199
200          //check if new Cal must be loaded
201          if (slave.CalendarSyncStatus == CalendarState.Fetch || slave.CalendarSyncStatus == CalendarState.ForceFetch) {
202            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
203            Logger.Info("fetch or forcefetch sent");
204          }
205
206          // check if slave has a free core for a new job
207          // if true, ask scheduler for a new job for this slave
208          Logger.Debug(" BEGIN Looking for Slave Jobs");
209          if (this.IsAllowedToSendJobs() &&
210              slave.IsAllowedToCalculate &&
211              heartbeatData.FreeCores > 0 &&
212              scheduler.ExistsJobForSlave(heartbeatData)) {
213            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
214          } else {
215            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
216          }
217
218          scope.Complete();
219        }
220        Logger.Debug(" END Processed Heartbeat for Slave " + heartbeatData.SlaveId);
221        return response;
222      }
223    }
224
225    /// <summary>
226    /// Returns true if there are enough resources to send a job
227    /// There should not be too many jobs sent simultaniously
228    /// </summary>
229    private bool IsAllowedToSendJobs() {
230      return true; // try out without limit
231      //return lifecycleManager.JobsCurrentlyTransferring < ApplicationConstants.MAX_JOB_TRANSFER_COUNT;
232    }
233
234    private SlaveDto UpdateSlaveData(HeartBeatData heartbeatData) {
235      SlaveDto slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
236      if (slave == null) {
237        slave = new SlaveDto() { Id = heartbeatData.SlaveId };
238        Login(slave);
239        slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
240      }
241
242      slave.NrOfFreeCores = heartbeatData.FreeCores;
243      slave.FreeMemory = heartbeatData.FreeMemory;
244      slave.IsAllowedToCalculate = heartbeatData.IsAllowedToCalculate;
245      slave.LastHeartbeat = DateTime.Now;
246
247      slave.State = heartbeatData.JobProgress.Count > 0 ? SlaveState.Calculating : SlaveState.Idle;
248      return slave;
249    }
250
251    private List<MessageContainer> ProcessJobProgress(HeartBeatData heartbeatData) {
252      List<MessageContainer> actions = new List<MessageContainer>();
253
254      // find all the jobs in jobProgress which are not in the database -> they are not supposed to be calculated by this slave
255      //IEnumerable<Guid> jobsToAbort = GetJobsNotCalculatedByThisSlave(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys);
256      //foreach (Guid jobId in jobsToAbort) {
257      //  Logger.Error("Job shall not be caculated by this slave or does not exist in DB: " + jobId);
258      //  actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
259      //  heartbeatData.JobProgress.Remove(jobId);
260      //}
261
262      // process all the remaining jobProgresses
263      foreach (var jobProgress in heartbeatData.JobProgress) {
264        JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
265        if (curJob == null) {
266          // job does not exist in db
267          Logger.Error("Job does not exist in DB: " + jobProgress.Key);
268          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
269        } else {
270          curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
271          Guid id = curJob.Slave != null ? curJob.Slave.Id : Guid.Empty;
272          if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) {
273            // assigned slave does not match heartbeat
274            Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob.ToString());
275            actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
276          } else {
277            // save job execution time
278            curJob.ExecutionTime = jobProgress.Value;
279            curJob.LastHeartbeat = DateTime.Now;
280
281            if (curJob.State == JobState.Aborted) {
282              // a request to abort the job has been set
283              Logger.Error("Job is in state aborted, send AbortJob: " + curJob.Id);
284              actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
285              DaoLocator.JobDao.UnAssignSlaveToJob(curJob.Id);
286            } else if (curJob.State == JobState.SnapshotRequested) {
287              // a request for a snapshot has been set
288              actions.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
289              curJob.State = JobState.SnapshotSent;
290            }
291          }
292          DaoLocator.JobDao.Update(curJob);
293        }
294      }
295      return actions;
296    }
297
298    /// <summary>
299    /// Returns the jobIds of the jobs which are not assigned to this slave in the database (they either are not stored in DB or they are assigned
300    /// </summary>
301    private IEnumerable<Guid> GetJobsNotCalculatedByThisSlave(Guid slaveId, IEnumerable<Guid> jobIds) {
302      IEnumerable<Guid> activeJobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(slaveId)).Select(j => j.Id);
303      return jobIds.Except(activeJobsOfSlave).ToList();
304    }
305
306    /// <summary>
307    /// Process the Job progress sent by a slave
308    /// [chn] this method needs to be refactored, because it's a performance hog
309    ///
310    /// what it does:
311    /// (1) find out if the jobs that should be calculated by this slave (from db) and compare if they are consistent with what the joblist the slave sent
312    /// (2) find out if every job from the joblist really should be calculated by this slave
313    /// (3) checks if a job should be aborted and issues Message
314    /// (4) update job-progress and write to db
315    /// (5) if snapshot is requested, issue Message
316    ///
317    /// (6) for each job from DB, check if there is a job from slave (again).
318    /// (7) if job matches, it is removed from newAssignedJobs
319    /// (8) if job !matches, job's TTL is reduced by 1,
320    /// (9) if TTL==0, job is set to Abort (save to DB), and Message to Abort job is issued to slave
321    ///
322    ///
323    ///
324    /// quirks:
325    /// (1) the response-object is modified during the foreach-loop (only last element counts)
326    /// (2) state Abort results in Finished. This should be: AbortRequested, Aborted.
327    /// </summary>
328    /// <param name="heartbeatData"></param>
329    /// <param name="response"></param>
330    //private void ProcessJobProgress(HeartBeatData heartbeatData, ResponseHeartBeat response) {
331    //  Logger.Debug("Started for Slave " + heartbeatData.SlaveId);
332    //  List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId)));
333    //  if (heartbeatData.JobProgress != null && heartbeatData.JobProgress.Count > 0) {
334    //    if (jobsOfSlave == null || jobsOfSlave.Count == 0) {
335    //      foreach (Guid jobId in heartbeatData.JobProgress.Keys) {
336    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
337    //      }
338
339    //      Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + ", advise him to abort all");
340    //      return;
341    //    }
342
343    //    foreach (KeyValuePair<Guid, TimeSpan> jobProgress in heartbeatData.JobProgress) {
344    //      JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
345    //      if (curJob == null) {
346    //        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
347    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
348    //        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
349    //        return;
350    //      }
351    //      curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
352    //      if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) {
353    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
354    //        Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + " Job: " + curJob);
355    //      } else if (curJob.State == JobState.Aborted) {
356    //        // a request to abort the job has been set
357    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
358    //        curJob.State = JobState.Finished;
359    //      } else {
360    //        // save job progress
361    //        curJob.ExecutionTime = jobProgress.Value;
362
363    //        if (curJob.State == JobState.SnapshotRequested) {
364    //          // a request for a snapshot has been set
365    //          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
366    //          curJob.State = JobState.SnapshotSent;
367    //        }
368    //      }
369    //      DaoLocator.JobDao.Update(curJob);
370    //    }
371    //  }
372
373    //  foreach (JobDto currJob in jobsOfSlave) {
374    //    bool found = false;
375    //    if (heartbeatData.JobProgress != null) {
376    //      foreach (Guid jobId in heartbeatData.JobProgress.Keys) {
377    //        if (jobId == currJob.Id) {
378    //          found = true;
379    //          break;
380    //        }
381    //      }
382    //    }
383
384    //    if (!found) {
385    //      lock (newAssignedJobs) {
386    //        if (newAssignedJobs.ContainsKey(currJob.Id)) {
387    //          newAssignedJobs[currJob.Id]--;
388    //          Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
389    //          if (newAssignedJobs[currJob.Id] <= 0) {
390    //            Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);
391
392    //            currJob.State = JobState.Offline;
393    //            DaoLocator.JobDao.Update(currJob);
394
395    //            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
396
397    //            newAssignedJobs.Remove(currJob.Id);
398    //          }
399    //        } else {
400    //          Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
401    //          currJob.State = JobState.Offline;
402    //          DaoLocator.JobDao.Update(currJob);
403    //        }
404    //      } // lock
405    //    } else {
406    //      lock (newAssignedJobs) {
407    //        if (newAssignedJobs.ContainsKey(currJob.Id)) {
408    //          Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
409    //          newAssignedJobs.Remove(currJob.Id);
410    //        }
411    //      }
412    //    }
413    //  }
414    //}
415
416    /// <summary>
417    /// if the slave was told to pull a job he calls this method
418    /// the server selects a job and sends it to the slave
419    /// </summary>
420    /// <param name="slaveId"></param>
421    /// <returns></returns>
422    public ResponseObject<JobDto> GetJob(Guid slaveId) {
423      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
424
425      JobDto job2Calculate = scheduler.GetNextJobForSlave(slaveId);
426      if (job2Calculate != null) {
427        response.Obj = job2Calculate;
428        response.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Obj);
429
430        Logger.Info("Job pulled: " + job2Calculate + " for user " + slaveId);
431        job2Calculate.LastHeartbeat = DateTime.Now.AddSeconds(ApplicationConstants.JOB_TIME_TO_LIVE_SECONDS);
432        DaoLocator.JobDao.Update(job2Calculate);
433      } else {
434        //response.Success = false;
435        response.Obj = null;
436        response.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
437        Logger.Info("No more Jobs left for " + slaveId);
438      }
439      return response;
440    }
441
442    public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
443      Logger.Info("BEGIN Job received for Storage - main method:");
444
445      //Stream jobResultStream = null;
446      //Stream jobStream = null;
447
448      //try {
449      BinaryFormatter formatter = new BinaryFormatter();
450
451      JobResult result = (JobResult)formatter.Deserialize(stream);
452
453      //important - repeatable read isolation level is required here,
454      //otherwise race conditions could occur when writing the stream into the DB
455      //just removed TransactionIsolationLevel.RepeatableRead
456      //tx = session.BeginTransaction();
457
458      ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.Id, new byte[] { }, result.ExecutionTime, result.Exception, finished);
459
460      if (response.StatusMessage == ResponseStatus.Ok) {
461        Logger.Debug("Trying to aquire WCF Job Stream");
462        //jobStream = DaoLocator.JobDao.GetSerializedJobStream(result.JobId);
463        //Logger.Debug("Job Stream Aquired");
464        byte[] buffer = new byte[3024];
465        List<byte> serializedJob = new List<byte>();
466        int read = 0;
467        int i = 0;
468        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
469          for (int j = 0; j < read; j++) {
470            serializedJob.Add(buffer[j]);
471          }
472          if (i % 100 == 0)
473            Logger.Debug("Writing to stream: " + i);
474          //jobStream.Write(buffer, 0, read);
475          i++;
476        }
477        Logger.Debug("Done Writing, closing the stream!");
478        //jobStream.Close();
479
480        if (serializedJob.Count > 0) {
481          DaoLocator.JobDao.SetBinaryJobFile(result.Id, serializedJob.ToArray());
482        }
483      }
484      Logger.Info("END Job received for Storage:");
485      stream.Dispose();
486      return response;
487    }
488
489    private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
490      Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);
491
492      ResponseResultReceived response = new ResponseResultReceived();
493      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
494
495      SerializedJob job = new SerializedJob();
496
497      if (job != null) {
498        job.JobInfo = DaoLocator.JobDao.FindById(jobId);
499        if (job.JobInfo != null) {
500          job.JobInfo.Slave = job.JobInfo.Slave = DaoLocator.SlaveDao.GetSlaveForJob(jobId);
501        }
502      }
503
504      if (job != null && job.JobInfo == null) {
505        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
506        response.JobId = jobId;
507        Logger.Error("No job with Id " + jobId);
508
509        //tx.Rollback();
510        return response;
511      }
512      if (job.JobInfo.State == JobState.Aborted) {
513        response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
514
515        Logger.Error("Job was aborted! " + job.JobInfo);
516
517        //tx.Rollback();
518        return response;
519      }
520      if (job.JobInfo.Slave == null) {
521        response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
522        response.JobId = jobId;
523
524        Logger.Error("Job is not being calculated (slave = null)! " + job.JobInfo);
525
526        //tx.Rollback();
527        return response;
528      }
529      if (job.JobInfo.Slave.Id != slaveId) {
530        //response.Success = false;
531        response.StatusMessage = ResponseStatus.ProcessJobResult_WrongSlaveForJob;
532        response.JobId = jobId;
533
534        Logger.Error("Wrong Slave for this Job! " + job.JobInfo + ", Sending Slave is: " + slaveId);
535
536        //tx.Rollback();
537        return response;
538      }
539      if (job.JobInfo.State == JobState.Finished) {
540        response.StatusMessage = ResponseStatus.Ok;
541        response.JobId = jobId;
542
543        Logger.Error("Job already finished! " + job.JobInfo + ", Sending Slave is: " + slaveId);
544
545        //tx.Rollback();
546        return response;
547      }
548      //Todo: RequestsnapshotSent => calculating?
549      if (job.JobInfo.State == JobState.SnapshotSent) {
550        job.JobInfo.State = JobState.Calculating;
551      }
552      if (job.JobInfo.State != JobState.Calculating && job.JobInfo.State != JobState.Pending) {
553        response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
554        response.JobId = jobId;
555
556        Logger.Error("Wrong Job State, job is: " + job.JobInfo);
557
558        //tx.Rollback();
559        return response;
560      }
561      job.JobInfo.ExecutionTime = executionTime;
562
563      if (!string.IsNullOrEmpty(exception)) {
564        job.JobInfo.State = JobState.Failed;
565        job.JobInfo.Exception = exception;
566        job.JobInfo.DateFinished = DateTime.Now;
567      } else if (finished) {
568        job.JobInfo.State = JobState.Finished;
569        job.JobInfo.DateFinished = DateTime.Now;
570      }
571
572      job.SerializedJobData = result;
573
574      DaoLocator.JobDao.Update(job.JobInfo);
575
576      response.StatusMessage = ResponseStatus.Ok;
577      response.JobId = jobId;
578      response.Finished = finished;
579
580      Logger.Info("END Job received for Storage - SUB method: " + jobId);
581      return response;
582
583    }
584
585    /// <summary>
586    /// the slave can send job results during calculating
587    /// and will send a final job result when he finished calculating
588    /// these job results will be stored in the database
589    /// </summary>
590    public ResponseResultReceived StoreFinishedJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
591      return ProcessJobResult(slaveId, jobId, result, executionTime, exception, true);
592    }
593
594    public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
595      return ProcessJobResult(slaveId, jobId, result, executionTime, exception, false);
596    }
597
598    /// <summary>
599    /// when a slave logs out the state will be set
600    /// and the entry in the last hearbeats dictionary will be removed
601    /// </summary>
602    /// <param name="slaveId"></param>
603    /// <returns></returns>                       
604    public Response Logout(Guid slaveId) {
605      Logger.Info("Slave logged out " + slaveId);
606
607      Response response = new Response();
608
609      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
610      if (slave == null) {
611        response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
612        return response;
613      }
614      if (slave.State == SlaveState.Calculating) {
615        // check wich job the slave was calculating and reset it
616        IEnumerable<JobDto> jobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(slave);
617        foreach (JobDto job in jobsOfSlave) {
618          if (job.State != JobState.Finished)
619            DaoLocator.JobDao.SetJobOffline(job);
620        }
621      }
622
623      slave.State = SlaveState.Offline;
624      DaoLocator.SlaveDao.Update(slave);
625
626      return response;
627    }
628
629    /// <summary>
630    /// If a slave goes offline and restores a job he was calculating
631    /// he can ask the slave if he still needs the job result
632    /// </summary>
633    /// <param name="jobId"></param>
634    /// <returns></returns>
635    /// TODO: Just reset LastHeartbeat timestamp in this method
636    public Response IsJobStillNeeded(Guid jobId) {
637      Response response = new Response();
638      JobDto job = DaoLocator.JobDao.FindById(jobId);
639      if (job == null) {
640        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
641        Logger.Error("Job doesn't exist (anymore)! " + jobId);
642        return response;
643      }
644      if (job.State == JobState.Finished) {
645        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
646        Logger.Error("already finished! " + job);
647        return response;
648      }
649      job.State = JobState.Pending;
650      //lock (lifecycleManager.PendingJobs) {
651      //  lifecycleManager.PendingJobs.Add(job.Id, PENDING_TIMEOUT);
652      //}
653
654      DaoLocator.JobDao.Update(job);
655
656      return response;
657    }
658
659    public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
660      ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
661      response.List = new List<CachedHivePluginInfoDto>();
662      foreach (HivePluginInfoDto pluginInfo in pluginList) {
663        IPluginDescription ipd = GetAvailablePlugins().Where(pd =>
664          pd.Name == pluginInfo.Name &&
665          pd.Version.Major == pluginInfo.Version.Major &&
666          pd.Version.Minor == pluginInfo.Version.Minor &&
667          pd.Version.Build >= pluginInfo.Version.Build).SingleOrDefault(); // ignore revision here, otherwise each client who uses a more recent trunk-build than the server-build will not be able to execute the job
668        if (ipd != null) {
669          response.List.Add(ConvertPluginDescriptorToDto(ipd));
670        }
671      }
672      return response;
673    }
674
675    private IEnumerable<PluginDescription> GetAvailablePlugins() {
676      return lifecycleManager.Plugins;
677    }
678
679    private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
680      CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
681        Name = currPlugin.Name,
682        Version = currPlugin.Version
683      };
684
685      foreach (string fileName in from file in currPlugin.Files select file.Name) {
686        currCachedPlugin.PluginFiles.Add(new HivePluginFile(File.ReadAllBytes(fileName), fileName));
687      }
688      return currCachedPlugin;
689    }
690
691    public ResponseObject<HivePluginFile> GetConfigurationFile() {
692      var response = new ResponseObject<HivePluginFile>();
693      response.Obj = new HivePluginFile(lifecycleManager.ConfigurationFile, "Sandbox.config");
694      return response;
695    }
696
697    #endregion
698  }
699}
Note: See TracBrowser for help on using the repository browser.