Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 3155

Last change on this file since 3155 was 3018, checked in by kgrading, 15 years ago

added Priority and resource restricted scheduling (#907)

File size: 30.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39using System.Runtime.Serialization.Formatters.Binary;
40using HeuristicLab.Tracing;
41using Linq = HeuristicLab.Hive.Server.LINQDataAccess;
42using System.Transactions;
43
44namespace HeuristicLab.Hive.Server.Core {
45  /// <summary>
46  /// The ClientCommunicator manages the whole communication with the client
47  /// </summary>
48  public class ClientCommunicator : IClientCommunicator,
49    IInternalClientCommunicator {
50    private static Dictionary<Guid, DateTime> lastHeartbeats =
51      new Dictionary<Guid, DateTime>();
52    private static Dictionary<Guid, int> newAssignedJobs =
53      new Dictionary<Guid, int>();
54    private static Dictionary<Guid, int> pendingJobs =
55      new Dictionary<Guid, int>();
56
57    private static ReaderWriterLockSlim heartbeatLock =
58      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
59
60    //private ISessionFactory factory;
61    private ILifecycleManager lifecycleManager;
62    private IInternalJobManager jobManager;
63    private IScheduler scheduler;   
64
65    private static int PENDING_TIMEOUT = 100;
66
67    /// <summary>
68    /// Initialization of the Adapters to the database
69    /// Initialization of Eventhandler for the lifecycle management
70    /// Initialization of lastHearbeats Dictionary
71    /// </summary>
72    public ClientCommunicator() {
73      //factory = ServiceLocator.GetSessionFactory();
74
75      lifecycleManager = ServiceLocator.GetLifecycleManager();
76      jobManager = ServiceLocator.GetJobManager() as
77        IInternalJobManager;
78      scheduler = ServiceLocator.GetScheduler();
79
80      lifecycleManager.RegisterHeartbeat(
81        new EventHandler(lifecycleManager_OnServerHeartbeat));
82    }
83
84    /// <summary>
85    /// Check if online clients send their hearbeats
86    /// if not -> set them offline and check if they where calculating a job
87    /// </summary>
88    /// <param name="sender"></param>
89    /// <param name="e"></param>
90    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
91      HiveLogger.Info(this.ToString() + ": Server Heartbeat ticked");
92
93      using (TransactionScope scope = new TransactionScope()) {
94
95        List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());
96
97        foreach (ClientDto client in allClients) {
98          if (client.State != State.offline && client.State != State.nullState) {
99            heartbeatLock.EnterUpgradeableReadLock();
100
101            if (!lastHeartbeats.ContainsKey(client.Id)) {
102              HiveLogger.Info(this.ToString() + ": Client " + client.Id +
103                              " wasn't offline but hasn't sent heartbeats - setting offline");
104              client.State = State.offline;
105              DaoLocator.ClientDao.Update(client);
106              HiveLogger.Info(this.ToString() + ": Client " + client.Id +
107                              " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
108              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
109                jobManager.ResetJobsDependingOnResults(job);
110              }
111            }
112            else {
113              DateTime lastHbOfClient = lastHeartbeats[client.Id];
114
115              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
116              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
117              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
118                // if client calculated jobs, the job must be reset
119                HiveLogger.Info(this.ToString() + ": Client timed out and is on RESET");
120                foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
121                  jobManager.ResetJobsDependingOnResults(job);
122                  lock (newAssignedJobs) {
123                    if (newAssignedJobs.ContainsKey(job.Id))
124                      newAssignedJobs.Remove(job.Id);
125                  }
126                }
127
128                // client must be set offline
129                client.State = State.offline;
130
131                //clientAdapter.Update(client);
132                DaoLocator.ClientDao.Update(client);
133
134                heartbeatLock.EnterWriteLock();
135                lastHeartbeats.Remove(client.Id);
136                heartbeatLock.ExitWriteLock();
137              }
138            }
139
140            heartbeatLock.ExitUpgradeableReadLock();
141          }
142          else {
143            //TODO: RLY neccesary?
144            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Shouldn't have offline or nullstate, has " + client.State);
145            heartbeatLock.EnterWriteLock();
146            //HiveLogger.Info(this.ToString() + ": Client " + client.Id + " has wrong state: Resetting all his jobs");
147            if (lastHeartbeats.ContainsKey(client.Id))
148              lastHeartbeats.Remove(client.Id);
149            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) {
150              jobManager.ResetJobsDependingOnResults(job);
151            }
152            heartbeatLock.ExitWriteLock();
153          }
154        }
155        CheckForPendingJobs();
156        DaoLocator.DestroyContext();
157        scope.Complete();       
158      }
159    }
160
161    private void CheckForPendingJobs() {
162      IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(State.pending));
163
164      foreach (JobDto currJob in pendingJobsInDB) {
165        lock (pendingJobs) {
166          if (pendingJobs.ContainsKey(currJob.Id)) {
167            if (pendingJobs[currJob.Id] <= 0) {
168              currJob.State = State.offline;
169              DaoLocator.JobDao.Update(currJob);
170              //jobAdapter.Update(currJob);
171            } else {
172              pendingJobs[currJob.Id]--;
173            }
174          }
175        }
176      }
177    }
178
179    #region IClientCommunicator Members
180
181    /// <summary>
182    /// Login process for the client
183    /// A hearbeat entry is created as well (login is the first hearbeat)
184    /// </summary>
185    /// <param name="clientInfo"></param>
186    /// <returns></returns>
187    public Response Login(ClientDto clientInfo) {
188    //  ISession session = factory.GetSessionForCurrentThread();
189    //  ITransaction tx = null;
190
191    //  try {
192    //    IClientAdapter clientAdapter =
193    //      session.GetDataAdapter<ClientInfo, IClientAdapter>();
194
195    //    tx = session.BeginTransaction();
196
197        Response response = new Response();
198
199        heartbeatLock.EnterWriteLock();
200        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
201          lastHeartbeats[clientInfo.Id] = DateTime.Now;
202        } else {
203          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
204        }
205        heartbeatLock.ExitWriteLock();
206
207        clientInfo.State = State.idle;
208
209        if (DaoLocator.ClientDao.FindById(clientInfo.Id) == null)
210          DaoLocator.ClientDao.Insert(clientInfo);
211        else
212          DaoLocator.ClientDao.Update(clientInfo);       
213        response.Success = true;
214        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
215
216        //tx.Commit();
217        return response;
218      //}
219      /*catch (Exception ex) {
220        if (tx != null)
221          tx.Rollback();
222        throw ex;
223      }
224      finally {
225        if (session != null)
226          session.EndSession();
227      } */
228    }
229
230    /// <summary>
231    /// The client has to send regulary heartbeats
232    /// this hearbeats will be stored in the heartbeats dictionary
233    /// check if there is work for the client and send the client a response if he should pull a job
234    /// </summary>
235    /// <param name="hbData"></param>
236    /// <returns></returns>
237    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
238      HiveLogger.Debug(this.ToString() + ": BEGIN Processing Heartbeat for Client " + hbData.ClientId);
239      HiveLogger.Debug(this.ToString() + ": BEGIN Fetching Adapters");
240      HiveLogger.Debug(this.ToString() + ": END Fetched Adapters");
241      HiveLogger.Debug(this.ToString() + ": BEGIN Starting Transaction");
242
243      HiveLogger.Debug(this.ToString() + ": END Started Transaction");
244
245        ResponseHB response = new ResponseHB();
246        response.ActionRequest = new List<MessageContainer>();
247
248        HiveLogger.Debug(this.ToString() + ": BEGIN Started Client Fetching");
249        ClientDto client = DaoLocator.ClientDao.FindById(hbData.ClientId);
250        HiveLogger.Debug(this.ToString() + ": END Finished Client Fetching");
251        // check if the client is logged in
252        if (client.State == State.offline || client.State == State.nullState) {
253          response.Success = false;
254          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;         
255          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
256
257          HiveLogger.Error(this.ToString() + " ProcessHeartBeat: Client state null or offline: " + client);
258
259          return response;
260        }
261
262        client.NrOfFreeCores = hbData.FreeCores;
263        client.FreeMemory = hbData.FreeMemory;
264
265        // save timestamp of this heartbeat
266        HiveLogger.Debug(this.ToString() + ": BEGIN Locking for Heartbeats");
267        heartbeatLock.EnterWriteLock();
268        HiveLogger.Debug(this.ToString() + ": END Locked for Heartbeats");
269        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
270          lastHeartbeats[hbData.ClientId] = DateTime.Now;
271        } else {
272          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
273        }
274        heartbeatLock.ExitWriteLock();
275
276        // check if client has a free core for a new job
277        // if true, ask scheduler for a new job for this client
278        HiveLogger.Debug(this.ToString() + ": BEGIN Looking for Client Jobs");
279        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) {
280          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
281        } else {
282          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
283        }
284        HiveLogger.Debug(this.ToString() + ": END Looked for Client Jobs");
285        response.Success = true;
286        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
287
288        HiveLogger.Debug(this.ToString() + ": BEGIN Processing Heartbeat Jobs");
289        processJobProcess(hbData, response);
290        HiveLogger.Debug(this.ToString() + ": END Processed Heartbeat Jobs");
291       
292        DaoLocator.ClientDao.Update(client);
293
294        //tx.Commit();
295        HiveLogger.Debug(this.ToString() + ": END Processed Heartbeat for Client " + hbData.ClientId);
296        return response;
297      }
298
299    /// <summary>
300    /// Process the Job progress sent by a client
301    /// </summary>
302    /// <param name="hbData"></param>
303    /// <param name="jobAdapter"></param>
304    /// <param name="clientAdapter"></param>
305    /// <param name="response"></param>
306    private void processJobProcess(HeartBeatData hbData, ResponseHB response) {
307      HiveLogger.Info(this.ToString() + " processJobProcess: Started for Client " + hbData.ClientId);     
308     
309      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
310        List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfClient(DaoLocator.ClientDao.FindById(hbData.ClientId)));
311        if (jobsOfClient == null || jobsOfClient.Count == 0) {
312          response.Success = false;
313          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
314          HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId);     
315          return;
316        }
317
318        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
319          JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
320          curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
321          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
322            response.Success = false;
323            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
324            HiveLogger.Error(this.ToString() + " processJobProcess: There is no job calculated by this user " + hbData.ClientId + " Job: " + curJob);     
325          } else if (curJob.State == State.abort) {
326            // a request to abort the job has been set
327            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
328            curJob.State = State.finished;
329          } else {
330            // save job progress
331            curJob.Percentage = jobProgress.Value;
332
333            if (curJob.State == State.requestSnapshot) {
334              // a request for a snapshot has been set
335              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
336              curJob.State = State.requestSnapshotSent;
337            }
338          }
339          DaoLocator.JobDao.Update(curJob);
340        }
341        foreach (JobDto currJob in jobsOfClient) {
342          bool found = false;
343          foreach (Guid jobId in hbData.JobProgress.Keys) {
344            if (jobId == currJob.Id) {
345              found = true;
346              break;
347            }
348          }
349          if (!found) {
350            lock (newAssignedJobs) {
351              if (newAssignedJobs.ContainsKey(currJob.Id)) {
352                newAssignedJobs[currJob.Id]--;
353                HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL Reduced by one for job: " + currJob + "and is now: " + newAssignedJobs[currJob.Id] + ". User that sucks: " + currJob.Client);                     
354                if (newAssignedJobs[currJob.Id] <= 0) {
355                  HiveLogger.Error(this.ToString() + " processJobProcess: Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);                 
356
357                  currJob.State = State.offline;
358                  DaoLocator.JobDao.Update(currJob);
359
360                  response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));
361
362                  newAssignedJobs.Remove(currJob.Id);
363                }
364              } else {
365                HiveLogger.Error(this.ToString() + " processJobProcess: Job ID wasn't with the heartbeats:  " + currJob);                     
366                currJob.State = State.offline;
367                DaoLocator.JobDao.Update(currJob);
368              }
369            } // lock
370          } else {
371            lock (newAssignedJobs) {
372
373              if (newAssignedJobs.ContainsKey(currJob.Id)) {
374                HiveLogger.Info(this.ToString() + " processJobProcess: Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);                     
375                newAssignedJobs.Remove(currJob.Id);
376              }
377            }
378          }
379        }
380      }
381    }
382
383    /// <summary>
384    /// if the client was told to pull a job he calls this method
385    /// the server selects a job and sends it to the client
386    /// </summary>
387    /// <param name="clientId"></param>
388    /// <returns></returns>
389    /*public ResponseSerializedJob SendSerializedJob(Guid clientId) {
390      ISession session = factory.GetSessionForCurrentThread();
391      ITransaction tx = null;
392
393      try {
394        IJobAdapter jobAdapter =
395          session.GetDataAdapter<JobDto, IJobAdapter>();
396
397        tx = session.BeginTransaction();
398
399        ResponseSerializedJob response = new ResponseSerializedJob();
400
401        JobDto job2Calculate = scheduler.GetNextJobForClient(clientId);
402        if (job2Calculate != null) {
403          SerializedJob computableJob =
404            jobAdapter.GetSerializedJob(job2Calculate.Id);
405
406          response.Job = computableJob;
407          response.Success = true;
408          HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + computableJob.JobInfo + " for user " + clientId);                     
409          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
410          lock (newAssignedJobs) {
411            if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
412              newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
413          }
414        } else {
415          HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);                     
416          response.Success = false;
417          response.Job = null;
418          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
419        }
420
421        tx.Commit();
422
423        return response;
424      }
425      catch (Exception ex) {
426        if (tx != null)
427          tx.Rollback();
428        throw ex;
429      }
430      finally {
431        if (session != null)
432          session.EndSession();
433      }
434    }     */
435
436    /// <summary>
437    /// if the client was told to pull a job he calls this method
438    /// the server selects a job and sends it to the client
439    /// </summary>
440    /// <param name="clientId"></param>
441    /// <returns></returns>
442    public ResponseJob SendJob(Guid clientId) {
443
444      ResponseJob response = new ResponseJob();
445
446      JobDto job2Calculate = scheduler.GetNextJobForClient(clientId);
447      if (job2Calculate != null) {
448        response.Job = job2Calculate;
449        response.Success = true;
450        HiveLogger.Info(this.ToString() + " SendSerializedJob: Job pulled: " + job2Calculate + " for user " + clientId);
451        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
452        lock (newAssignedJobs) {
453          if (!newAssignedJobs.ContainsKey(job2Calculate.Id))
454            newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
455        }
456      }
457      else {
458        response.Success = false;
459        response.Job = null;
460        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
461        HiveLogger.Info(this.ToString() + " SendSerializedJob: No more Jobs left for " + clientId);
462      }
463
464
465
466      return response;
467    }
468
469    public ResponseResultReceived ProcessJobResult(
470      Stream stream,
471      bool finished) {
472
473      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - main method:");
474     
475     
476      Stream jobResultStream = null;
477      Stream jobStream = null;
478
479      //try {
480        BinaryFormatter formatter =
481          new BinaryFormatter();
482
483        JobResult result =
484          (JobResult)formatter.Deserialize(stream);
485
486        //important - repeatable read isolation level is required here,
487        //otherwise race conditions could occur when writing the stream into the DB
488        //just removed TransactionIsolationLevel.RepeatableRead
489        //tx = session.BeginTransaction();
490
491        ResponseResultReceived response =
492          ProcessJobResult(
493          result.ClientId,
494          result.JobId,
495          new byte[] { },
496          result.Percentage,
497          result.Exception,
498          finished);
499
500        if (response.Success) {
501          jobStream = DaoLocator.JobDao.GetSerializedJobStream(result.JobId);
502
503          byte[] buffer = new byte[3024];
504          int read = 0;
505          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {           
506            jobStream.Write(buffer, 0, read);
507          }
508          jobStream.Close();         
509        }                   
510        HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage:");
511        return response;
512      }
513
514
515    private ResponseResultReceived ProcessJobResult(Guid clientId,
516      Guid jobId,
517      byte[] result,
518      double percentage,
519      Exception exception,
520      bool finished) {
521     
522      HiveLogger.Info(this.ToString() + " ProcessJobResult: BEGIN Job received for Storage - SUB method: " + jobId);
523
524        ResponseResultReceived response = new ResponseResultReceived();
525        ClientDto client =
526          DaoLocator.ClientDao.FindById(clientId);
527
528        SerializedJob job =
529          new SerializedJob();
530
531        if (job != null) {
532          job.JobInfo =
533            DaoLocator.JobDao.FindById(jobId);         
534          job.JobInfo.Client = job.JobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);
535        }
536
537        if (job == null && job.JobInfo != null) {
538          response.Success = false;
539          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
540          response.JobId = jobId;
541         
542          HiveLogger.Error(this.ToString() + " ProcessJobResult: No job with Id " + jobId);                     
543         
544          //tx.Rollback();
545          return response;
546        }
547        if (job.JobInfo.State == State.abort) {
548          response.Success = false;
549          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
550         
551          HiveLogger.Error(this.ToString() + " ProcessJobResult: Job was aborted! " + job.JobInfo);                     
552         
553          //tx.Rollback();
554          return response;
555        }
556        if (job.JobInfo.Client == null) {
557          response.Success = false;
558          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
559          response.JobId = jobId;
560
561          HiveLogger.Error(this.ToString() + " ProcessJobResult: Job is not being calculated (client = null)! " + job.JobInfo);                     
562
563          //tx.Rollback();
564          return response;
565        }
566        if (job.JobInfo.Client.Id != clientId) {
567          response.Success = false;
568          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
569          response.JobId = jobId;
570
571          HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Client for this Job! " + job.JobInfo + ", Sending Client is: " + clientId);                     
572
573          //tx.Rollback();
574          return response;
575        }
576        if (job.JobInfo.State == State.finished) {
577          response.Success = true;
578          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
579          response.JobId = jobId;
580
581          HiveLogger.Error(this.ToString() + " ProcessJobResult: Job already finished! " + job.JobInfo + ", Sending Client is: " + clientId);                     
582
583          //tx.Rollback();
584          return response;
585        }
586        if (job.JobInfo.State == State.requestSnapshotSent) {
587          job.JobInfo.State = State.calculating;
588        }
589        if (job.JobInfo.State != State.calculating &&
590          job.JobInfo.State != State.pending) {
591          response.Success = false;
592          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
593          response.JobId = jobId;
594
595          HiveLogger.Error(this.ToString() + " ProcessJobResult: Wrong Job State, job is: " + job.JobInfo);                     
596
597          //tx.Rollback();
598          return response;
599        }
600        job.JobInfo.Percentage = percentage;
601
602        if (finished) {
603          job.JobInfo.State = State.finished;
604        }
605
606        job.SerializedJobData = result;
607       
608        DaoLocator.JobDao.Update(job.JobInfo);
609
610        response.Success = true;
611        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
612        response.JobId = jobId;
613        response.finished = finished;       
614               
615        HiveLogger.Info(this.ToString() + " ProcessJobResult: END Job received for Storage - SUB method: " + jobId);       
616        return response;
617
618    }
619
620
621    /// <summary>
622    /// the client can send job results during calculating
623    /// and will send a final job result when he finished calculating
624    /// these job results will be stored in the database
625    /// </summary>
626    /// <param name="clientId"></param>
627    /// <param name="jobId"></param>
628    /// <param name="result"></param>
629    /// <param name="exception"></param>
630    /// <param name="finished"></param>
631    /// <returns></returns>
632    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
633      Guid jobId,
634      byte[] result,
635      double percentage,
636      Exception exception) {
637
638      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
639    }
640
641
642    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
643      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
644    }
645
646    /// <summary>
647    /// when a client logs out the state will be set
648    /// and the entry in the last hearbeats dictionary will be removed
649    /// </summary>
650    /// <param name="clientId"></param>
651    /// <returns></returns>                       
652    public Response Logout(Guid clientId) {
653
654      HiveLogger.Info("Client logged out " + clientId);
655     
656      /*ISession session = factory.GetSessionForCurrentThread();
657      ITransaction tx = null;
658
659      try {
660        IClientAdapter clientAdapter =
661          session.GetDataAdapter<ClientDto, IClientAdapter>();
662        IJobAdapter jobAdapter =
663          session.GetDataAdapter<JobDto, IJobAdapter>();
664
665        tx = session.BeginTransaction();
666           */
667        Response response = new Response();
668
669        heartbeatLock.EnterWriteLock();
670        if (lastHeartbeats.ContainsKey(clientId))
671          lastHeartbeats.Remove(clientId);
672        heartbeatLock.ExitWriteLock();
673
674      ClientDto client = DaoLocator.ClientDao.FindById(clientId);
675        if (client == null) {
676          response.Success = false;
677          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
678          return response;
679        }
680        if (client.State == State.calculating) {
681          // check wich job the client was calculating and reset it
682          IEnumerable<JobDto> jobsOfClient = DaoLocator.JobDao.FindActiveJobsOfClient(client);
683          foreach (JobDto job in jobsOfClient) {
684            if (job.State != State.finished)
685              jobManager.ResetJobsDependingOnResults(job);
686          }
687        }
688
689        client.State = State.offline;
690        DaoLocator.ClientDao.Update(client);
691
692        response.Success = true;
693        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
694
695        //tx.Commit();
696        return response;
697      } /*
698      catch (Exception ex) {
699        if (tx != null)
700          tx.Rollback();
701        throw ex;
702      }
703      finally {
704        if (session != null)
705          session.EndSession();
706      }
707    }     */
708
709    /// <summary>
710    /// If a client goes offline and restores a job he was calculating
711    /// he can ask the client if he still needs the job result
712    /// </summary>
713    /// <param name="jobId"></param>
714    /// <returns></returns>
715    public Response IsJobStillNeeded(Guid jobId) {
716      /*ISession session = factory.GetSessionForCurrentThread();
717      ITransaction tx = null;
718
719      try {
720        IJobAdapter jobAdapter =
721          session.GetDataAdapter<JobDto, IJobAdapter>();
722        tx = session.BeginTransaction();                */
723
724        Response response = new Response();
725      JobDto job = DaoLocator.JobDao.FindById(jobId);
726        if (job == null) {
727          response.Success = false;
728          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
729          HiveLogger.Error(this.ToString() + " IsJobStillNeeded: Job doesn't exist (anymore)! " + jobId);
730          return response;
731        }
732        if (job.State == State.finished) {
733          response.Success = true;
734          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
735          HiveLogger.Error(this.ToString() + " IsJobStillNeeded: already finished! " + job);
736          return response;
737        }
738        job.State = State.pending;
739        lock (pendingJobs) {
740          pendingJobs.Add(job.Id, PENDING_TIMEOUT);
741        }
742
743        DaoLocator.JobDao.Update(job);
744
745        response.Success = true;
746        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
747        //tx.Commit();
748        return response;
749      }
750      /*catch (Exception ex) {
751        if (tx != null)
752          tx.Rollback();
753        throw ex;
754      }
755      finally {
756        if (session != null)
757          session.EndSession();
758      }
759    }   */
760
761    public ResponsePlugin SendPlugins(List<HivePluginInfoDto> pluginList) {
762      ResponsePlugin response = new ResponsePlugin();
763      foreach (HivePluginInfoDto pluginInfo in pluginList) {
764        // TODO: BuildDate deleted, not needed???
765        // TODO: Split version to major, minor and revision number
766        foreach (IPluginDescription currPlugin in ApplicationManager.Manager.Plugins) {
767          if (currPlugin.Name == pluginInfo.Name) {
768
769            CachedHivePluginInfoDto currCachedPlugin = new CachedHivePluginInfoDto {
770              Name = currPlugin.Name,
771              Version = currPlugin.Version.ToString(),
772              BuildDate = currPlugin.BuildDate
773            };
774
775            foreach (string fileName in from file in currPlugin.Files select file.Name) {
776              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(fileName));
777            }
778            response.Plugins.Add(currCachedPlugin);
779          }
780        }
781      }
782      response.Success = true;
783      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
784
785      return response;
786
787    }
788
789    #endregion
790  }
791}
Note: See TracBrowser for help on using the repository browser.