Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1998

Last change on this file since 1998 was 1998, checked in by msteinbi, 15 years ago

added pending jobs functionality (#531)

File size: 23.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39
40namespace HeuristicLab.Hive.Server.Core {
41  /// <summary>
42  /// The ClientCommunicator manages the whole communication with the client
43  /// </summary>
44  public class ClientCommunicator: IClientCommunicator {
45    private static Dictionary<Guid, DateTime> lastHeartbeats =
46      new Dictionary<Guid,DateTime>();
47    private static Dictionary<Guid, int> newAssignedJobs =
48      new Dictionary<Guid, int>();
49    private static Dictionary<Guid, int> pendingJobs =
50      new Dictionary<Guid, int>();
51
52    private static ReaderWriterLockSlim heartbeatLock =
53      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
54
55    private ISessionFactory factory;
56    private ILifecycleManager lifecycleManager;
57    private IInternalJobManager jobManager;
58    private IScheduler scheduler;
59
60    private static int PENDING_TIMEOUT = 100;
61
62    /// <summary>
63    /// Initialization of the Adapters to the database
64    /// Initialization of Eventhandler for the lifecycle management
65    /// Initialization of lastHearbeats Dictionary
66    /// </summary>
67    public ClientCommunicator() {
68      factory = ServiceLocator.GetSessionFactory();
69     
70      lifecycleManager = ServiceLocator.GetLifecycleManager();
71      jobManager = ServiceLocator.GetJobManager() as
72        IInternalJobManager;
73      scheduler = ServiceLocator.GetScheduler();
74
75      lifecycleManager.RegisterHeartbeat(
76        new EventHandler(lifecycleManager_OnServerHeartbeat));
77    }
78
79    /// <summary>
80    /// Check if online clients send their hearbeats
81    /// if not -> set them offline and check if they where calculating a job
82    /// </summary>
83    /// <param name="sender"></param>
84    /// <param name="e"></param>
85    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
86      ISession session = factory.GetSessionForCurrentThread();
87      ITransaction tx = null;
88
89      try {
90        IClientAdapter clientAdapter =
91          session.GetDataAdapter<ClientInfo, IClientAdapter>();
92        IJobAdapter jobAdapter =
93          session.GetDataAdapter<Job, IJobAdapter>();
94
95        tx = session.BeginTransaction();
96
97        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
98
99        foreach (ClientInfo client in allClients) {
100          if (client.State != State.offline && client.State != State.nullState) {
101            heartbeatLock.EnterUpgradeableReadLock();
102
103            if (!lastHeartbeats.ContainsKey(client.Id)) {
104              client.State = State.offline;
105              clientAdapter.Update(client);
106              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
107                jobManager.ResetJobsDependingOnResults(job);
108              }
109            } else {
110              DateTime lastHbOfClient = lastHeartbeats[client.Id];
111
112              TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
113              // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
114              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
115                // if client calculated jobs, the job must be reset
116                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
117                  jobManager.ResetJobsDependingOnResults(job);
118                  lock (newAssignedJobs) {
119                    if (newAssignedJobs.ContainsKey(job.Id))
120                      newAssignedJobs.Remove(job.Id);
121                  }
122                }
123
124                // client must be set offline
125                client.State = State.offline;
126                clientAdapter.Update(client);
127
128                heartbeatLock.EnterWriteLock();
129                lastHeartbeats.Remove(client.Id);
130                heartbeatLock.ExitWriteLock();
131              }
132            }
133
134            heartbeatLock.ExitUpgradeableReadLock();
135          } else {
136            heartbeatLock.EnterWriteLock();
137            if (lastHeartbeats.ContainsKey(client.Id))
138              lastHeartbeats.Remove(client.Id);
139            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
140              jobManager.ResetJobsDependingOnResults(job);
141            }
142            heartbeatLock.ExitWriteLock();
143          }
144        }
145        CheckForPendingJobs(jobAdapter);
146
147        tx.Commit();
148      }
149      catch (Exception ex) {
150        if (tx != null)
151          tx.Rollback();
152        throw ex;
153      }
154      finally {
155        if (session != null)
156          session.EndSession();
157      }
158    }
159
160    private void CheckForPendingJobs(IJobAdapter jobAdapter) {
161      IList<Job> pendingJobsInDB = new List<Job>(jobAdapter.GetJobsByState(State.pending));
162
163      foreach (Job currJob in pendingJobsInDB) {
164        lock (pendingJobs) {
165          if (pendingJobs.ContainsKey(currJob.Id)) {
166            if (pendingJobs[currJob.Id] <= 0) {
167              currJob.State = State.offline;
168              jobAdapter.Update(currJob);
169            } else {
170              pendingJobs[currJob.Id]--;
171            }
172          }
173        }
174      }
175    }
176
177    #region IClientCommunicator Members
178
179    /// <summary>
180    /// Login process for the client
181    /// A hearbeat entry is created as well (login is the first hearbeat)
182    /// </summary>
183    /// <param name="clientInfo"></param>
184    /// <returns></returns>
185    public Response Login(ClientInfo clientInfo) {
186      ISession session = factory.GetSessionForCurrentThread();
187      ITransaction tx = null;
188
189      try {
190        IClientAdapter clientAdapter =
191          session.GetDataAdapter<ClientInfo, IClientAdapter>();
192
193        tx = session.BeginTransaction();
194
195        Response response = new Response();
196
197        heartbeatLock.EnterWriteLock();
198        if (lastHeartbeats.ContainsKey(clientInfo.Id)) {
199          lastHeartbeats[clientInfo.Id] = DateTime.Now;
200        } else {
201          lastHeartbeats.Add(clientInfo.Id, DateTime.Now);
202        }
203        heartbeatLock.ExitWriteLock();
204
205        clientInfo.State = State.idle;
206        clientAdapter.Update(clientInfo);
207        response.Success = true;
208        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
209
210        tx.Commit();
211        return response;
212      }
213      catch (Exception ex) {
214        if (tx != null)
215          tx.Rollback();
216        throw ex;
217      }
218      finally {
219        if (session != null)
220          session.EndSession();
221      }
222    }
223
224    /// <summary>
225    /// The client has to send regulary heartbeats
226    /// this hearbeats will be stored in the heartbeats dictionary
227    /// check if there is work for the client and send the client a response if he should pull a job
228    /// </summary>
229    /// <param name="hbData"></param>
230    /// <returns></returns>
231    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
232      ISession session = factory.GetSessionForCurrentThread();
233      ITransaction tx = null;
234
235      try {
236        IClientAdapter clientAdapter =
237          session.GetDataAdapter<ClientInfo, IClientAdapter>();
238
239        IJobAdapter jobAdapter =
240          session.GetDataAdapter<Job, IJobAdapter>();
241
242        tx = session.BeginTransaction();
243
244        ResponseHB response = new ResponseHB();
245        response.ActionRequest = new List<MessageContainer>();
246
247        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
248
249        // check if the client is logged in
250        if (client.State == State.offline || client.State == State.nullState) {
251          response.Success = false;
252          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
253          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
254          return response;
255        }
256
257        client.NrOfFreeCores = hbData.FreeCores;
258        client.FreeMemory = hbData.FreeMemory;
259
260        // save timestamp of this heartbeat
261        heartbeatLock.EnterWriteLock();
262        if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
263          lastHeartbeats[hbData.ClientId] = DateTime.Now;
264        } else {
265          lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
266        }
267        heartbeatLock.ExitWriteLock();
268
269        // check if client has a free core for a new job
270        // if true, ask scheduler for a new job for this client
271        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
272          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
273        else
274          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
275
276        response.Success = true;
277        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
278
279        processJobProcess(hbData, jobAdapter, clientAdapter, response);
280        clientAdapter.Update(client);
281
282        tx.Commit();
283        return response;
284      }
285      catch (Exception ex) {
286        if (tx != null)
287          tx.Rollback();
288        throw ex;
289      }
290      finally {
291        if (session != null)
292          session.EndSession();
293      }
294    }
295
296    /// <summary>
297    /// Process the Job progress sent by a client
298    /// </summary>
299    /// <param name="hbData"></param>
300    /// <param name="jobAdapter"></param>
301    /// <param name="clientAdapter"></param>
302    /// <param name="response"></param>
303    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
304      if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
305        List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
306        if (jobsOfClient == null || jobsOfClient.Count == 0) {
307          response.Success = false;
308          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
309          return;
310        }
311
312        foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
313          Job curJob = jobAdapter.GetById(jobProgress.Key);
314          if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
315            response.Success = false;
316            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
317          } else if (curJob.State == State.abort) {
318            // a request to abort the job has been set
319            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
320            curJob.State = State.finished;
321          } else {
322            // save job progress
323            curJob.Percentage = jobProgress.Value;
324
325            if (curJob.State == State.requestSnapshot) {
326              // a request for a snapshot has been set
327              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
328              curJob.State = State.requestSnapshotSent;
329            }
330          }
331          jobAdapter.Update(curJob);
332        }
333        foreach (Job currJob in jobsOfClient) {
334          bool found = false;
335          foreach (Guid jobId in hbData.JobProgress.Keys) {
336            if (jobId == currJob.Id) {
337              found = true;
338              break;
339            }
340          }
341          if (!found) {
342            lock (newAssignedJobs) {
343              if (newAssignedJobs.ContainsKey(currJob.Id)) {
344                newAssignedJobs[currJob.Id]--;
345
346                if (newAssignedJobs[currJob.Id] <= 0) {
347                  currJob.State = State.offline;
348                  jobAdapter.Update(currJob);
349                  newAssignedJobs.Remove(currJob.Id);
350                }
351              } else {
352                currJob.State = State.offline;
353                jobAdapter.Update(currJob);
354              }
355            } // lock
356          } else {
357            lock (newAssignedJobs) {
358              if (newAssignedJobs.ContainsKey(currJob.Id))
359                newAssignedJobs.Remove(currJob.Id);
360            }
361          }
362        }
363      }
364    }
365   
366    /// <summary>
367    /// if the client was told to pull a job he calls this method
368    /// the server selects a job and sends it to the client
369    /// </summary>
370    /// <param name="clientId"></param>
371    /// <returns></returns>
372    public ResponseJob SendJob(Guid clientId) {
373      ResponseJob response = new ResponseJob();
374
375      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
376      if (job2Calculate != null) {
377        response.Job = job2Calculate;
378        response.Success = true;
379        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
380        lock (newAssignedJobs) {
381          newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
382        }
383      } else {
384        response.Success = false;
385        response.Job = null;
386        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
387      }
388      return response;
389    }
390
391    private ResponseResultReceived ProcessJobResult(Guid clientId,
392      Guid jobId,
393      byte[] result,
394      double percentage,
395      Exception exception,
396      bool finished) {
397      ISession session = factory.GetSessionForCurrentThread();
398      ITransaction tx = null;
399
400      try {
401        IClientAdapter clientAdapter =
402          session.GetDataAdapter<ClientInfo, IClientAdapter>();
403        IJobAdapter jobAdapter =
404          session.GetDataAdapter<Job, IJobAdapter>();
405        IJobResultsAdapter jobResultAdapter =
406          session.GetDataAdapter<JobResult, IJobResultsAdapter>();
407
408        tx = session.BeginTransaction();
409
410        ResponseResultReceived response = new ResponseResultReceived();
411        ClientInfo client =
412          clientAdapter.GetById(clientId);
413
414        Job job =
415          jobAdapter.GetById(jobId);
416       
417        if (job == null) {
418          response.Success = false;
419          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
420          response.JobId = jobId;
421          return response;
422        }
423        if (job.State == State.abort) {
424          response.Success = false;
425          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
426        }
427        if (job.Client == null) {
428          response.Success = false;
429          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
430          response.JobId = jobId;
431          return response;
432        }
433        if (job.Client.Id != clientId) {
434          response.Success = false;
435          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
436          response.JobId = jobId;
437          return response;
438        }
439        if (job.State == State.finished) {
440          response.Success = true;
441          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
442          response.JobId = jobId;
443          return response;
444        }
445        if (job.State == State.requestSnapshotSent) {
446          job.State = State.calculating;
447        }
448        if (job.State != State.calculating && job.State != State.pending) {
449          response.Success = false;
450          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
451          response.JobId = jobId;
452          return response;
453        }
454        job.SerializedJob = result;
455        job.Percentage = percentage;
456
457        if (finished) {
458          job.State = State.finished;
459          jobAdapter.Update(job);
460
461          client.State = State.idle;
462          clientAdapter.Update(client);
463        }
464        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
465        foreach (JobResult currentResult in jobResults)
466          jobResultAdapter.Delete(currentResult);
467
468        JobResult jobResult =
469          new JobResult();
470        jobResult.ClientId = client.Id;
471        jobResult.JobId = job.Id;
472        jobResult.Result = result;
473        jobResult.Percentage = percentage;
474        jobResult.Exception = exception;
475        jobResult.DateFinished = DateTime.Now;
476
477        jobResultAdapter.Update(jobResult);
478        jobAdapter.Update(job);
479
480        response.Success = true;
481        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
482        response.JobId = jobId;
483        response.finished = finished;
484
485        tx.Commit();
486        return response;
487      }
488      catch (Exception ex) {
489        if (tx != null)
490          tx.Rollback();
491        throw ex;
492      }
493      finally {
494        if (session != null)
495          session.EndSession();
496      }
497    }
498
499
500    /// <summary>
501    /// the client can send job results during calculating
502    /// and will send a final job result when he finished calculating
503    /// these job results will be stored in the database
504    /// </summary>
505    /// <param name="clientId"></param>
506    /// <param name="jobId"></param>
507    /// <param name="result"></param>
508    /// <param name="exception"></param>
509    /// <param name="finished"></param>
510    /// <returns></returns>
511    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
512      Guid jobId,
513      byte[] result,
514      double percentage,
515      Exception exception) {
516
517      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
518    }
519
520
521    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
522      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
523    }
524
525    /// <summary>
526    /// when a client logs out the state will be set
527    /// and the entry in the last hearbeats dictionary will be removed
528    /// </summary>
529    /// <param name="clientId"></param>
530    /// <returns></returns>                       
531    public Response Logout(Guid clientId) {
532      ISession session = factory.GetSessionForCurrentThread();
533      ITransaction tx = null;
534
535      try {
536        IClientAdapter clientAdapter =
537          session.GetDataAdapter<ClientInfo, IClientAdapter>();
538        IJobAdapter jobAdapter =
539          session.GetDataAdapter<Job, IJobAdapter>();
540
541        tx = session.BeginTransaction();
542
543        Response response = new Response();
544
545        heartbeatLock.EnterWriteLock();
546        if (lastHeartbeats.ContainsKey(clientId))
547          lastHeartbeats.Remove(clientId);
548        heartbeatLock.ExitWriteLock();
549
550        ClientInfo client = clientAdapter.GetById(clientId);
551        if (client == null) {
552          response.Success = false;
553          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
554          return response;
555        }
556        if (client.State == State.calculating) {
557          // check wich job the client was calculating and reset it
558          ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
559          foreach (Job job in jobsOfClient) {
560            if (job.State != State.finished)
561              jobManager.ResetJobsDependingOnResults(job);
562          }
563        }
564
565        client.State = State.offline;
566        clientAdapter.Update(client);
567
568        response.Success = true;
569        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
570
571        tx.Commit();
572        return response;
573      }
574      catch (Exception ex) {
575        if (tx != null)
576          tx.Rollback();
577        throw ex;
578      }
579      finally {
580        if (session != null)
581          session.EndSession();
582      }
583    }
584
585    /// <summary>
586    /// If a client goes offline and restores a job he was calculating
587    /// he can ask the client if he still needs the job result
588    /// </summary>
589    /// <param name="jobId"></param>
590    /// <returns></returns>
591    public Response IsJobStillNeeded(Guid jobId) {
592      ISession session = factory.GetSessionForCurrentThread();
593      ITransaction tx = null;
594
595      try {
596        IJobAdapter jobAdapter =
597          session.GetDataAdapter<Job, IJobAdapter>();
598        tx = session.BeginTransaction();
599
600        Response response = new Response();
601        Job job = jobAdapter.GetById(jobId);
602        if (job == null) {
603          response.Success = false;
604          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
605          return response;
606        }
607        if (job.State == State.finished) {
608          response.Success = true;
609          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
610          return response;
611        }
612        job.State = State.pending;
613        lock (pendingJobs) {
614          pendingJobs.Add(job.Id, PENDING_TIMEOUT);
615        }
616
617        jobAdapter.Update(job);
618
619        response.Success = true;
620        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
621        tx.Commit();
622        return response;
623      }
624      catch (Exception ex) {
625        if (tx != null)
626          tx.Rollback();
627        throw ex;
628      }
629      finally {
630        if (session != null)
631          session.EndSession();
632      }
633    }
634
635    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
636      ResponsePlugin response = new ResponsePlugin();
637      PluginManager.Manager.Initialize();
638      ICollection<PluginInfo> allActivePlugins = PluginManager.Manager.ActivePlugins;
639
640      foreach (HivePluginInfo pluginInfo in pluginList) {
641        // TODO: BuildDate deleted, not needed???
642        // TODO: Split version to major, minor and revision number
643        foreach (PluginInfo currPlugin in allActivePlugins) {
644          if (currPlugin.Name == pluginInfo.Name
645              && currPlugin.Version.ToString() == pluginInfo.Version) {
646
647            CachedHivePluginInfo currCachedPlugin = new CachedHivePluginInfo {
648                Name = currPlugin.Name,
649                Version = currPlugin.Version.ToString(),
650                BuildDate = currPlugin.BuildDate };
651
652            foreach (String assemblyPath in currPlugin.Assemblies) {
653              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(assemblyPath));
654            }
655            response.Plugins.Add(currCachedPlugin);
656          }
657        }
658      }
659      response.Success = true;
660      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
661
662      return response;
663
664    }
665
666    #endregion
667  }
668}
Note: See TracBrowser for help on using the repository browser.