Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 4263

Last change on this file since 4263 was 4263, checked in by cneumuel, 12 years ago

consolidated Response objects to use only StatusMessage with enums instead of strings.
removed Success property from Response. success is now represented by StatusMessage alone. (#1159)

File size: 29.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Tracing;
35using HeuristicLab.Hive.Contracts.ResponseObjects;
36
37namespace HeuristicLab.Hive.Server.Core {
38  /// <summary>
39  /// The ClientCommunicator manages the whole communication with the client
40  /// </summary>
41  public class SlaveCommunicator : ISlaveCommunicator,
42    IInternalSlaveCommunicator {
43    private static Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
44    private static Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
45    private static Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();
46
47    private static ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
48
49    //private ISessionFactory factory;
50    private ILifecycleManager lifecycleManager;
51    private IInternalJobManager jobManager;
52    private IScheduler scheduler;
53
54    private static int PENDING_TIMEOUT = 100;
55
56    /// <summary>
57    /// Initialization of the Adapters to the database
58    /// Initialization of Eventhandler for the lifecycle management
59    /// Initialization of lastHearbeats Dictionary
60    /// </summary>
61    public SlaveCommunicator() {
62      //factory = ServiceLocator.GetSessionFactory();
63
64      lifecycleManager = ServiceLocator.GetLifecycleManager();
65      jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
66      scheduler = ServiceLocator.GetScheduler();
67
68      lifecycleManager.RegisterHeartbeat(new EventHandler(lifecycleManager_OnServerHeartbeat));
69    }
70
71    /// <summary>
72    /// Check if online clients send their hearbeats
73    /// if not -> set them offline and check if they where calculating a job
74    /// </summary>
75    /// <param name="sender"></param>
76    /// <param name="e"></param>
77    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
78      Logger.Debug("Server Heartbeat ticked");
79
80      // [chn] why is transaction management done here
81      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
82        List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());
83
84        foreach (ClientDto client in allClients) {
85          if (client.State != State.Offline && client.State != State.NullState) {
86            heartbeatLock.EnterUpgradeableReadLock();
87
88            if (!lastHeartbeats.ContainsKey(client.Id)) {
89              Logger.Info("Client " + client.Id +
90                              " wasn't offline but hasn't sent heartbeats - setting offline");
91              client.State = State.Offline;
92              DaoLocator.ClientDao.Update(client);
93              Logger.Info("Client " + client.Id +
94                              " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
95              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
96                //maybe implementa n additional Watchdog? Till then, just set them offline..
97                DaoLocator.JobDao.SetJobOffline(job);
98              }
99            } else {
100              DateTime lastHbOfClient = lastHeartbeats[client.Id];
101
102              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
103              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
104              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
105                // if client calculated jobs, the job must be reset
106                Logger.Info("Client timed out and is on RESET");
107                foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
108                  DaoLocator.JobDao.SetJobOffline(job);
109                  lock (newAssignedJobs) {
110                    if (newAssignedJobs.ContainsKey(job.Id))
111                      newAssignedJobs.Remove(job.Id);
112                  }
113                }
114                Logger.Debug("setting client offline");
115                // client must be set offline
116                client.State = State.Offline;
117
118                //clientAdapter.Update(client);
119                DaoLocator.ClientDao.Update(client);
120
121                Logger.Debug("removing it from the heartbeats list");
122                heartbeatLock.EnterWriteLock();
123                lastHeartbeats.Remove(client.Id);
124                heartbeatLock.ExitWriteLock();
125              }
126            }
127
128            heartbeatLock.ExitUpgradeableReadLock();
129          } else {
130            //TODO: RLY neccesary?
131            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Shouldn't have offline or nullstate, has " + client.State);
132            heartbeatLock.EnterWriteLock();
133            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Resetting all his jobs");
134            if (lastHeartbeats.ContainsKey(client.Id))
135              lastHeartbeats.Remove(client.Id);
136            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
137              DaoLocator.JobDao.SetJobOffline(job);
138            }
139            heartbeatLock.ExitWriteLock();
140          }
141        }
142        CheckForPendingJobs();
143        //        DaoLocator.DestroyContext();
144        scope.Complete();
145      }
146    }
147
148    private void CheckForPendingJobs() {
149      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(State.Pending));
150
151      foreach (JobDto currJob in pendingJobsInDB) {
152        lock (pendingJobs) {
153          if (pendingJobs.ContainsKey(currJob.Id)) {
154            if (pendingJobs[currJob.Id] <= 0) {
155              currJob.State = State.Offline;
156              DaoLocator.JobDao.Update(currJob);
157            } else {
158              pendingJobs[currJob.Id]--;
159            }
160          }
161        }
162      }
163    }
164
165    #region IClientCommunicator Members
166
167    /// <summary>
168    /// Login process for the client
169    /// A hearbeat entry is created as well (login is the first hearbeat)
170    /// </summary>
171    /// <param name="slaveInfo"></param>
172    /// <returns></returns>
173    public Response Login(ClientDto slaveInfo) {
174      Response response = new Response();
175
176      heartbeatLock.EnterWriteLock();
177      if (lastHeartbeats.ContainsKey(slaveInfo.Id)) {
178        lastHeartbeats[slaveInfo.Id] = DateTime.Now;
179      } else {
180        lastHeartbeats.Add(slaveInfo.Id, DateTime.Now);
181      }
182      heartbeatLock.ExitWriteLock();
183
184      ClientDto dbClient = DaoLocator.ClientDao.FindById(slaveInfo.Id);
185
186      //Really set offline?
187      //Reconnect issues with the currently calculating jobs
188      slaveInfo.State = State.Idle;
189      slaveInfo.CalendarSyncStatus = dbClient != null ? dbClient.CalendarSyncStatus : CalendarState.NotAllowedToFetch;
190
191      if (dbClient == null)
192        DaoLocator.ClientDao.Insert(slaveInfo);
193      else
194        DaoLocator.ClientDao.Update(slaveInfo);
195
196      return response;
197    }
198
199    public ResponseCalendar GetCalendar(Guid clientId) {
200      ResponseCalendar response = new ResponseCalendar();
201
202      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
203      if (client == null) {
204        //response.Success = false;
205        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
206        return response;
207      }
208
209      response.ForceFetch = (client.CalendarSyncStatus == CalendarState.ForceFetch);
210
211      IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForClient(client);
212      if (appointments.Count() == 0) {
213        response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
214        //response.Success = false;
215      } else {
216        //response.Success = true;
217        response.Appointments = appointments;
218      }
219
220      client.CalendarSyncStatus = CalendarState.Fetched;
221      DaoLocator.ClientDao.Update(client);
222      return response;
223    }
224
225    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
226      Response response = new Response();
227      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
228      if (client == null) {
229        //response.Success = false;
230        response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
231        return response;
232      }
233
234      client.CalendarSyncStatus = state;
235      DaoLocator.ClientDao.Update(client);
236
237      return response;
238    }
239
240    /// <summary>
241    /// The client has to send regulary heartbeats
242    /// this hearbeats will be stored in the heartbeats dictionary
243    /// check if there is work for the client and send the client a response if he should pull a job
244    /// </summary>
245    /// <param name="hbData"></param>
246    /// <returns></returns>
247    public ResponseHeartBeat ProcessHeartBeat(HeartBeatData hbData) {
248      Logger.Debug("BEGIN Processing Heartbeat for Client " + hbData.SlaveId);
249
250      ResponseHeartBeat response = new ResponseHeartBeat();
251      response.ActionRequest = new List<MessageContainer>();
252
253      Logger.Debug("BEGIN Started Client Fetching");
254      ClientDto client = DaoLocator.ClientDao.FindById(hbData.SlaveId);
255      Logger.Debug("END Finished Client Fetching");
256      // check if the client is logged in
257      if (client.State == State.Offline || client.State == State.NullState) {
258        // response.Success = false;
259        response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
260        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
261
262        Logger.Error("ProcessHeartBeat: Client state null or offline: " + client);
263
264        return response;
265      }
266
267      client.NrOfFreeCores = hbData.FreeCores;
268      client.FreeMemory = hbData.FreeMemory;
269
270      // save timestamp of this heartbeat
271      Logger.Debug("BEGIN Locking for Heartbeats");
272      heartbeatLock.EnterWriteLock();
273      Logger.Debug("END Locked for Heartbeats");
274      if (lastHeartbeats.ContainsKey(hbData.SlaveId)) {
275        lastHeartbeats[hbData.SlaveId] = DateTime.Now;
276      } else {
277        lastHeartbeats.Add(hbData.SlaveId, DateTime.Now);
278      }
279      heartbeatLock.ExitWriteLock();
280
281      Logger.Debug("BEGIN Processing Heartbeat Jobs");
282      ProcessJobProcess(hbData, response);
283      Logger.Debug("END Processed Heartbeat Jobs");
284
285      //check if new Cal must be loaded
286      if (client.CalendarSyncStatus == CalendarState.Fetch || client.CalendarSyncStatus == CalendarState.ForceFetch) {
287        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
288
289        //client.CalendarSyncStatus = CalendarState.Fetching;
290
291        Logger.Info("fetch or forcefetch sent");
292      }
293
294      // check if client has a free core for a new job
295      // if true, ask scheduler for a new job for this client
296      Logger.Debug(" BEGIN Looking for Client Jobs");
297      if (hbData.FreeCores > 0 && scheduler.ExistsJobForSlave(hbData)) {
298        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
299      } else {
300        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
301      }
302      Logger.Debug(" END Looked for Client Jobs");
303
304      DaoLocator.ClientDao.Update(client);
305
306      //tx.Commit();
307      Logger.Debug(" END Processed Heartbeat for Client " + hbData.SlaveId);
308      return response;
309    }
310
311    /// <summary>
312    /// Process the Job progress sent by a client
313    /// [chn] this method needs to be refactored, because its a performance hog
314    ///
315    /// what it does:
316    /// (1) find out if the jobs that should be calculated by this client (from db) and compare if they are consistent with what the joblist the client sent
317    /// (2) find out if every job from the joblist really should be calculated by this client
318    /// (3) checks if a job should be aborted and issues Message
319    /// (4) update job-progress and write to db
320    /// (5) if snapshot is requested, issue Message
321    ///
322    /// (6) for each job from DB, check if there is a job from client (again).
323    /// (7) if job matches, it is removed from newAssigneJobs
324    /// (8) if job !matches, job's TTL is reduced by 1,
325    /// (9) if TTL==0, job is set to Abort (save to DB), and Message to Abort job is issued to client
326    ///
327    ///
328    ///
329    /// quirks:
330    /// (1) the response-object is modified during the foreach-loop (only last element counts)
331    /// (2) state Abort results in Finished. This should be: AbortRequested, Aborted.
332    /// </summary>
333    /// <param name="hbData"></param>
334    /// <param name="jobAdapter"></param>
335    /// <param name="clientAdapter"></param>
336    /// <param name="response"></param>
337    private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) {
338      Logger.Debug("Started for Client " + hbData.SlaveId);
339      List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.ClientDao.FindById(hbData.SlaveId)));
340      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
341        if (jobsOfClient == null || jobsOfClient.Count == 0) {
342          //response.Success = false;
343          //response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
344
345          foreach (Guid jobId in hbData.JobProgress.Keys) {
346            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
347          }
348
349          Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
350          return;
351        }
352
353        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
354          JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
355          curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
356          if (curJob.Client == null || curJob.Client.Id != hbData.SlaveId) {
357            //response.Success = false;
358            //response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
359            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
360            Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob);
361          } else if (curJob.State == State.Abort) {
362            // a request to abort the job has been set
363            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
364            curJob.State = State.Finished;
365          } else {
366            // save job progress
367            curJob.Percentage = jobProgress.Value;
368
369            if (curJob.State == State.RequestSnapshot) {
370              // a request for a snapshot has been set
371              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
372              curJob.State = State.RequestSnapshotSent;
373            }
374          }
375          DaoLocator.JobDao.Update(curJob);
376        }
377      }
378      foreach (JobDto currJob in jobsOfClient) {
379        bool found = false;
380        if (hbData.JobProgress != null) {
381          foreach (Guid jobId in hbData.JobProgress.Keys) {
382            if (jobId == currJob.Id) {
383              found = true;
384              break;
385            }
386          }
387        }
388        if (!found) {
389          lock (newAssignedJobs) {
390            if (newAssignedJobs.ContainsKey(currJob.Id)) {
391              newAssignedJobs[currJob.Id]--;
392              Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Client);
393              if (newAssignedJobs[currJob.Id] <= 0) {
394                Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);
395
396                currJob.State = State.Offline;
397                DaoLocator.JobDao.Update(currJob);
398
399                response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
400
401                newAssignedJobs.Remove(currJob.Id);
402              }
403            } else {
404              Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
405              currJob.State = State.Offline;
406              DaoLocator.JobDao.Update(currJob);
407            }
408          } // lock
409        } else {
410          lock (newAssignedJobs) {
411
412            if (newAssignedJobs.ContainsKey(currJob.Id)) {
413              Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
414              newAssignedJobs.Remove(currJob.Id);
415            }
416          }
417        }
418      }
419    }
420
421    /// <summary>
422    /// if the client was told to pull a job he calls this method
423    /// the server selects a job and sends it to the client
424    /// </summary>
425    /// <param name="clientId"></param>
426    /// <returns></returns>
427    public ResponseObject<JobDto> GetJob(Guid clientId) {
428      ResponseObject<JobDto> response = new ResponseObject<JobDto>();
429
430      JobDto job2Calculate = scheduler.GetNextJobForSlave(clientId);
431      if (job2Calculate != null) {
432        response.Obj = job2Calculate;
433        response.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Obj);
434
435        Logger.Info("Job pulled: " + job2Calculate + " for user " + clientId);
436        lock (newAssignedJobs) {
437          if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
438            newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
439        }
440      } else {
441        //response.Success = false;
442        response.Obj = null;
443        response.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
444        Logger.Info("No more Jobs left for " + clientId);
445      }
446      return response;
447    }
448
449    public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
450      Logger.Info("BEGIN Job received for Storage - main method:");
451
452      //Stream jobResultStream = null;
453      //Stream jobStream = null;
454
455      //try {
456      BinaryFormatter formatter = new BinaryFormatter();
457
458      JobResult result = (JobResult)formatter.Deserialize(stream);
459
460      //important - repeatable read isolation level is required here,
461      //otherwise race conditions could occur when writing the stream into the DB
462      //just removed TransactionIsolationLevel.RepeatableRead
463      //tx = session.BeginTransaction();
464
465      ResponseResultReceived response = ProcessJobResult(result.ClientId, result.JobId, new byte[] { }, result.Percentage, result.Exception, finished);
466
467      if (response.StatusMessage == ResponseStatus.Ok) {
468        Logger.Debug("Trying to aquire WCF Job Stream");
469        //jobStream = DaoLocator.JobDao.GetSerializedJobStream(result.JobId);
470        //Logger.Debug("Job Stream Aquired");
471        byte[] buffer = new byte[3024];
472        List<byte> serializedJob = new List<byte>();
473        int read = 0;
474        int i = 0;
475        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
476          for (int j = 0; j < read; j++) {
477            serializedJob.Add(buffer[j]);
478          }
479          if (i % 100 == 0)
480            Logger.Debug("Writing to stream: " + i);
481          //jobStream.Write(buffer, 0, read);
482          i++;
483        }
484        Logger.Debug("Done Writing, closing the stream!");
485        //jobStream.Close();
486
487        DaoLocator.JobDao.SetBinaryJobFile(result.JobId, serializedJob.ToArray());
488      }
489      Logger.Info("END Job received for Storage:");
490      stream.Dispose();
491      return response;
492    }
493
494    private ResponseResultReceived ProcessJobResult(Guid clientId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {
495      Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);
496
497      ResponseResultReceived response = new ResponseResultReceived();
498      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
499
500      SerializedJob job = new SerializedJob();
501
502      if (job != null) {
503        job.JobInfo = DaoLocator.JobDao.FindById(jobId);
504        if (job.JobInfo != null) {
505          job.JobInfo.Client = job.JobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
506        }
507      }
508
509      if (job != null && job.JobInfo == null) {
510        //response.Success = false;
511        response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
512        response.JobId = jobId;
513        Logger.Error("No job with Id " + jobId);
514
515        //tx.Rollback();
516        return response;
517      }
518      if (job.JobInfo.State == State.Abort) {
519        //response.Success = false;
520        response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
521
522        Logger.Error("Job was aborted! " + job.JobInfo);
523
524        //tx.Rollback();
525        return response;
526      }
527      if (job.JobInfo.Client == null) {
528        //response.Success = false;
529        response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
530        response.JobId = jobId;
531
532        Logger.Error("Job is not being calculated (client = null)! " + job.JobInfo);
533
534        //tx.Rollback();
535        return response;
536      }
537      if (job.JobInfo.Client.Id != clientId) {
538        //response.Success = false;
539        response.StatusMessage = ResponseStatus.ProcessJobResult_WrongClientForJob;
540        response.JobId = jobId;
541
542        Logger.Error("Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);
543
544        //tx.Rollback();
545        return response;
546      }
547      if (job.JobInfo.State == State.Finished) {
548        response.StatusMessage = ResponseStatus.Ok;
549        response.JobId = jobId;
550
551        Logger.Error("Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);
552
553        //tx.Rollback();
554        return response;
555      }
556      //Todo: RequestsnapshotSent => calculating?
557      if (job.JobInfo.State == State.RequestSnapshotSent) {
558        job.JobInfo.State = State.Calculating;
559      }
560      if (job.JobInfo.State != State.Calculating && job.JobInfo.State != State.Pending) {
561        //response.Success = false;
562        response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
563        response.JobId = jobId;
564
565        Logger.Error("Wrong Job State, job is: " + job.JobInfo);
566
567        //tx.Rollback();
568        return response;
569      }
570      job.JobInfo.Percentage = percentage;
571
572      if (!string.IsNullOrEmpty(exception)) {
573        job.JobInfo.State = State.Failed;
574        job.JobInfo.Exception = exception;
575        job.JobInfo.DateFinished = DateTime.Now;
576      } else if (finished) {
577        job.JobInfo.State = State.Finished;
578        job.JobInfo.DateFinished = DateTime.Now;
579      }
580
581      job.SerializedJobData = result;
582
583      DaoLocator.JobDao.Update(job.JobInfo);
584
585      response.StatusMessage = ResponseStatus.Ok;
586      response.JobId = jobId;
587      response.Finished = finished;
588
589      Logger.Info("END Job received for Storage - SUB method: " + jobId);
590      return response;
591
592    }
593
594    /// <summary>
595    /// the client can send job results during calculating
596    /// and will send a final job result when he finished calculating
597    /// these job results will be stored in the database
598    /// </summary>
599    /// <param name="clientId"></param>
600    /// <param name="jobId"></param>
601    /// <param name="result"></param>
602    /// <param name="exception"></param>
603    /// <param name="finished"></param>
604    /// <returns></returns>
605    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
606      Guid jobId,
607      byte[] result,
608      double percentage,
609      string exception) {
610
611      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
612    }
613
614    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, string exception) {
615      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
616    }
617
618    /// <summary>
619    /// when a client logs out the state will be set
620    /// and the entry in the last hearbeats dictionary will be removed
621    /// </summary>
622    /// <param name="clientId"></param>
623    /// <returns></returns>                       
624    public Response Logout(Guid clientId) {
625      Logger.Info("Client logged out " + clientId);
626
627      Response response = new Response();
628
629      heartbeatLock.EnterWriteLock();
630      if (lastHeartbeats.ContainsKey(clientId))
631        lastHeartbeats.Remove(clientId);
632      heartbeatLock.ExitWriteLock();
633
634      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
635      if (client == null) {
636        //response.Success = false;
637        response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
638        return response;
639      }
640      if (client.State == State.Calculating) {
641        // check wich job the client was calculating and reset it
642        IEnumerable<JobDto> jobsOfClient = DaoLocator.JobDao.FindActiveJobsOfSlave(client);
643        foreach (JobDto job in jobsOfClient) {
644          if (job.State != State.Finished)
645            DaoLocator.JobDao.SetJobOffline(job);
646        }
647      }
648
649      client.State = State.Offline;
650      DaoLocator.ClientDao.Update(client);
651
652      return response;
653    }
654
655    /// <summary>
656    /// If a client goes offline and restores a job he was calculating
657    /// he can ask the client if he still needs the job result
658    /// </summary>
659    /// <param name="jobId"></param>
660    /// <returns></returns>
661    public Response IsJobStillNeeded(Guid jobId) {
662      Response response = new Response();
663      JobDto job = DaoLocator.JobDao.FindById(jobId);
664      if (job == null) {
665        //response.Success = false;
666        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
667        Logger.Error("Job doesn't exist (anymore)! " + jobId);
668        return response;
669      }
670      if (job.State == State.Finished) {
671        //response.Success = true;
672        response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
673        Logger.Error("already finished! " + job);
674        return response;
675      }
676      job.State = State.Pending;
677      lock (pendingJobs) {
678        pendingJobs.Add(job.Id, PENDING_TIMEOUT);
679      }
680
681      DaoLocator.JobDao.Update(job);
682
683      return response;
684    }
685
686    public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
687      ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
688      response.List = new List<CachedHivePluginInfoDto>();
689      foreach (HivePluginInfoDto pluginInfo in pluginList) {
690        if (pluginInfo.Update) {
691          //check if there is a newer version         
692          IPluginDescription ipd = ApplicationManager.Manager.Plugins.Where(pd => pd.Name == pluginInfo.Name && pd.Version.Major == pluginInfo.Version.Major && pd.Version.Minor == pluginInfo.Version.Minor && pd.Version.Revision > pluginInfo.Version.Revision).SingleOrDefault();
693          if (ipd != null) {
694            response.List.Add(ConvertPluginDescriptorToDto(ipd));
695          }
696        } else {
697          IPluginDescription ipd = ApplicationManager.Manager.Plugins.Where(pd => pd.Name == pluginInfo.Name && pd.Version.Major == pluginInfo.Version.Major && pd.Version.Minor == pluginInfo.Version.Minor && pd.Version.Revision >= pluginInfo.Version.Revision).SingleOrDefault();
698          if (ipd != null) {
699            response.List.Add(ConvertPluginDescriptorToDto(ipd));
700          } else {
701            //response.Success = false;
702            response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
703            return response;
704          }
705        }
706      }
707      return response;
708    }
709
710    private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
711      CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
712        Name = currPlugin.Name,
713        Version = currPlugin.Version
714      };
715
716      foreach (string fileName in from file in currPlugin.Files select file.Name) {
717        currCachedPlugin.PluginFiles.Add(new HivePluginFile(File.ReadAllBytes(fileName), fileName));
718      }
719      return currCachedPlugin;
720    }
721
722    #endregion
723  }
724}
Note: See TracBrowser for help on using the repository browser.