Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/SlaveCommunicator.cs @ 4264

Last change on this file since 4264 was 4264, checked in by cneumuel, 14 years ago

Split up "State" to "JobState" and "SlaveState" (#1159)

File size: 29.4 KB
RevLine 
[4253]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Linq;
26using System.Runtime.Serialization.Formatters.Binary;
27using System.Threading;
28using System.Transactions;
29using HeuristicLab.Hive.Contracts;
30using HeuristicLab.Hive.Contracts.BusinessObjects;
31using HeuristicLab.Hive.Contracts.Interfaces;
32using HeuristicLab.Hive.Server.Core.InternalInterfaces;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Tracing;
[4263]35using HeuristicLab.Hive.Contracts.ResponseObjects;
[4253]36
37namespace HeuristicLab.Hive.Server.Core {
38  /// <summary>
39  /// The SlaveCommunicator manages the whole communication between the server and its slaves (clients)
40  /// </summary>
41  public class SlaveCommunicator : ISlaveCommunicator,
42    IInternalSlaveCommunicator {
// Time of the most recent login or heartbeat per slave id. Guarded by heartbeatLock.
private static readonly Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
// Remaining time-to-live per job id that was handed out by GetJob but has not yet been
// reported in a slave heartbeat. Guarded by lock (newAssignedJobs), so it must never be reassigned.
private static readonly Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
// Remaining server heartbeats per job id that IsJobStillNeeded put into state Pending.
// Guarded by lock (pendingJobs), so it must never be reassigned.
private static readonly Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();

// Reader/writer lock protecting lastHeartbeats.
private static readonly ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

//private ISessionFactory factory;
// Collaborators resolved once in the constructor.
private readonly ILifecycleManager lifecycleManager;
private readonly IInternalJobManager jobManager;
private readonly IScheduler scheduler;

// Initial countdown value stored in pendingJobs; decremented by CheckForPendingJobs.
private const int PENDING_TIMEOUT = 100;
55
/// <summary>
/// Resolves the lifecycle manager, the job manager and the scheduler through the
/// ServiceLocator and subscribes this instance to the server heartbeat.
/// </summary>
public SlaveCommunicator() {
  this.lifecycleManager = ServiceLocator.GetLifecycleManager();
  this.jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  this.scheduler = ServiceLocator.GetScheduler();

  // From now on every server heartbeat triggers lifecycleManager_OnServerHeartbeat.
  EventHandler onServerHeartbeat = new EventHandler(this.lifecycleManager_OnServerHeartbeat);
  this.lifecycleManager.RegisterHeartbeat(onServerHeartbeat);
}
70
/// <summary>
/// Server heartbeat handler. Runs inside one transaction and checks every known slave:
/// - a slave that is neither Offline nor NullState but has no recorded heartbeat is set
///   offline and its active jobs are set offline;
/// - a slave whose last heartbeat is older than ApplicationConstants.HEARTBEAT_MAX_DIF
///   seconds is set offline, its active jobs are set offline and removed from
///   newAssignedJobs, and its heartbeat entry is dropped;
/// - for a slave that already is Offline/NullState the heartbeat entry is dropped and
///   its active jobs are set offline.
/// Afterwards the pending jobs are checked and the transaction is completed.
/// </summary>
/// <param name="sender">event source (not used)</param>
/// <param name="e">event arguments (not used)</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  Logger.Debug("Server Heartbeat ticked");

  // [chn] why is transaction management done here
  TransactionOptions options = new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE };
  using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, options)) {
    List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());

    foreach (ClientDto client in allClients) {
      if (client.State != SlaveState.Offline && client.State != SlaveState.NullState) {
        heartbeatLock.EnterUpgradeableReadLock();
        // try/finally: a failing DAO call must not leave the static lock held forever.
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            Logger.Info("Client " + client.Id +
                            " wasn't offline but hasn't sent heartbeats - setting offline");
            client.State = SlaveState.Offline;
            DaoLocator.ClientDao.Update(client);
            Logger.Info("Client " + client.Id +
                            " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
              // maybe implement an additional watchdog? Till then, just set them offline.
              DaoLocator.JobDao.SetJobOffline(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if the time between the last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if the client calculated jobs, those jobs must be reset
              Logger.Info("Client timed out and is on RESET");
              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
                DaoLocator.JobDao.SetJobOffline(job);
                lock (newAssignedJobs) {
                  newAssignedJobs.Remove(job.Id);
                }
              }
              Logger.Debug("setting client offline");
              client.State = SlaveState.Offline;
              DaoLocator.ClientDao.Update(client);

              Logger.Debug("removing it from the heartbeats list");
              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              }
              finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        }
        finally {
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        //TODO: RLY neccesary?
        heartbeatLock.EnterWriteLock();
        try {
          lastHeartbeats.Remove(client.Id);
          foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
            DaoLocator.JobDao.SetJobOffline(job);
          }
        }
        finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
    CheckForPendingJobs();
    scope.Complete();
  }
}
147
/// <summary>
/// Counts down the timeout of every job that is in state Pending in the database and
/// has an entry in pendingJobs. When the counter has reached zero the job is set to
/// Offline, written back and its entry is removed from pendingJobs.
/// Pending jobs without an entry in pendingJobs are left untouched.
/// </summary>
private void CheckForPendingJobs() {
  IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));

  foreach (JobDto currJob in pendingJobsInDB) {
    lock (pendingJobs) {
      int remaining;
      if (!pendingJobs.TryGetValue(currJob.Id, out remaining)) {
        continue;
      }

      if (remaining <= 0) {
        currJob.State = JobState.Offline;
        DaoLocator.JobDao.Update(currJob);
        // Drop the expired entry: otherwise the dictionary grows without bound and a
        // later IsJobStillNeeded for the same job would find a stale counter.
        pendingJobs.Remove(currJob.Id);
      } else {
        pendingJobs[currJob.Id] = remaining - 1;
      }
    }
  }
}
164
165    #region IClientCommunicator Members
166
/// <summary>
/// Login process for a slave. The login counts as the first heartbeat, so a heartbeat
/// entry is recorded. The slave is stored with state Idle: it is inserted if it is not
/// yet known, otherwise updated. The calendar sync status of an already known slave is
/// kept; a new slave starts with NotAllowedToFetch.
/// </summary>
/// <param name="slaveInfo">description of the slave that logs in; must not be null</param>
/// <returns>a new Response; this method does not set a status on it</returns>
/// <exception cref="ArgumentNullException">slaveInfo is null</exception>
public Response Login(ClientDto slaveInfo) {
  // Fail before taking the lock: a NullReferenceException inside the locked region
  // would leave the static write lock held.
  if (slaveInfo == null) {
    throw new ArgumentNullException("slaveInfo");
  }

  Response response = new Response();

  heartbeatLock.EnterWriteLock();
  try {
    lastHeartbeats[slaveInfo.Id] = DateTime.Now;
  }
  finally {
    heartbeatLock.ExitWriteLock();
  }

  ClientDto dbClient = DaoLocator.ClientDao.FindById(slaveInfo.Id);

  //Really set offline?
  //Reconnect issues with the currently calculating jobs
  slaveInfo.State = SlaveState.Idle;
  slaveInfo.CalendarSyncStatus = dbClient != null ? dbClient.CalendarSyncStatus : CalendarState.NotAllowedToFetch;

  if (dbClient == null) {
    DaoLocator.ClientDao.Insert(slaveInfo);
  } else {
    DaoLocator.ClientDao.Update(slaveInfo);
  }

  return response;
}
198
/// <summary>
/// Returns the uptime calendar of a slave and marks the calendar as fetched.
/// </summary>
/// <param name="clientId">id of the slave whose calendar is requested</param>
/// <returns>
/// A response with status GetCalendar_ResourceNotFound if the slave is unknown,
/// GetCalendar_NoCalendarFound if it has no appointments, otherwise the appointments.
/// ForceFetch reflects whether the stored sync status was ForceFetch.
/// </returns>
public ResponseCalendar GetCalendar(Guid clientId) {
  ResponseCalendar response = new ResponseCalendar();

  ClientDto client = DaoLocator.ClientDao.FindById(clientId);
  if (client == null) {
    response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
    return response;
  }

  response.ForceFetch = (client.CalendarSyncStatus == CalendarState.ForceFetch);

  // Materialize once: the sequence was previously enumerated by Count() and again by
  // whoever reads response.Appointments.
  List<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForClient(client).ToList();
  if (appointments.Count == 0) {
    response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
  } else {
    response.Appointments = appointments;
  }

  // The status is set to Fetched in both cases (calendar found or not).
  client.CalendarSyncStatus = CalendarState.Fetched;
  DaoLocator.ClientDao.Update(client);
  return response;
}
224
/// <summary>
/// Stores a new calendar sync status for a slave.
/// </summary>
/// <param name="clientId">id of the slave to update</param>
/// <param name="state">calendar sync status to store</param>
/// <returns>
/// A response with status GetCalendar_ResourceNotFound if the slave is unknown;
/// otherwise a new Response whose status is left untouched.
/// </returns>
public Response SetCalendarStatus(Guid clientId, CalendarState state) {
  Response result = new Response();
  ClientDto slave = DaoLocator.ClientDao.FindById(clientId);

  if (slave != null) {
    slave.CalendarSyncStatus = state;
    DaoLocator.ClientDao.Update(slave);
  } else {
    result.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
  }

  return result;
}
239
/// <summary>
/// Processes a heartbeat sent by a slave: records the heartbeat time, stores the
/// reported free cores and free memory, processes the reported job progress and
/// fills the response with the actions the slave should perform next (fetch calendar,
/// fetch job, abort job, ...).
/// </summary>
/// <param name="hbData">heartbeat data sent by the slave</param>
/// <returns>
/// The response with the requested actions. If the slave is unknown, Offline or in
/// NullState the status is ProcessHeartBeat_UserNotLoggedIn and the only action is NoMessage.
/// </returns>
public ResponseHeartBeat ProcessHeartBeat(HeartBeatData hbData) {
  Logger.Debug("BEGIN Processing Heartbeat for Client " + hbData.SlaveId);

  ResponseHeartBeat response = new ResponseHeartBeat();
  response.ActionRequest = new List<MessageContainer>();

  Logger.Debug("BEGIN Started Client Fetching");
  ClientDto client = DaoLocator.ClientDao.FindById(hbData.SlaveId);
  Logger.Debug("END Finished Client Fetching");

  // A slave that is unknown (FindById returned null) is treated like one that is not logged in.
  if (client == null || client.State == SlaveState.Offline || client.State == SlaveState.NullState) {
    response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

    Logger.Error("ProcessHeartBeat: Client state null or offline: " + client);

    return response;
  }

  client.NrOfFreeCores = hbData.FreeCores;
  client.FreeMemory = hbData.FreeMemory;

  // save timestamp of this heartbeat
  Logger.Debug("BEGIN Locking for Heartbeats");
  heartbeatLock.EnterWriteLock();
  try {
    Logger.Debug("END Locked for Heartbeats");
    lastHeartbeats[hbData.SlaveId] = DateTime.Now;
  }
  finally {
    heartbeatLock.ExitWriteLock();
  }

  Logger.Debug("BEGIN Processing Heartbeat Jobs");
  ProcessJobProcess(hbData, response);
  Logger.Debug("END Processed Heartbeat Jobs");

  // check if a new calendar must be loaded
  if (client.CalendarSyncStatus == CalendarState.Fetch || client.CalendarSyncStatus == CalendarState.ForceFetch) {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
    Logger.Info("fetch or forcefetch sent");
  }

  // if the slave has a free core, ask the scheduler whether there is a job for it
  Logger.Debug(" BEGIN Looking for Client Jobs");
  if (hbData.FreeCores > 0 && scheduler.ExistsJobForSlave(hbData)) {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
  } else {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
  }
  Logger.Debug(" END Looked for Client Jobs");

  DaoLocator.ClientDao.Update(client);

  Logger.Debug(" END Processed Heartbeat for Client " + hbData.SlaveId);
  return response;
}
310
/// <summary>
/// Processes the job progress sent by a slave.
/// [chn] this method needs to be refactored, because it is a performance hog
///
/// what it does:
/// (1) loads the active jobs of the slave from the db
/// (2) if the slave reports progress but has no active jobs, every reported job is aborted
/// (3) for every reported job: if it is unknown or belongs to another slave, an abort
///     message is issued; if it is in state Aborted, an abort message is issued and the
///     state becomes Finished; otherwise the progress is stored and, if a snapshot was
///     requested, a snapshot message is issued
/// (4) for every active job from the db that was reported, its newAssignedJobs entry is removed
/// (5) for every active job from the db that was NOT reported: if it has a newAssignedJobs
///     entry, its TTL is reduced by one, and at TTL &lt;= 0 the job is set Offline, an abort
///     message is issued and the entry is removed; without an entry the job is set Offline
///
/// quirks:
/// (1) state Aborted results in Finished. This should be: AbortRequested, Aborted.
/// </summary>
/// <param name="hbData">heartbeat data sent by the slave</param>
/// <param name="response">response whose ActionRequest list receives the issued messages</param>
private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) {
  Logger.Debug("Started for Client " + hbData.SlaveId);
  List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.ClientDao.FindById(hbData.SlaveId)));

  if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
    if (jobsOfClient.Count == 0) {
      foreach (Guid jobId in hbData.JobProgress.Keys) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
      }

      Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
      return;
    }

    foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
      JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
      if (curJob == null) {
        // The slave reports a job the server does not know: tell it to stop.
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + jobProgress.Key);
        continue;
      }

      curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
      if (curJob.Client == null || curJob.Client.Id != hbData.SlaveId) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob);
      } else if (curJob.State == JobState.Aborted) {
        // a request to abort the job has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        curJob.State = JobState.Finished;
      } else {
        // save job progress
        curJob.Percentage = jobProgress.Value;

        if (curJob.State == JobState.SnapshotRequested) {
          // a request for a snapshot has been set
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
          curJob.State = JobState.SnapshotSent;
        }
      }
      DaoLocator.JobDao.Update(curJob);
    }
  }

  foreach (JobDto currJob in jobsOfClient) {
    bool reported = hbData.JobProgress != null && hbData.JobProgress.ContainsKey(currJob.Id);

    lock (newAssignedJobs) {
      if (reported) {
        if (newAssignedJobs.ContainsKey(currJob.Id)) {
          Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
          newAssignedJobs.Remove(currJob.Id);
        }
        continue;
      }

      int ttl;
      if (newAssignedJobs.TryGetValue(currJob.Id, out ttl)) {
        // Freshly assigned job: give the slave a few heartbeats to start reporting it.
        ttl--;
        newAssignedJobs[currJob.Id] = ttl;
        Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + ttl + ". User that sucks: " + currJob.Client);
        if (ttl <= 0) {
          Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);

          currJob.State = JobState.Offline;
          DaoLocator.JobDao.Update(currJob);

          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

          newAssignedJobs.Remove(currJob.Id);
        }
      } else {
        Logger.Error("Job ID wasn't with the heartbeats:  " + currJob);
        currJob.State = JobState.Offline;
        DaoLocator.JobDao.Update(currJob);
      }
    }
  }
}
420
/// <summary>
/// Called by a slave after it was told to pull a job. Asks the scheduler for the next
/// job of that slave, attaches the plugin dependencies of the job and registers the job
/// in newAssignedJobs with ApplicationConstants.JOB_TIME_TO_LIVE.
/// </summary>
/// <param name="clientId">id of the slave that pulls a job</param>
/// <returns>
/// A response carrying the job, or a response without a job and status
/// GetJob_NoJobsAvailable if the scheduler has none.
/// </returns>
public ResponseObject<JobDto> GetJob(Guid clientId) {
  ResponseObject<JobDto> response = new ResponseObject<JobDto>();

  JobDto nextJob = scheduler.GetNextJobForSlave(clientId);
  if (nextJob == null) {
    response.Obj = null;
    response.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
    Logger.Info("No more Jobs left for " + clientId);
    return response;
  }

  response.Obj = nextJob;
  response.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Obj);

  Logger.Info("Job pulled: " + nextJob + " for user " + clientId);
  lock (newAssignedJobs) {
    bool alreadyTracked = newAssignedJobs.ContainsKey(nextJob.Id);
    if (!alreadyTracked) {
      newAssignedJobs.Add(nextJob.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
    }
  }
  return response;
}
448
/// <summary>
/// Receives a job result as a stream. The stream starts with a binary-serialized
/// JobResult header; if the header is accepted (status Ok) the remaining bytes of the
/// stream are read and stored as the binary job file of that job.
/// The stream is always disposed, also when processing fails.
/// </summary>
/// <param name="stream">stream holding the JobResult header followed by the serialized job</param>
/// <param name="finished">true if this is the final result, false for a snapshot</param>
/// <returns>the response produced while processing the JobResult header</returns>
/// <exception cref="ArgumentNullException">stream is null</exception>
public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
  if (stream == null) {
    throw new ArgumentNullException("stream");
  }

  Logger.Info("BEGIN Job received for Storage - main method:");

  try {
    // NOTE(review): BinaryFormatter deserializes data sent by a slave; it is unsafe on
    // untrusted input. Left in place because the wire format is shared with the slaves.
    BinaryFormatter formatter = new BinaryFormatter();
    JobResult result = (JobResult)formatter.Deserialize(stream);

    ResponseResultReceived response = ProcessJobResult(result.ClientId, result.JobId, new byte[] { }, result.Percentage, result.Exception, finished);

    if (response.StatusMessage == ResponseStatus.Ok) {
      Logger.Debug("Trying to aquire WCF Job Stream");
      byte[] buffer = new byte[3024];
      using (MemoryStream serializedJob = new MemoryStream()) {
        int read;
        int i = 0;
        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
          serializedJob.Write(buffer, 0, read);
          if (i % 100 == 0) {
            Logger.Debug("Writing to stream: " + i);
          }
          i++;
        }
        Logger.Debug("Done Writing, closing the stream!");

        DaoLocator.JobDao.SetBinaryJobFile(result.JobId, serializedJob.ToArray());
      }
    }
    Logger.Info("END Job received for Storage:");
    return response;
  }
  finally {
    stream.Dispose();
  }
}
493
/// <summary>
/// Validates and stores a job result (final result or snapshot) sent by a slave.
/// The result is rejected if the job does not exist, was aborted, is not assigned to
/// any slave, is assigned to another slave, or is in a state other than
/// Calculating/Pending (SnapshotSent is treated as Calculating). A job that already is
/// Finished is answered with Ok without being changed.
/// On success the percentage is stored; the job becomes Failed if an exception text was
/// sent, Finished if <paramref name="finished"/> is true, and keeps its state otherwise.
/// </summary>
/// <param name="clientId">id of the slave that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">serialized job data; stored as binary job file when not empty</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception text if the job failed, otherwise null or empty</param>
/// <param name="finished">true for a final result, false for a snapshot</param>
/// <returns>response carrying the job id and the status of the operation</returns>
private ResponseResultReceived ProcessJobResult(Guid clientId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {
  Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);

  ResponseResultReceived response = new ResponseResultReceived();
  // Every outcome refers to the same job, so the id is set once (also for aborted jobs).
  response.JobId = jobId;

  JobDto jobInfo = DaoLocator.JobDao.FindById(jobId);
  if (jobInfo == null) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
    Logger.Error("No job with Id " + jobId);
    return response;
  }
  jobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);

  if (jobInfo.State == JobState.Aborted) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
    Logger.Error("Job was aborted! " + jobInfo);
    return response;
  }
  if (jobInfo.Client == null) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
    Logger.Error("Job is not being calculated (client = null)! " + jobInfo);
    return response;
  }
  if (jobInfo.Client.Id != clientId) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_WrongClientForJob;
    Logger.Error("Wrong Client for this Job! " + jobInfo + ", Sending Client is: " + clientId);
    return response;
  }
  if (jobInfo.State == JobState.Finished) {
    response.StatusMessage = ResponseStatus.Ok;
    Logger.Error("Job already finished! " + jobInfo + ", Sending Client is: " + clientId);
    return response;
  }
  //Todo: RequestsnapshotSent => calculating?
  if (jobInfo.State == JobState.SnapshotSent) {
    jobInfo.State = JobState.Calculating;
  }
  if (jobInfo.State != JobState.Calculating && jobInfo.State != JobState.Pending) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
    Logger.Error("Wrong Job State, job is: " + jobInfo);
    return response;
  }

  jobInfo.Percentage = percentage;

  if (!string.IsNullOrEmpty(exception)) {
    jobInfo.State = JobState.Failed;
    jobInfo.Exception = exception;
    jobInfo.DateFinished = DateTime.Now;
  } else if (finished) {
    jobInfo.State = JobState.Finished;
    jobInfo.DateFinished = DateTime.Now;
  }

  DaoLocator.JobDao.Update(jobInfo);

  // Previously the data was only assigned to a temporary object and silently dropped.
  // The stream based overload passes an empty array and stores the data itself.
  if (result != null && result.Length > 0) {
    DaoLocator.JobDao.SetBinaryJobFile(jobId, result);
  }

  response.StatusMessage = ResponseStatus.Ok;
  response.Finished = finished;

  Logger.Info("END Job received for Storage - SUB method: " + jobId);
  return response;
}
593
/// <summary>
/// Called by a slave when it has finished calculating a job. Delegates to
/// ProcessJobResult with the finished flag set.
/// </summary>
/// <param name="clientId">id of the slave that sends the result</param>
/// <param name="jobId">id of the finished job</param>
/// <param name="result">serialized job data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception text if the job failed, otherwise null or empty</param>
/// <returns>the response produced by ProcessJobResult</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, string exception) {
  const bool finished = true;
  return ProcessJobResult(clientId, jobId, result, percentage, exception, finished);
}
613
/// <summary>
/// Called by a slave to deliver an intermediate result (snapshot) of a job that is
/// still being calculated. Delegates to ProcessJobResult with the finished flag cleared.
/// </summary>
/// <param name="clientId">id of the slave that sends the snapshot</param>
/// <param name="jobId">id of the job the snapshot belongs to</param>
/// <param name="result">serialized job data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception text if the job failed, otherwise null or empty</param>
/// <returns>the response produced by ProcessJobResult</returns>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, string exception) {
  const bool finished = false;
  return ProcessJobResult(clientId, jobId, result, percentage, exception, finished);
}
617
/// <summary>
/// Logs a slave out: its heartbeat entry is removed, and if the slave is known it is
/// set to Offline. If the slave was Calculating, all of its active jobs that are not
/// Finished are set offline first.
/// </summary>
/// <param name="clientId">id of the slave that logs out</param>
/// <returns>
/// A response with status Logout_SlaveNotRegistered if the slave is unknown;
/// otherwise a new Response whose status is left untouched.
/// </returns>
public Response Logout(Guid clientId) {
  Logger.Info("Client logged out " + clientId);

  Response result = new Response();

  // Remove is a no-op when the slave has no heartbeat entry.
  heartbeatLock.EnterWriteLock();
  lastHeartbeats.Remove(clientId);
  heartbeatLock.ExitWriteLock();

  ClientDto slave = DaoLocator.ClientDao.FindById(clientId);
  if (slave == null) {
    result.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
    return result;
  }

  if (slave.State == SlaveState.Calculating) {
    // reset every job the slave was still working on
    foreach (JobDto unfinished in DaoLocator.JobDao.FindActiveJobsOfSlave(slave).Where(j => j.State != JobState.Finished)) {
      DaoLocator.JobDao.SetJobOffline(unfinished);
    }
  }

  slave.State = SlaveState.Offline;
  DaoLocator.ClientDao.Update(slave);

  return result;
}
654
/// <summary>
/// If a slave went offline and restores a job it was calculating, it can ask the
/// server whether the job result is still needed. If it is, the job is set to Pending
/// and a countdown of PENDING_TIMEOUT is (re)started for it in pendingJobs.
/// </summary>
/// <param name="jobId">id of the restored job</param>
/// <returns>
/// A response with status IsJobStillNeeded_JobDoesNotExist if the job is unknown,
/// IsJobStillNeeded_JobAlreadyFinished if it is Finished; otherwise a new Response
/// whose status is left untouched.
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  Response response = new Response();
  JobDto job = DaoLocator.JobDao.FindById(jobId);
  if (job == null) {
    response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
    Logger.Error("Job doesn't exist (anymore)! " + jobId);
    return response;
  }
  if (job.State == JobState.Finished) {
    response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
    Logger.Error("already finished! " + job);
    return response;
  }

  job.State = JobState.Pending;
  lock (pendingJobs) {
    // Indexer instead of Add: asking twice for the same job must restart the countdown
    // rather than throw an ArgumentException for the duplicate key.
    pendingJobs[job.Id] = PENDING_TIMEOUT;
  }

  DaoLocator.JobDao.Update(job);

  return response;
}
685
/// <summary>
/// Looks up the requested plugins among the plugins known to the ApplicationManager.
/// A plugin matches if name, major and minor version are equal. For an update request
/// (pluginInfo.Update) only a strictly higher revision matches and a missing match is
/// silently skipped; otherwise an equal or higher revision matches and a missing match
/// aborts the lookup. If several revisions match, the highest one is returned.
/// </summary>
/// <param name="pluginList">plugins requested by the slave; null is treated as an empty list</param>
/// <returns>
/// A response listing the found plugins including their files. If a non-update request
/// cannot be satisfied the status is GetPlugins_PluginsNotAvailable and the list holds
/// only the plugins found up to that point.
/// </returns>
public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
  ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
  response.List = new List<CachedHivePluginInfoDto>();
  if (pluginList == null) {
    return response;
  }

  foreach (HivePluginInfoDto pluginInfo in pluginList) {
    bool newerOnly = pluginInfo.Update;

    // FirstOrDefault on the revision-ordered matches instead of SingleOrDefault:
    // several installed revisions of one plugin must not raise InvalidOperationException.
    IPluginDescription ipd = ApplicationManager.Manager.Plugins
      .Where(pd => pd.Name == pluginInfo.Name
                && pd.Version.Major == pluginInfo.Version.Major
                && pd.Version.Minor == pluginInfo.Version.Minor
                && (newerOnly
                      ? pd.Version.Revision > pluginInfo.Version.Revision
                      : pd.Version.Revision >= pluginInfo.Version.Revision))
      .OrderByDescending(pd => pd.Version.Revision)
      .FirstOrDefault();

    if (ipd != null) {
      response.List.Add(ConvertPluginDescriptorToDto(ipd));
    } else if (!newerOnly) {
      response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
      return response;
    }
  }
  return response;
}
709
/// <summary>
/// Builds the transfer object for a plugin: copies name and version and adds one
/// HivePluginFile per file of the plugin, each holding the bytes read from that file.
/// </summary>
/// <param name="currPlugin">plugin description to convert</param>
/// <returns>the transfer object including the content of all plugin files</returns>
private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
  CachedHivePluginInfoDto dto = new CachedHivePluginInfoDto();
  dto.Name = currPlugin.Name;
  dto.Version = currPlugin.Version;

  foreach (var pluginFile in currPlugin.Files) {
    string path = pluginFile.Name;
    byte[] content = File.ReadAllBytes(path);
    dto.PluginFiles.Add(new HivePluginFile(content, path));
  }
  return dto;
}
721
722    #endregion
723  }
724}
Note: See TracBrowser for help on using the repository browser.