Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 4772

Last change on this file since 4772 was 4772, checked in by cneumuel, 13 years ago

#1260

  • added LogServiceReader to display log for slave without writing to local files
  • aborted jobs with childjobs now got back to state WaitForChildJob (instead of Offline)
  • lifecyclemanager now knows about available plugins (does not yet work perfectly)
File size: 33.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Contracts.ResponseObjects;
33using HeuristicLab.Hive.Server.Core.InternalInterfaces;
34using HeuristicLab.PluginInfrastructure;
35using System.Diagnostics;
36using HeuristicLab.Hive.Tracing;
37using HeuristicLab.PluginInfrastructure.Manager;
38
39namespace HeuristicLab.Hive.Server.Core {
40  /// <summary>
41  /// The SlaveCommunicator manages the whole communication with the slave
42  /// </summary>
43  public class SlaveCommunicator : ISlaveCommunicator, IInternalSlaveCommunicator {
/// <summary>
/// Timestamp of the last heartbeat (or login) per slave id. Accessed under heartbeatLock.
/// </summary>
private static readonly Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();

/// <summary>
/// Contains the JobIds of jobs which have been sent to a slave, but for which the slave has not yet sent
/// a jobProgress with a heartbeat, because it has not finished downloading/deserializing the job.
/// The value is a TimeToLive count that is decremented whenever a heartbeat of the slave does not report the job.
/// When the count reaches zero, the job is assumed to be lost and is set Offline again.
/// The dictionary instance is also used as the monitor for its own accesses, hence readonly.
/// </summary>
private static readonly Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();

/// <summary>
/// When a slave reconnects and has finished results waiting, it calls IsJobStillNeeded. If the finished
/// result has not yet been collected from anywhere else, the job will be sent by the slave and the job state is set to Pending.
/// The job stays in Pending state until it is received from the reconnected slave or the TimeToLive value
/// in this dictionary has reached zero.
/// The dictionary instance is also used as the monitor for its own accesses, hence readonly.
/// </summary>
private static readonly Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();
// NOTE(review): not referenced in the visible part of the file; presumably the initial TimeToLive for pendingJobs - confirm
private static int PENDING_TIMEOUT = 100;

// guards lastHeartbeats; recursion is enabled because write locks are taken while an upgradeable read lock is held
private static readonly ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

private ILifecycleManager lifecycleManager;
private IInternalJobManager jobManager;
private IScheduler scheduler;

// serializes the server heartbeat against the processing of slave heartbeats
private static readonly object slaveLocker = new object();
69
/// <summary>
/// Resolves the lifecycle manager, the job manager and the scheduler through the ServiceLocator
/// and registers this instance for the ServerHeartbeat event of the lifecycle manager.
/// </summary>
public SlaveCommunicator() {
  this.lifecycleManager = ServiceLocator.GetLifecycleManager();
  this.jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  this.scheduler = ServiceLocator.GetScheduler();

  // every server heartbeat triggers the timeout check of the slaves
  this.lifecycleManager.ServerHeartbeat += lifecycleManager_OnServerHeartbeat;
}
82
/// <summary>
/// Check if online slaves send their heartbeats;
/// if not -> set them offline and reset the jobs they were calculating.
/// Afterwards the TimeToLive of the pending jobs is processed.
/// </summary>
/// <param name="sender">source of the event (not used)</param>
/// <param name="e">event arguments (not used)</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  // this block can conflict with the heartbeats from a slave (which also updates the slave-records)
  lock (slaveLocker) {
    Logger.Debug("Server Heartbeat ticked");
    List<Guid> slaveIds = DaoLocator.SlaveDao.FindAll().Select(s => s.Id).ToList();
    foreach (Guid slaveId in slaveIds) {
      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
        heartbeatLock.EnterUpgradeableReadLock();
        try {
          SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
          // the slave may have been removed since FindAll; nothing to check in that case
          if (slave != null) {
            CheckHeartbeatOfSlave(slave);
          }
        }
        finally {
          // release the lock even if a DAO call throws, otherwise all later heartbeats would block
          heartbeatLock.ExitUpgradeableReadLock();
        }
        scope.Complete();
      } // using
    } // foreach

    using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
      CheckForPendingJobs();
      scope.Complete();
    }
    Logger.Debug("Server Heartbeat ended");
  } // lock
}

