Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 2022

Last change on this file since 2022 was 2022, checked in by msteinbi, 15 years ago

Reset of client state when job finished corrected (#466)

File size: 23.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// The ClientCommunicator manages the whole communication with the client:
  /// login/logout, heartbeats, job distribution, job results and plugin transfer.
  /// </summary>
  /// <remarks>
  /// Heartbeat timestamps, newly assigned jobs and pending jobs are kept in static
  /// dictionaries, i.e. they are shared by all instances of this class and are
  /// not persisted to the database.
  /// </remarks>
  public class ClientCommunicator: IClientCommunicator {
    // Last heartbeat (UTC) received per client id; every access is guarded by heartbeatLock.
    private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
      new Dictionary<Guid, DateTime>();
    // Per job id: remaining number of client heartbeats (with progress data) in which a job
    // handed out by SendJob may be missing before it is set offline; guarded by lock(newAssignedJobs).
    private static readonly Dictionary<Guid, int> newAssignedJobs =
      new Dictionary<Guid, int>();
    // Per job id: remaining number of server heartbeats a job may stay in State.pending
    // before it is set offline (see CheckForPendingJobs); guarded by lock(pendingJobs).
    private static readonly Dictionary<Guid, int> pendingJobs =
      new Dictionary<Guid, int>();

    private static readonly ReaderWriterLockSlim heartbeatLock =
      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly ISessionFactory factory;
    private readonly ILifecycleManager lifecycleManager;
    private readonly IInternalJobManager jobManager;
    private readonly IScheduler scheduler;

    // Initial value of a pendingJobs entry (number of server heartbeats).
    private const int PENDING_TIMEOUT = 100;

    /// <summary>
    /// Resolves the session factory, lifecycle manager, job manager and scheduler
    /// from the ServiceLocator and registers the server heartbeat handler
    /// that supervises the client heartbeats.
    /// </summary>
    public ClientCommunicator() {
      factory = ServiceLocator.GetSessionFactory();

      lifecycleManager = ServiceLocator.GetLifecycleManager();
      // direct cast: fail here instead of with a NullReferenceException on first use
      jobManager = (IInternalJobManager)ServiceLocator.GetJobManager();
      scheduler = ServiceLocator.GetScheduler();

      lifecycleManager.RegisterHeartbeat(
        new EventHandler(lifecycleManager_OnServerHeartbeat));
    }

    /// <summary>
    /// Stores the current time (UTC) as the last heartbeat of the given client.
    /// </summary>
    /// <param name="clientId">id of the client that sent a heartbeat</param>
    private static void RecordHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        // the indexer adds a new entry or overwrites an existing one
        lastHeartbeats[clientId] = DateTime.UtcNow;
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Forgets the last heartbeat of the given client (no-op if none is stored).
    /// </summary>
    /// <param name="clientId">id of the client</param>
    private static void RemoveHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        lastHeartbeats.Remove(clientId);
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Removes the given job from the newly-assigned bookkeeping (no-op if absent).
    /// </summary>
    /// <param name="jobId">id of the job</param>
    private static void ForgetNewAssignedJob(Guid jobId) {
      lock (newAssignedJobs) {
        newAssignedJobs.Remove(jobId);
      }
    }

    /// <summary>
    /// Resets every active job of the given client through the job manager and
    /// drops those jobs from the newly-assigned bookkeeping.
    /// </summary>
    /// <param name="client">client whose jobs are reset</param>
    /// <param name="jobAdapter">job adapter of the current session</param>
    private void ResetActiveJobsOf(ClientInfo client, IJobAdapter jobAdapter) {
      foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
        jobManager.ResetJobsDependingOnResults(job);
        ForgetNewAssignedJob(job.Id);
      }
    }

    /// <summary>
    /// Server heartbeat handler: checks whether the online clients still send their heartbeats.
    /// Clients without a recent heartbeat are set offline and their active jobs are reset;
    /// active jobs of clients that already are offline are reset as well.
    /// Finally the timeouts of pending jobs are advanced.
    /// </summary>
    /// <param name="sender">event sender (unused)</param>
    /// <param name="e">event arguments (unused)</param>
    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

        foreach (ClientInfo client in allClients) {
          if (client.State != State.offline && client.State != State.nullState) {
            CheckHeartbeatOf(client, clientAdapter, jobAdapter);
          } else {
            heartbeatLock.EnterWriteLock();
            try {
              lastHeartbeats.Remove(client.Id);
              ResetActiveJobsOf(client, jobAdapter);
            }
            finally {
              heartbeatLock.ExitWriteLock();
            }
          }
        }
        CheckForPendingJobs(jobAdapter);

        tx.Commit();
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        // rethrow without resetting the stack trace
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Sets an online client offline (and resets its active jobs) if no heartbeat is known
    /// for it or its last heartbeat is older than ApplicationConstants.HEARTBEAT_MAX_DIF seconds.
    /// </summary>
    /// <param name="client">a client that is neither offline nor in nullState</param>
    /// <param name="clientAdapter">client adapter of the current session</param>
    /// <param name="jobAdapter">job adapter of the current session</param>
    private void CheckHeartbeatOf(ClientInfo client, IClientAdapter clientAdapter, IJobAdapter jobAdapter) {
      // upgradeable: no heartbeat can be recorded between the check and the removal below
      heartbeatLock.EnterUpgradeableReadLock();
      try {
        DateTime lastHbOfClient;
        if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
          // no heartbeat known in this process (e.g. after a server restart)
          client.State = State.offline;
          clientAdapter.Update(client);
          ResetActiveJobsOf(client, jobAdapter);
        } else {
          TimeSpan dif = DateTime.UtcNow.Subtract(lastHbOfClient);
          // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
          if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
            // if the client calculated jobs, they must be reset
            ResetActiveJobsOf(client, jobAdapter);

            // client must be set offline
            client.State = State.offline;
            clientAdapter.Update(client);

            RemoveHeartbeat(client.Id);
          }
        }
      }
      finally {
        heartbeatLock.ExitUpgradeableReadLock();
      }
    }

    /// <summary>
    /// Counts down the timeout of every job that is pending in the database and has a
    /// pendingJobs entry; a job whose timeout is used up is set offline and its entry removed.
    /// Pending jobs without an entry are left untouched.
    /// </summary>
    /// <param name="jobAdapter">job adapter of the current session</param>
    private void CheckForPendingJobs(IJobAdapter jobAdapter) {
      IList<Job> pendingJobsInDB = new List<Job>(jobAdapter.GetJobsByState(State.pending));

      foreach (Job currJob in pendingJobsInDB) {
        lock (pendingJobs) {
          int remaining;
          if (pendingJobs.TryGetValue(currJob.Id, out remaining)) {
            if (remaining <= 0) {
              currJob.State = State.offline;
              jobAdapter.Update(currJob);
              // drop the entry so a later IsJobStillNeeded call starts a fresh timeout
              pendingJobs.Remove(currJob.Id);
            } else {
              pendingJobs[currJob.Id] = remaining - 1;
            }
          }
        }
      }
    }

    #region IClientCommunicator Members

    /// <summary>
    /// Login process for the client: sets the client idle and records
    /// its first heartbeat (login counts as a heartbeat).
    /// </summary>
    /// <param name="clientInfo">the client that logs in</param>
    /// <returns>a successful response (exceptions are rethrown after a rollback)</returns>
    public Response Login(ClientInfo clientInfo) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        tx = session.BeginTransaction();

        Response response = new Response();

        RecordHeartbeat(clientInfo.Id);

        clientInfo.State = State.idle;
        clientAdapter.Update(clientInfo);
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Processes a heartbeat of a client: stores its timestamp and resource data, tells the
    /// client to fetch a job if it has a free core and the scheduler has work for it, and
    /// processes the job progress contained in the heartbeat.
    /// Unknown clients and clients that are not logged in get an unsuccessful response.
    /// </summary>
    /// <param name="hbData">heartbeat data sent by the client</param>
    /// <returns>the response including the action requests for the client</returns>
    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        ResponseHB response = new ResponseHB();
        response.ActionRequest = new List<MessageContainer>();

        ClientInfo client = clientAdapter.GetById(hbData.ClientId);

        // check if the client is known and logged in
        if (client == null || client.State == State.offline || client.State == State.nullState) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
          return response;
        }

        client.NrOfFreeCores = hbData.FreeCores;
        client.FreeMemory = hbData.FreeMemory;

        // save timestamp of this heartbeat
        RecordHeartbeat(hbData.ClientId);

        // check if client has a free core for a new job
        // if true, ask scheduler for a new job for this client
        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
        else
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

        processJobProcess(hbData, jobAdapter, clientAdapter, response);
        clientAdapter.Update(client);

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Processes the job progress sent by a client (job id -> progress value).
    /// Reported jobs get their percentage updated and pending abort/snapshot requests are
    /// forwarded to the client. Active jobs of the client that are missing from the report
    /// are set offline, immediately or once their newly-assigned grace count is used up.
    /// </summary>
    /// <param name="hbData">heartbeat data containing the job progress</param>
    /// <param name="jobAdapter">job adapter of the current session</param>
    /// <param name="clientAdapter">client adapter of the current session</param>
    /// <param name="response">response that receives status and action requests</param>
    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
      if (hbData.JobProgress == null || hbData.JobProgress.Count == 0)
        return;

      List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
      if (jobsOfClient.Count == 0) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        return;
      }

      foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
        Job curJob = jobAdapter.GetById(jobProgress.Key);
        if (curJob == null) {
          // unknown job id: nothing to update
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
          continue;
        }

        if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        } else if (curJob.State == State.abort) {
          // a request to abort the job has been set
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          curJob.State = State.finished;
        } else {
          // save job progress
          curJob.Percentage = jobProgress.Value;

          if (curJob.State == State.requestSnapshot) {
            // a request for a snapshot has been set
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
            curJob.State = State.requestSnapshotSent;
          }
        }
        jobAdapter.Update(curJob);
      }

      foreach (Job currJob in jobsOfClient) {
        bool reported = hbData.JobProgress.Keys.Contains(currJob.Id);

        lock (newAssignedJobs) {
          if (reported) {
            // the client is working on the job, so it is no longer "newly assigned"
            newAssignedJobs.Remove(currJob.Id);
            continue;
          }

          int remaining;
          if (newAssignedJobs.TryGetValue(currJob.Id, out remaining)) {
            // a freshly sent job may be missing from a few reports before the client picks it up
            remaining--;
            if (remaining <= 0) {
              currJob.State = State.offline;
              jobAdapter.Update(currJob);
              newAssignedJobs.Remove(currJob.Id);
            } else {
              newAssignedJobs[currJob.Id] = remaining;
            }
          } else {
            currJob.State = State.offline;
            jobAdapter.Update(currJob);
          }
        }
      }
    }

    /// <summary>
    /// If the client was told to pull a job he calls this method;
    /// the scheduler selects a job, which is sent to the client and tracked
    /// as newly assigned with ApplicationConstants.JOB_TIME_TO_LIVE.
    /// </summary>
    /// <param name="clientId">id of the requesting client</param>
    /// <returns>the job, or an unsuccessful response if no job is left</returns>
    public ResponseJob SendJob(Guid clientId) {
      ResponseJob response = new ResponseJob();

      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
      if (job2Calculate != null) {
        response.Job = job2Calculate;
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
        lock (newAssignedJobs) {
          // indexer instead of Add: a re-assigned job simply gets a fresh time to live
          newAssignedJobs[job2Calculate.Id] = ApplicationConstants.JOB_TIME_TO_LIVE;
        }
      } else {
        response.Success = false;
        response.Job = null;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
      }
      return response;
    }

    /// <summary>
    /// Stores a (snapshot or final) result of a job calculated by the given client.
    /// All previously stored results of the job are deleted, so only the latest one is kept.
    /// </summary>
    /// <param name="clientId">id of the client that sends the result</param>
    /// <param name="jobId">id of the job</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception raised during the calculation, if any</param>
    /// <param name="finished">true if this is the final result (job is set finished)</param>
    /// <returns>the response; JobId is always set</returns>
    private ResponseResultReceived ProcessJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception,
      bool finished) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();
        IJobResultsAdapter jobResultAdapter =
          session.GetDataAdapter<JobResult, IJobResultsAdapter>();

        tx = session.BeginTransaction();

        ResponseResultReceived response = new ResponseResultReceived();
        response.JobId = jobId;

        Job job = jobAdapter.GetById(jobId);

        // the early returns below happen before anything was written, so no commit is needed
        if (job == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
          return response;
        }
        if (job.State == State.abort) {
          // return here so the abort status is what the client receives
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
          return response;
        }
        if (job.Client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
          return response;
        }
        if (job.Client.Id != clientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
          return response;
        }
        if (job.State == State.finished) {
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
          return response;
        }
        if (job.State == State.requestSnapshotSent) {
          job.State = State.calculating;
        }
        if (job.State != State.calculating && job.State != State.pending) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
          return response;
        }

        job.SerializedJob = result;
        job.Percentage = percentage;
        if (finished)
          job.State = State.finished;

        // keep only the most recent result of the job
        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
        foreach (JobResult currentResult in jobResults)
          jobResultAdapter.Delete(currentResult);

        JobResult jobResult = new JobResult();
        // job.Client.Id == clientId has been verified above
        jobResult.ClientId = clientId;
        jobResult.JobId = job.Id;
        jobResult.Result = result;
        jobResult.Percentage = percentage;
        jobResult.Exception = exception;
        jobResult.DateFinished = DateTime.Now;

        jobResultAdapter.Update(jobResult);
        jobAdapter.Update(job);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
        response.finished = finished;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Stores the final result of a job; the job is set to finished.
    /// </summary>
    /// <param name="clientId">id of the client that sends the result</param>
    /// <param name="jobId">id of the job</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception raised during the calculation, if any</param>
    /// <returns>the response of the result processing</returns>
    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception) {

      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
    }

    /// <summary>
    /// Stores an intermediate result (snapshot) of a job; the job is not set to finished.
    /// </summary>
    /// <param name="clientId">id of the client that sends the snapshot</param>
    /// <param name="jobId">id of the job</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception raised during the calculation, if any</param>
    /// <returns>the response of the result processing</returns>
    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
    }

    /// <summary>
    /// When a client logs out, its heartbeat entry is removed and it is set offline.
    /// If it was calculating, its unfinished jobs are reset.
    /// </summary>
    /// <param name="clientId">id of the client that logs out</param>
    /// <returns>the response; unsuccessful if the client is not registered</returns>
    public Response Logout(Guid clientId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        Response response = new Response();

        RemoveHeartbeat(clientId);

        ClientInfo client = clientAdapter.GetById(clientId);
        if (client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
          return response;
        }
        if (client.State == State.calculating) {
          // check which job the client was calculating and reset it
          ICollection<Job> jobsOfClient = jobAdapter.GetJobsOf(client);
          foreach (Job job in jobsOfClient) {
            if (job.State != State.finished)
              jobManager.ResetJobsDependingOnResults(job);
          }
        }

        client.State = State.offline;
        clientAdapter.Update(client);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// If a client went offline and restores a job he was calculating,
    /// he can ask the server whether the job result is still needed.
    /// An unfinished job is set to pending; if it is still pending after about
    /// PENDING_TIMEOUT server heartbeats it is set offline (see CheckForPendingJobs).
    /// </summary>
    /// <param name="jobId">id of the restored job</param>
    /// <returns>the response telling the client whether to send the result</returns>
    public Response IsJobStillNeeded(Guid jobId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();
        tx = session.BeginTransaction();

        Response response = new Response();
        Job job = jobAdapter.GetById(jobId);
        if (job == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
          return response;
        }
        if (job.State == State.finished) {
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
          return response;
        }
        job.State = State.pending;
        lock (pendingJobs) {
          // indexer instead of Add: asking twice for the same job restarts its timeout
          pendingJobs[job.Id] = PENDING_TIMEOUT;
        }

        jobAdapter.Update(job);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Returns the assembly files of every active server plugin whose name and
    /// version string match an entry of the requested plugin list.
    /// </summary>
    /// <param name="pluginList">plugins requested by the client</param>
    /// <returns>the response containing the cached plugin files</returns>
    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
      ResponsePlugin response = new ResponsePlugin();
      PluginManager.Manager.Initialize();
      ICollection<PluginInfo> allActivePlugins = PluginManager.Manager.ActivePlugins;

      foreach (HivePluginInfo pluginInfo in pluginList) {
        // TODO: BuildDate deleted, not needed???
        // TODO: Split version to major, minor and revision number
        foreach (PluginInfo currPlugin in allActivePlugins) {
          if (currPlugin.Name == pluginInfo.Name
              && currPlugin.Version.ToString() == pluginInfo.Version) {

            CachedHivePluginInfo currCachedPlugin = new CachedHivePluginInfo {
                Name = currPlugin.Name,
                Version = currPlugin.Version.ToString(),
                BuildDate = currPlugin.BuildDate };

            foreach (String assemblyPath in currPlugin.Assemblies) {
              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(assemblyPath));
            }
            response.Plugins.Add(currCachedPlugin);
          }
        }
      }
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;

      return response;
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.