/// <summary>
/// Compares the last known heartbeat of the given slave with the current time. A slave without a recorded
/// heartbeat gets one stamped with the current time; a slave whose last heartbeat is older than
/// HEARTBEAT_MAX_DIF seconds is set offline and its active jobs are set offline as well.
/// Must be called while the upgradeable read lock of heartbeatLock is held.
/// </summary>
/// <param name="slave">the slave record to check (must not be null)</param>
private void CheckHeartbeatOfSlave(SlaveDto slave) {
  DateTime lastHeartbeatOfSlave;
  if (!lastHeartbeats.TryGetValue(slave.Id, out lastHeartbeatOfSlave)) {
    Logger.Info("No previous hearbeats are available for " + slave.Id + " although it is in state " + slave.State);

    // add a heartbeat NOW and give the slave time to say something for HEARTBEAT_MAX_DIF
    // otherwise all the slave's jobs would be aborted, which is not desirable if the server has just been restarted
    heartbeatLock.EnterWriteLock();
    try {
      lastHeartbeats.Add(slave.Id, DateTime.Now);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }
    return;
  }

  TimeSpan diff = DateTime.Now.Subtract(lastHeartbeatOfSlave);
  // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
  if (diff.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
    // if slave calculated jobs, the job must be reset
    Logger.Info("Slave timed out and is on RESET");
    foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(slave)) {
      DaoLocator.JobDao.SetJobOffline(job);
      lock (newAssignedJobs) {
        newAssignedJobs.Remove(job.Id);
      }
    }
    Logger.Debug("setting slave offline");
    slave.State = SlaveState.Offline;

    DaoLocator.SlaveDao.Update(slave);

    Logger.Debug("removing it from the heartbeats list");
    heartbeatLock.EnterWriteLock();
    try {
      lastHeartbeats.Remove(slave.Id);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }
  }
}
145
/// <summary>
/// Walks over all jobs that are in state Pending in the database. For every such job that is tracked in
/// pendingJobs the TimeToLive is decremented; a tracked job whose TimeToLive is already zero (or below)
/// is set to Offline and written back to the database.
/// </summary>
private void CheckForPendingJobs() {
  List<JobDto> jobsInPendingState = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));

  foreach (JobDto pendingJob in jobsInPendingState) {
    lock (pendingJobs) {
      int timeToLive;
      if (!pendingJobs.TryGetValue(pendingJob.Id, out timeToLive)) {
        // job is not tracked, nothing to do
        continue;
      }

      if (timeToLive > 0) {
        pendingJobs[pendingJob.Id] = timeToLive - 1;
      } else {
        pendingJob.State = JobState.Offline;
        DaoLocator.JobDao.Update(pendingJob);
      }
    }
  }
}
162
163    #region ISlaveCommunicator Members
164
/// <summary>
/// Login process for the slave.
/// A heartbeat entry is created as well (login is the first heartbeat).
/// The slave record is inserted if it is not yet known, otherwise it is updated.
/// </summary>
/// <param name="slave">the slave that logs in; its CalendarSyncStatus, State and Login are overwritten here</param>
/// <returns>a default Response</returns>
public Response Login(SlaveDto slave) {
  Response response = new Response();

  heartbeatLock.EnterWriteLock();
  try {
    // the indexer adds the entry or overwrites an existing one
    lastHeartbeats[slave.Id] = DateTime.Now;
  }
  finally {
    // must be released even if slave is null, otherwise every later heartbeat would block
    heartbeatLock.ExitWriteLock();
  }

  SlaveDto dbSlave = DaoLocator.SlaveDao.FindById(slave.Id);

  // keep the calendar state of an already known slave
  slave.CalendarSyncStatus = dbSlave != null ? dbSlave.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
  slave.State = SlaveState.Idle;
  slave.Login = DateTime.Now;

  if (dbSlave == null) {
    DaoLocator.SlaveDao.Insert(slave);
  } else {
    DaoLocator.SlaveDao.Update(slave);
  }

  return response;
}
196
/// <summary>
/// Returns the uptime calendar of a slave and marks the calendar of the slave as Fetched.
/// </summary>
/// <param name="slaveId">id of the slave whose calendar is requested</param>
/// <returns>
/// the appointments of the slave; StatusMessage is GetCalendar_ResourceDoesNotExist if the slave is unknown
/// and GetCalendar_NoCalendarFound if there are no appointments
/// </returns>
public ResponseCalendar GetCalendar(Guid slaveId) {
  ResponseCalendar response = new ResponseCalendar();

  SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
  if (slave == null) {
    response.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist;
    return response;
  }

  response.ForceFetch = (slave.CalendarSyncStatus == CalendarState.ForceFetch);

  // materialize once: the sequence is needed for the emptiness check and again by the consumer of the response
  List<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForSlave(slave).ToList();
  if (appointments.Count == 0) {
    response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
  } else {
    response.Appointments = appointments;
  }

  // the state is set to Fetched even if no calendar was found
  slave.CalendarSyncStatus = CalendarState.Fetched;
  DaoLocator.SlaveDao.Update(slave);
  return response;
}
219
/// <summary>
/// Stores the given calendar state for a slave.
/// </summary>
/// <param name="slaveId">id of the slave to modify</param>
/// <param name="state">the new calendar sync state</param>
/// <returns>a Response; StatusMessage is GetCalendar_ResourceDoesNotExist if the slave is unknown</returns>
public Response SetCalendarStatus(Guid slaveId, CalendarState state) {
  Response result = new Response();
  SlaveDto target = DaoLocator.SlaveDao.FindById(slaveId);

  if (target != null) {
    target.CalendarSyncStatus = state;
    DaoLocator.SlaveDao.Update(target);
  } else {
    result.StatusMessage = ResponseStatus.GetCalendar_ResourceDoesNotExist;
  }

  return result;
}
233
/// <summary>
/// Processes a heartbeat of a slave: the slave record and the heartbeat timestamp are refreshed,
/// the reported job progress is evaluated and the list of actions for the slave is assembled
/// (abort/snapshot requests, calendar fetch, and either FetchJob or NoMessage).
/// </summary>
/// <param name="heartbeatData">the heartbeat sent by the slave</param>
/// <returns>the response containing the action requests for the slave</returns>
public ResponseHeartBeat ProcessHeartBeat(HeartBeatData heartbeatData) {
  lock (slaveLocker) {
    Logger.Debug("BEGIN Processing Heartbeat for Slave " + heartbeatData.SlaveId);
    ResponseHeartBeat reply = new ResponseHeartBeat();
    reply.ActionRequest = new List<MessageContainer>();

    TransactionOptions txOptions = new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE };
    using (TransactionScope tx = new TransactionScope(TransactionScopeOption.Required, txOptions)) {
      SlaveDto sendingSlave = UpdateSlaveData(heartbeatData);
      SaveTimestamp(heartbeatData);

      reply.ActionRequest = ProcessJobProgress(heartbeatData);

      // tell the slave to reload its calendar if requested
      switch (sendingSlave.CalendarSyncStatus) {
        case CalendarState.Fetch:
        case CalendarState.ForceFetch:
          reply.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
          Logger.Info("fetch or forcefetch sent");
          break;
      }

      // a job is offered only if the server may send jobs, the slave may calculate,
      // it has a free core and the scheduler has a job for it (evaluated in this order)
      Logger.Debug(" BEGIN Looking for Slave Jobs");
      bool offerJob = this.IsAllowedToSendJobs()
                      && sendingSlave.IsAllowedToCalculate
                      && heartbeatData.FreeCores > 0
                      && scheduler.ExistsJobForSlave(heartbeatData);
      MessageContainer.MessageType jobMessage = offerJob
        ? MessageContainer.MessageType.FetchJob
        : MessageContainer.MessageType.NoMessage;
      reply.ActionRequest.Add(new MessageContainer(jobMessage));

      DaoLocator.SlaveDao.Update(sendingSlave);
      tx.Complete();
    }

    Logger.Debug(" END Processed Heartbeat for Slave " + heartbeatData.SlaveId);
    return reply;
  }
}
279
/// <summary>
/// Tells whether another job may be sent: true as long as the number of jobs currently being
/// transferred is below MAX_JOB_TRANSFER_COUNT.
/// </summary>
private bool IsAllowedToSendJobs() {
  var transfersInProgress = lifecycleManager.JobsCurrentlyTransferring;
  bool belowLimit = transfersInProgress < ApplicationConstants.MAX_JOB_TRANSFER_COUNT;
  return belowLimit;
}
287
/// <summary>
/// Records the current time as the last heartbeat of the slave that sent the given heartbeat.
/// </summary>
/// <param name="heartbeatData">the heartbeat whose SlaveId is used as key</param>
private static void SaveTimestamp(HeartBeatData heartbeatData) {
  heartbeatLock.EnterWriteLock();
  try {
    // the indexer adds the entry or overwrites an existing one
    lastHeartbeats[heartbeatData.SlaveId] = DateTime.Now;
  }
  finally {
    // never leave the write lock held when an exception occurs
    heartbeatLock.ExitWriteLock();
  }
}
297
/// <summary>
/// Loads the slave that sent the heartbeat (logging it in first if it is not yet in the database)
/// and copies the values of the heartbeat into the slave object. The object is not written to the
/// database here.
/// </summary>
/// <param name="heartbeatData">the heartbeat sent by the slave</param>
/// <returns>the slave object with free cores, free memory, IsAllowedToCalculate and State set from the heartbeat</returns>
private SlaveDto UpdateSlaveData(HeartBeatData heartbeatData) {
  SlaveDto slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
  if (slave == null) {
    // unknown slave: perform a login with a minimal record and reload it
    slave = new SlaveDto() { Id = heartbeatData.SlaveId };
    Login(slave);
    slave = DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId);
  }

  slave.NrOfFreeCores = heartbeatData.FreeCores;
  slave.FreeMemory = heartbeatData.FreeMemory;
  slave.IsAllowedToCalculate = heartbeatData.IsAllowedToCalculate;

  // a heartbeat without a progress list is treated like an empty one
  bool reportsJobs = heartbeatData.JobProgress != null && heartbeatData.JobProgress.Count > 0;
  slave.State = reportsJobs ? SlaveState.Calculating : SlaveState.Idle;
  return slave;
}
313
/// <summary>
/// Evaluates the job progress reported with a heartbeat and returns the resulting actions for the slave.
/// (1) jobs reported by the slave which are not active jobs of this slave in the database are aborted
///     and removed from the progress list of the heartbeat
/// (2) for the remaining jobs the execution time is stored; abort and snapshot requests are forwarded
/// (3) active jobs of the slave (from the database) which were not reported lose one TimeToLive if they
///     were newly assigned, and are set Offline when the TimeToLive reaches zero or when they are not
///     tracked as newly assigned at all
/// </summary>
/// <param name="heartbeatData">the heartbeat sent by the slave; its JobProgress is modified in step (1)</param>
/// <returns>the list of messages to send to the slave</returns>
private List<MessageContainer> ProcessJobProgress(HeartBeatData heartbeatData) {
  List<MessageContainer> actions = new List<MessageContainer>();
  // a heartbeat without a progress list is treated like an empty one
  bool hasProgress = heartbeatData.JobProgress != null;

  if (hasProgress) {
    // find all the jobs in jobProgress which are not in the database -> they are not supposed to be calculated by this slave
    IEnumerable<Guid> jobsToAbort = GetJobsNotInDatabase(heartbeatData.SlaveId, heartbeatData.JobProgress.Keys);
    foreach (Guid jobId in jobsToAbort) {
      actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
      heartbeatData.JobProgress.Remove(jobId);
    }

    // process all the remaining jobProgresses
    foreach (var jobProgress in heartbeatData.JobProgress) {
      JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
      if (curJob == null) {
        // job does not exist in db -> there is nothing that could be updated
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
        continue;
      }

      curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
      if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) {
        // assigned slave does not match heartbeat
        actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("The slave " + heartbeatData.SlaveId + " is not supposed to calculate Job: " + curJob);
      } else {
        // save job execution time
        curJob.ExecutionTime = jobProgress.Value;

        if (curJob.State == JobState.Aborted) {
          // a request to abort the job has been set
          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        } else if (curJob.State == JobState.SnapshotRequested) {
          // a request for a snapshot has been set
          actions.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
          curJob.State = JobState.SnapshotSent;
        }
      }
      DaoLocator.JobDao.Update(curJob);
    }
  }

  var jobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId));
  foreach (JobDto currJob in jobsOfSlave) {
    bool reported = hasProgress && heartbeatData.JobProgress.ContainsKey(currJob.Id);
    lock (newAssignedJobs) {
      int timeToLive;
      bool newlyAssigned = newAssignedJobs.TryGetValue(currJob.Id, out timeToLive);

      if (reported) {
        if (newlyAssigned) {
          Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
          newAssignedJobs.Remove(currJob.Id);
        }
      } else if (newlyAssigned) {
        // the slave has not reported the job yet; give it one heartbeat less to do so
        timeToLive--;
        newAssignedJobs[currJob.Id] = timeToLive;
        Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + timeToLive + ". User that sucks: " + currJob.Slave);
        if (timeToLive <= 0) {
          Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);

          currJob.State = JobState.Offline;
          DaoLocator.JobDao.Update(currJob);

          actions.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

          newAssignedJobs.Remove(currJob.Id);
        }
      } else {
        Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
        currJob.State = JobState.Offline;
        DaoLocator.JobDao.Update(currJob);
      }
    } // lock
  }
  return actions;
}
388
/// <summary>
/// Filters the given job ids down to those that are not among the active jobs
/// of the slave according to the database.
/// </summary>
/// <param name="slaveId">id of the slave whose active jobs are looked up</param>
/// <param name="jobIds">the job ids to check</param>
/// <returns>a list with the job ids that are not active jobs of the slave</returns>
private IEnumerable<Guid> GetJobsNotInDatabase(Guid slaveId, IEnumerable<Guid> jobIds) {
  SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
  var assignedJobs = DaoLocator.JobDao.FindActiveJobsOfSlave(slave);
  IEnumerable<Guid> assignedJobIds = from job in assignedJobs
                                     select job.Id;

  List<Guid> unassignedJobIds = jobIds.Except(assignedJobIds).ToList();
  return unassignedJobIds;
}
396
397    /// <summary>
398    /// Process the Job progress sent by a slave
399    /// [chn] this method needs to be refactored, because its a performance hog
400    ///
401    /// what it does:
402    /// (1) find out if the jobs that should be calculated by this slave (from db) and compare if they are consistent with what the joblist the slave sent
403    /// (2) find out if every job from the joblist really should be calculated by this slave
404    /// (3) checks if a job should be aborted and issues Message
405    /// (4) update job-progress and write to db
406    /// (5) if snapshot is requested, issue Message
407    ///
408    /// (6) for each job from DB, check if there is a job from slave (again).
409    /// (7) if job matches, it is removed from newAssigneJobs
410    /// (8) if job !matches, job's TTL is reduced by 1,
411    /// (9) if TTL==0, job is set to Abort (save to DB), and Message to Abort job is issued to slave
412    ///
413    ///
414    ///
415    /// quirks:
416    /// (1) the response-object is modified during the foreach-loop (only last element counts)
417    /// (2) state Abort results in Finished. This should be: AbortRequested, Aborted.
418    /// </summary>
419    /// <param name="heartbeatData"></param>
420    /// <param name="response"></param>
421    //private void ProcessJobProgress(HeartBeatData heartbeatData, ResponseHeartBeat response) {
422    //  Logger.Debug("Started for Slave " + heartbeatData.SlaveId);
423    //  List<JobDto> jobsOfSlave = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.SlaveDao.FindById(heartbeatData.SlaveId)));
424    //  if (heartbeatData.JobProgress != null && heartbeatData.JobProgress.Count > 0) {
425    //    if (jobsOfSlave == null || jobsOfSlave.Count == 0) {
426    //      foreach (Guid jobId in heartbeatData.JobProgress.Keys) {
427    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
428    //      }
429
430    //      Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + ", advise him to abort all");
431    //      return;
432    //    }
433
434    //    foreach (KeyValuePair<Guid, TimeSpan> jobProgress in heartbeatData.JobProgress) {
435    //      JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
436    //      if (curJob == null) {
437    //        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
438    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
439    //        Logger.Error("Job does not exist in DB: " + jobProgress.Key);
440    //        return;
441    //      }
442    //      curJob.Slave = DaoLocator.SlaveDao.GetSlaveForJob(curJob.Id);
443    //      if (curJob.Slave == null || curJob.Slave.Id != heartbeatData.SlaveId) {
444    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
445    //        Logger.Error("There is no job calculated by this user " + heartbeatData.SlaveId + " Job: " + curJob);
446    //      } else if (curJob.State == JobState.Aborted) {
447    //        // a request to abort the job has been set
448    //        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
449    //        curJob.State = JobState.Finished;
450    //      } else {
451    //        // save job progress
452    //        curJob.ExecutionTime = jobProgress.Value;
453
454    //        if (curJob.State == JobState.SnapshotRequested) {
455    //          // a request for a snapshot has been set
456    //          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
457    //          curJob.State = JobState.SnapshotSent;
458    //        }
459    //      }
460    //      DaoLocator.JobDao.Update(curJob);
461    //    }
462    //  }
463
464    //  foreach (JobDto currJob in jobsOfSlave) {
465    //    bool found = false;
466    //    if (heartbeatData.JobProgress != null) {
467    //      foreach (Guid jobId in heartbeatData.JobProgress.Keys) {
468    //        if (jobId == currJob.Id) {
469    //          found = true;
470    //          break;
471    //        }
472    //      }
473    //    }
474
475    //    if (!found) {
476    //      lock (newAssignedJobs) {
477    //        if (newAssignedJobs.ContainsKey(currJob.Id)) {
478    //          newAssignedJobs[currJob.Id]--;
479    //          Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Slave);
480    //          if (newAssignedJobs[currJob.Id] <= 0) {
481    //            Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Slave);
482
483    //            currJob.State = JobState.Offline;
484    //            DaoLocator.JobDao.Update(currJob);
485
486    //            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
487
488    //            newAssignedJobs.Remove(currJob.Id);
489    //          }
490    //        } else {
491    //          Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
492    //          currJob.State = JobState.Offline;
493    //          DaoLocator.JobDao.Update(currJob);
494    //        }
495    //      } // lock
496    //    } else {
497    //      lock (newAssignedJobs) {
498    //        if (newAssignedJobs.ContainsKey(currJob.Id)) {
499    //          Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
500    //          newAssignedJobs.Remove(currJob.Id);
501    //        }
502    //      }
503    //    }
504    //  }
505    //}
506
/// <summary>
/// Called by a slave that was told to pull a job. Asks the scheduler for the next job of the slave,
/// attaches the plugin dependencies of the job and tracks the job as newly assigned.
/// </summary>
/// <param name="slaveId">id of the slave that requests a job</param>
/// <returns>
/// a response carrying the job, or a response without job and with StatusMessage
/// GetJob_NoJobsAvailable if the scheduler has no job for the slave
/// </returns>
public ResponseObject<JobDto> GetJob(Guid slaveId) {
  ResponseObject<JobDto> reply = new ResponseObject<JobDto>();
  JobDto nextJob = scheduler.GetNextJobForSlave(slaveId);

  if (nextJob == null) {
    reply.Obj = null;
    reply.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
    Logger.Info("No more Jobs left for " + slaveId);
    return reply;
  }

  reply.Obj = nextJob;
  reply.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(reply.Obj);

  Logger.Info("Job pulled: " + nextJob + " for user " + slaveId);
  lock (newAssignedJobs) {
    // start the TimeToLive countdown only once per job
    bool alreadyTracked = newAssignedJobs.ContainsKey(nextJob.Id);
    if (!alreadyTracked) {
      newAssignedJobs.Add(nextJob.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
    }
  }
  return reply;
}
534
/// <summary>
/// Receives a job result as a stream. The stream starts with a binary-serialized JobResult header,
/// everything after it is the serialized job, which is stored via SetBinaryJobFile if the header
/// was accepted (StatusMessage Ok). The stream is always disposed before returning.
/// </summary>
/// <param name="stream">the stream sent by the slave; disposed by this method</param>
/// <param name="finished">true if this is the final result, false for a snapshot</param>
/// <returns>the response describing whether the result was accepted</returns>
public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
  Logger.Info("BEGIN Job received for Storage - main method:");

  try {
    // NOTE(review): BinaryFormatter deserializes data received from a slave; it is unsafe for
    // untrusted input. Left unchanged because the wire format is shared with the slave.
    BinaryFormatter formatter = new BinaryFormatter();
    JobResult result = (JobResult)formatter.Deserialize(stream);

    // the job data follows in the stream, therefore an empty array is passed here
    ResponseResultReceived response = ProcessJobResult(result.SlaveId, result.Id, new byte[] { }, result.ExecutionTime, result.Exception, finished);

    if (response.StatusMessage == ResponseStatus.Ok) {
      Logger.Debug("Trying to aquire WCF Job Stream");
      byte[] buffer = new byte[3024];
      using (MemoryStream serializedJob = new MemoryStream()) {
        int read;
        int chunk = 0;
        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
          serializedJob.Write(buffer, 0, read);
          // log only every 100th chunk to keep the log readable
          if (chunk % 100 == 0)
            Logger.Debug("Writing to stream: " + chunk);
          chunk++;
        }
        Logger.Debug("Done Writing, closing the stream!");

        DaoLocator.JobDao.SetBinaryJobFile(result.Id, serializedJob.ToArray());
      }
    }
    Logger.Info("END Job received for Storage:");
    return response;
  }
  finally {
    // release the stream even if deserialization or storing fails
    if (stream != null)
      stream.Dispose();
  }
}
579
/// <summary>
/// Validates a job result sent by a slave and updates the job record accordingly.
/// The result is rejected if the job does not exist, was aborted, is not assigned to any slave,
/// is assigned to another slave or is in a state other than Calculating, Pending or SnapshotSent.
/// A job that is already Finished is answered with Ok without being changed.
/// Otherwise the execution time is stored and the state becomes Failed (if an exception text was sent)
/// or Finished (if finished is true).
/// </summary>
/// <param name="slaveId">id of the slave that sent the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">
/// serialized job data. NOTE(review): this value was only stored in a local object that is never
/// persisted, so it is effectively ignored here - confirm whether it should be written with SetBinaryJobFile
/// </param>
/// <param name="executionTime">execution time reported by the slave</param>
/// <param name="exception">exception text if the job failed, otherwise null or empty</param>
/// <param name="finished">true for a final result, false for a snapshot</param>
/// <returns>the response with the status of the validation</returns>
private ResponseResultReceived ProcessJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
  Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);

  ResponseResultReceived response = new ResponseResultReceived();

  JobDto jobInfo = DaoLocator.JobDao.FindById(jobId);
  if (jobInfo == null) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
    response.JobId = jobId;
    Logger.Error("No job with Id " + jobId);
    return response;
  }
  jobInfo.Slave = DaoLocator.SlaveDao.GetSlaveForJob(jobId);

  if (jobInfo.State == JobState.Aborted) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
    // report the job id like all other rejections do
    response.JobId = jobId;

    Logger.Error("Job was aborted! " + jobInfo);
    return response;
  }
  if (jobInfo.Slave == null) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
    response.JobId = jobId;

    Logger.Error("Job is not being calculated (slave = null)! " + jobInfo);
    return response;
  }
  if (jobInfo.Slave.Id != slaveId) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_WrongSlaveForJob;
    response.JobId = jobId;

    Logger.Error("Wrong Slave for this Job! " + jobInfo + ", Sending Slave is: " + slaveId);
    return response;
  }
  if (jobInfo.State == JobState.Finished) {
    // a repeated delivery of a finished job is not an error for the slave
    response.StatusMessage = ResponseStatus.Ok;
    response.JobId = jobId;

    Logger.Error("Job already finished! " + jobInfo + ", Sending Slave is: " + slaveId);
    return response;
  }
  //Todo: RequestsnapshotSent => calculating?
  if (jobInfo.State == JobState.SnapshotSent) {
    jobInfo.State = JobState.Calculating;
  }
  if (jobInfo.State != JobState.Calculating && jobInfo.State != JobState.Pending) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
    response.JobId = jobId;

    Logger.Error("Wrong Job State, job is: " + jobInfo);
    return response;
  }
  jobInfo.ExecutionTime = executionTime;

  if (!string.IsNullOrEmpty(exception)) {
    jobInfo.State = JobState.Failed;
    jobInfo.Exception = exception;
    jobInfo.DateFinished = DateTime.Now;
  } else if (finished) {
    jobInfo.State = JobState.Finished;
    jobInfo.DateFinished = DateTime.Now;
  }

  DaoLocator.JobDao.Update(jobInfo);

  response.StatusMessage = ResponseStatus.Ok;
  response.JobId = jobId;
  response.Finished = finished;

  Logger.Info("END Job received for Storage - SUB method: " + jobId);
  return response;
}
675
/// <summary>
/// Entry point for the final result of a job: forwards all arguments to ProcessJobResult
/// with the finished flag set.
/// </summary>
public ResponseResultReceived StoreFinishedJobResult(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
  const bool jobIsFinished = true;
  ResponseResultReceived outcome = ProcessJobResult(slaveId, jobId, result, executionTime, exception, jobIsFinished);
  return outcome;
}
684
/// <summary>
/// Entry point for an intermediate result (snapshot) of a job: forwards all arguments to
/// ProcessJobResult with the finished flag cleared.
/// </summary>
public ResponseResultReceived ProcessSnapshot(Guid slaveId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
  const bool jobIsFinished = false;
  ResponseResultReceived outcome = ProcessJobResult(slaveId, jobId, result, executionTime, exception, jobIsFinished);
  return outcome;
}
688
689    /// <summary>
690    /// when a slave logs out the state will be set
691    /// and the entry in the last hearbeats dictionary will be removed
692    /// </summary>
693    /// <param name="slaveId"></param>
694    /// <returns></returns>                       
695    public Response Logout(Guid slaveId) {
696      Logger.Info("Slave logged out " + slaveId);
697
698      Response response = new Response();
699
700      heartbeatLock.EnterWriteLock();
701      if (lastHeartbeats.ContainsKey(slaveId))
702        lastHeartbeats.Remove(slaveId);
703      heartbeatLock.ExitWriteLock();
704
705      SlaveDto slave = DaoLocator.SlaveDao.FindById(slaveId);
706      if (slave == null) {
707        response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
708        return response;
709      }
710      if (slave.State == SlaveState.Calculating) {
711        // check wich job the slave was calculating and reset it
712        IEnumerable<JobDto> jobsOfSlave = DaoLocator.JobDao.FindActiveJobsOfSlave(slave);
713        foreach (JobDto job in jobsOfSlave) {
714          if (job.State != JobState.Finished)
715            DaoLocator.JobDao.SetJobOffline(job);
716        }
717      }
718
719      slave.State = SlaveState.Offline;
720      DaoLocator.SlaveDao.Update(slave);
721
722      return response;
723    }
724
725    /// <summary>
726    /// If a slave goes offline and restores a job he was calculating
727    /// he can ask the slave if he still needs the job result
728    /// </summary>
729    /// <param name="jobId"></param>
730    /// <returns></returns>
731    public Response IsJobStillNeeded(Guid jobId) {
732      Response response = new Response();
733      JobDto job = DaoLocator.JobDao.FindById(jobId);
734      if (job == null) {
735        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
736        Logger.Error("Job doesn't exist (anymore)! " + jobId);
737        return response;
738      }
739      if (job.State == JobState.Finished) {
740        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
741        Logger.Error("already finished! " + job);
742        return response;
743      }
744      job.State = JobState.Pending;
745      lock (pendingJobs) {
746        pendingJobs.Add(job.Id, PENDING_TIMEOUT);
747      }
748
749      DaoLocator.JobDao.Update(job);
750
751      return response;
752    }
753
754    public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
755      ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
756      response.List = new List<CachedHivePluginInfoDto>();
757      foreach (HivePluginInfoDto pluginInfo in pluginList) {
758        if (pluginInfo.Update) {
759          //check if there is a newer version   
760
761          IPluginDescription ipd = GetAvailablePlugins().Where(pd => pd.Name == pluginInfo.Name && pd.Version.Major == pluginInfo.Version.Major && pd.Version.Minor == pluginInfo.Version.Minor && pd.Version.Revision > pluginInfo.Version.Revision).SingleOrDefault();
762          if (ipd != null) {
763            response.List.Add(ConvertPluginDescriptorToDto(ipd));
764          }
765        } else {
766          IPluginDescription ipd = GetAvailablePlugins().Where(pd => pd.Name == pluginInfo.Name && pd.Version.Major == pluginInfo.Version.Major && pd.Version.Minor == pluginInfo.Version.Minor && pd.Version.Revision >= pluginInfo.Version.Revision).SingleOrDefault();
767          if (ipd != null) {
768            response.List.Add(ConvertPluginDescriptorToDto(ipd));
769          } else {
770            response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
771            return response;
772          }
773        }
774      }
775      return response;
776    }
777
778    private IEnumerable<PluginDescription> GetAvailablePlugins() {
779      return lifecycleManager.Plugins;
780    }
781
782    private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
783      CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
784        Name = currPlugin.Name,
785        Version = currPlugin.Version
786      };
787
788      foreach (string fileName in from file in currPlugin.Files select file.Name) {
789        currCachedPlugin.PluginFiles.Add(new HivePluginFile(File.ReadAllBytes(fileName), fileName));
790      }
791      return currCachedPlugin;
792    }
793
794    #endregion
795  }
796}
Note: See TracBrowser for help on using the repository browser.