Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1770

Last change on this file since 1770 was 1770, checked in by msteinbi, 15 years ago

new functionality for snapshot requesting (#531)

File size: 22.8 KB
Line 
#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Resources;
using System.Text;
using System.Threading;
using HeuristicLab.Core;
using HeuristicLab.DataAccess.Interfaces;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Hive.Contracts.Interfaces;
using HeuristicLab.Hive.JobBase;
using HeuristicLab.Hive.Server.Core.InternalInterfaces;
using HeuristicLab.Hive.Server.DataAccess;
using HeuristicLab.PluginInfrastructure;

namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// Server-side endpoint for Hive clients. It handles login and logout,
  /// records heartbeats, hands out jobs, stores job results and snapshots
  /// and delivers plugin files.
  /// </summary>
  public class ClientCommunicator: IClientCommunicator {
    // Time of the most recent login/heartbeat per client id.
    // Every access is guarded by heartbeatLock.
    private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
      new Dictionary<Guid, DateTime>();

    // Per job id: a counter that SendJob initialises to
    // ApplicationConstants.JOB_TIME_TO_LIVE and that processJobProcess
    // decrements for every heartbeat that does not report the job yet.
    // Every access is guarded by lock (newAssignedJobs).
    private static readonly Dictionary<Guid, int> newAssignedJobs =
      new Dictionary<Guid, int>();

    private static readonly ReaderWriterLockSlim heartbeatLock =
      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly ISessionFactory factory;
    private readonly ILifecycleManager lifecycleManager;
    private readonly IInternalJobManager jobManager;
    private readonly IScheduler scheduler;

    /// <summary>
    /// Resolves the session factory, lifecycle manager, job manager and
    /// scheduler through the ServiceLocator and registers this instance
    /// for the server heartbeat event of the lifecycle manager.
    /// </summary>
    public ClientCommunicator() {
      factory = ServiceLocator.GetSessionFactory();

      lifecycleManager = ServiceLocator.GetLifecycleManager();
      // NOTE(review): the "as" cast yields null when the registered job
      // manager does not implement IInternalJobManager - confirm that it
      // always does.
      jobManager = ServiceLocator.GetJobManager() as
        IInternalJobManager;
      scheduler = ServiceLocator.GetScheduler();

      // NOTE(review): the handler is never unregistered in this file;
      // verify how many ClientCommunicator instances the host creates.
      lifecycleManager.RegisterHeartbeat(
        new EventHandler(lifecycleManager_OnServerHeartbeat));
    }

    /// <summary>
    /// Stores the current time as the latest heartbeat of the given client.
    /// </summary>
    /// <param name="clientId">id of the client that just reported</param>
    private static void RecordHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        // the indexer adds the entry or overwrites an existing one
        lastHeartbeats[clientId] = DateTime.Now;
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Removes the heartbeat entry of the given client, if there is one.
    /// </summary>
    /// <param name="clientId">id of the client to forget</param>
    private static void ForgetHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        // Remove simply returns false when the key is unknown
        lastHeartbeats.Remove(clientId);
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Checks whether the clients that are marked as online still send
    /// heartbeats. A client without a heartbeat entry, or whose last
    /// heartbeat is older than ApplicationConstants.HEARTBEAT_MAX_DIF
    /// seconds, is set offline and its active jobs are reset. For clients
    /// that are already offline the heartbeat entry is dropped and their
    /// active jobs are reset as well.
    /// </summary>
    /// <param name="sender">event source (not used)</param>
    /// <param name="e">event arguments (not used)</param>
    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

        foreach (ClientInfo client in allClients) {
          if (client.State != State.offline && client.State != State.nullState) {
            // The lock is held for the whole check so that the decision
            // and the removal of the entry see the same timestamp. It is
            // released in finally, otherwise an exception from the data
            // layer would leave the static lock held for good.
            heartbeatLock.EnterUpgradeableReadLock();
            try {
              DateTime lastHbOfClient;
              if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
                // marked online, but no heartbeat is known for this client
                client.State = State.offline;
                clientAdapter.Update(client);
                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
                  jobManager.ResetJobsDependingOnResults(job);
                }
              } else {
                TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
                // check if the time between the last heartbeat and now is
                // greater than HEARTBEAT_MAX_DIF
                if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
                  // if the client calculated jobs, the jobs must be reset
                  foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
                    jobManager.ResetJobsDependingOnResults(job);
                    lock (newAssignedJobs) {
                      newAssignedJobs.Remove(job.Id);
                    }
                  }

                  // client must be set offline
                  client.State = State.offline;
                  clientAdapter.Update(client);

                  // upgrades to a write lock (recursion is supported)
                  ForgetHeartbeat(client.Id);
                }
              }
            }
            finally {
              heartbeatLock.ExitUpgradeableReadLock();
            }
          } else {
            ForgetHeartbeat(client.Id);
            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          }
        }
        tx.Commit();
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        // plain rethrow keeps the original stack trace
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    #region IClientCommunicator Members

    /// <summary>
    /// Login process for the client.
    /// A heartbeat entry is created as well (login is the first heartbeat).
    /// </summary>
    /// <param name="clientInfo">description of the client that logs in</param>
    /// <returns>
    /// a failed response if a client with this id is already stored with a
    /// state other than offline/nullState; otherwise a successful response
    /// after the client was stored with state idle
    /// </returns>
    public Response Login(ClientInfo clientInfo) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        tx = session.BeginTransaction();

        Response response = new Response();

        RecordHeartbeat(clientInfo.Id);

        ClientInfo client = clientAdapter.GetById(clientInfo.Id);
        if (client != null && client.State != State.offline && client.State != State.nullState) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
          return response;
        }
        clientInfo.State = State.idle;
        clientAdapter.Update(clientInfo);
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// The client has to send heartbeats regularly. The time of each
    /// heartbeat is stored in the heartbeats dictionary, the free cores and
    /// free memory of the client are updated, and the response tells the
    /// client whether it should fetch a job.
    /// </summary>
    /// <param name="hbData">heartbeat data sent by the client</param>
    /// <returns>
    /// the response with the action requests for the client; it is a failed
    /// response if the client is unknown or not logged in
    /// </returns>
    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        ResponseHB response = new ResponseHB();
        response.ActionRequest = new List<MessageContainer>();

        ClientInfo client = clientAdapter.GetById(hbData.ClientId);

        // check if the client is logged in; a client id that is not stored
        // at all is answered the same way instead of failing on null
        if (client == null || client.State == State.offline || client.State == State.nullState) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
          return response;
        }

        client.NrOfFreeCores = hbData.FreeCores;
        client.FreeMemory = hbData.FreeMemory;

        // save timestamp of this heartbeat
        RecordHeartbeat(hbData.ClientId);

        // check if client has a free core for a new job
        // if true, ask scheduler for a new job for this client
        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
        else
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

        // may turn the response into a failed one
        processJobProcess(hbData, jobAdapter, clientAdapter, response);
        clientAdapter.Update(client);

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Processes the job progress sent by a client with its heartbeat.
    /// Stores the reported progress, answers pending abort and snapshot
    /// requests, and sets active jobs offline that the client does not
    /// report (freshly sent jobs get JOB_TIME_TO_LIVE heartbeats first).
    /// Does nothing if the heartbeat carries no job progress.
    /// </summary>
    /// <param name="hbData">heartbeat data sent by the client</param>
    /// <param name="jobAdapter">adapter used to load and store jobs</param>
    /// <param name="clientAdapter">adapter used to load the client</param>
    /// <param name="response">
    /// heartbeat response; action requests are appended and Success /
    /// StatusMessage are overwritten when a reported job does not belong
    /// to the client
    /// </param>
    private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
      if (hbData.JobProgress == null)
        return;

      List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
      if (jobsOfClient.Count == 0) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        return;
      }

      foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
        Job curJob = jobAdapter.GetById(jobProgress.Key);
        // an unknown job id is handled like a job of another client
        if (curJob == null || curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        } else if (curJob.State == State.abort) {
          // a request to abort the job has been set
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
          curJob.State = State.finished;
          // store the new state, otherwise the assignment has no effect
          jobAdapter.Update(curJob);
        } else {
          // save job progress
          curJob.Percentage = jobProgress.Value;

          if (curJob.State == State.requestSnapshot) {
            // a request for a snapshot has been set
            response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
            // ProcessJobResult only accepts results for jobs in state
            // calculating, so the state is switched back before the
            // update below stores the job
            curJob.State = State.calculating;
          }
          jobAdapter.Update(curJob);
        }
      }

      foreach (Job currJob in jobsOfClient) {
        bool reported = hbData.JobProgress.ContainsKey(currJob.Id);

        lock (newAssignedJobs) {
          if (reported) {
            // the client calculates the job, no grace period needed anymore
            newAssignedJobs.Remove(currJob.Id);
          } else {
            int remaining;
            bool inGracePeriod =
              newAssignedJobs.TryGetValue(currJob.Id, out remaining) && --remaining > 0;

            if (inGracePeriod) {
              newAssignedJobs[currJob.Id] = remaining;
            } else {
              // not reported and not (or no longer) within its grace period
              newAssignedJobs.Remove(currJob.Id);
              currJob.State = State.offline;
              jobAdapter.Update(currJob);
            }
          }
        } // lock
      }
    }

    /// <summary>
    /// If the client was told to pull a job it calls this method.
    /// The scheduler selects a job, which is sent to the client.
    /// </summary>
    /// <param name="clientId">id of the client that asks for a job</param>
    /// <returns>
    /// a successful response containing the job, or a failed response
    /// without a job if the scheduler returned none
    /// </returns>
    public ResponseJob SendJob(Guid clientId) {
      ResponseJob response = new ResponseJob();

      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
      if (job2Calculate != null) {
        response.Job = job2Calculate;
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
        lock (newAssignedJobs) {
          // indexer instead of Add: a job that is handed out again while
          // it is still tracked restarts its counter instead of throwing
          newAssignedJobs[job2Calculate.Id] = ApplicationConstants.JOB_TIME_TO_LIVE;
        }
      } else {
        response.Success = false;
        response.Job = null;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
      }
      return response;
    }

    /// <summary>
    /// Stores a job result (final result or snapshot) sent by a client.
    /// The result is rejected if the job does not exist, is assigned to no
    /// client or to another client, or is not in state calculating. For a
    /// job that is already finished a successful response is returned
    /// without storing anything.
    /// </summary>
    /// <param name="clientId">id of the sending client</param>
    /// <param name="jobId">id of the job the result belongs to</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception reported by the client, if any</param>
    /// <param name="finished">
    /// true for the final result: the job is set to finished, the client
    /// to idle and the results stored so far for the job are deleted
    /// </param>
    /// <returns>response describing whether the result was accepted</returns>
    private ResponseResultReceived ProcessJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception,
      bool finished) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();
        IJobResultsAdapter jobResultAdapter =
          session.GetDataAdapter<JobResult, IJobResultsAdapter>();

        tx = session.BeginTransaction();

        ResponseResultReceived response = new ResponseResultReceived();
        response.JobId = jobId;

        ClientInfo client = clientAdapter.GetById(clientId);
        Job job = jobAdapter.GetById(jobId);

        if (job == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
          return response;
        }
        if (job.Client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
          return response;
        }
        if (job.Client.Id != clientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
          return response;
        }
        if (job.State == State.finished) {
          // already finished: acknowledged, but nothing is stored
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
          return response;
        }
        if (job.State != State.calculating) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
          return response;
        }

        job.SerializedJob = result;
        job.Percentage = percentage;

        if (finished) {
          job.State = State.finished;

          client.State = State.idle;
          clientAdapter.Update(client);

          // the results stored so far are replaced by the final result
          List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
          foreach (JobResult currentResult in jobResults)
            jobResultAdapter.Delete(currentResult);
        }

        JobResult jobResult = new JobResult();
        jobResult.Client = client;
        jobResult.Job = job;
        jobResult.Result = result;
        jobResult.Percentage = percentage;
        jobResult.Exception = exception;
        jobResult.DateFinished = DateTime.Now;

        jobResultAdapter.Update(jobResult);
        // stores serialized job, percentage and (if finished) the new state
        jobAdapter.Update(job);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
        response.finished = finished;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// The client sends the final job result when it finished calculating.
    /// The result is stored in the database and the job is set to finished.
    /// </summary>
    /// <param name="clientId">id of the sending client</param>
    /// <param name="jobId">id of the finished job</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception reported by the client, if any</param>
    /// <returns>response describing whether the result was accepted</returns>
    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception) {

      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
    }

    /// <summary>
    /// The client sends an intermediate result (snapshot) of a job it is
    /// still calculating. The snapshot is stored in the database; the job
    /// is not set to finished.
    /// </summary>
    /// <param name="clientId">id of the sending client</param>
    /// <param name="jobId">id of the job the snapshot belongs to</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception reported by the client, if any</param>
    /// <returns>response describing whether the snapshot was accepted</returns>
    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
    }

    /// <summary>
    /// When a client logs out its state is set to offline and its entry in
    /// the last heartbeats dictionary is removed. If the client was
    /// calculating, the jobs assigned to it are reset.
    /// </summary>
    /// <param name="clientId">id of the client that logs out</param>
    /// <returns>
    /// a failed response if no client with this id is stored, otherwise a
    /// successful response
    /// </returns>
    public Response Logout(Guid clientId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        tx = session.BeginTransaction();

        Response response = new Response();

        ForgetHeartbeat(clientId);

        ClientInfo client = clientAdapter.GetById(clientId);
        if (client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
          return response;
        }

        if (client.State == State.calculating) {
          // check which jobs the client was calculating and reset them;
          // the jobs are only loaded when they are actually needed
          List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
          foreach (Job job in allJobs) {
            if (job.Client != null && job.Client.Id == client.Id) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          }
        }

        client.State = State.offline;
        clientAdapter.Update(client);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// If a client went offline and restores a job it was calculating,
    /// it can ask the server whether the job result is still needed.
    /// </summary>
    /// <param name="jobId">id of the restored job</param>
    /// <returns>
    /// a failed response if the job does not exist; a successful response
    /// with the "already finished" message if the job is finished; otherwise
    /// a successful response with the "send job result" message
    /// </returns>
    public Response IsJobStillNeeded(Guid jobId) {
      ISession session = factory.GetSessionForCurrentThread();
      ITransaction tx = null;

      try {
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();
        tx = session.BeginTransaction();

        Response response = new Response();
        Job job = jobAdapter.GetById(jobId);
        if (job == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
          return response;
        }
        if (job.State == State.finished) {
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
          return response;
        }
        // NOTE(review): the job is stored as finished before the client
        // sent its result, and ProcessJobResult does not store results
        // for finished jobs - confirm that this is intended.
        job.State = State.finished;
        jobAdapter.Update(job);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
        tx.Commit();
        return response;
      }
      catch (Exception) {
        if (tx != null)
          tx.Rollback();
        throw;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Sends the files of the requested plugins to the client. A requested
    /// plugin is delivered if an active plugin with the same name and the
    /// same version string exists on the server; other requests are
    /// skipped silently.
    /// </summary>
    /// <param name="pluginList">
    /// plugins requested by the client; null is treated as an empty list
    /// </param>
    /// <returns>response containing the matching plugins with their files</returns>
    public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
      ResponsePlugin response = new ResponsePlugin();
      PluginManager.Manager.Initialize();
      ICollection<PluginInfo> allActivePlugins = PluginManager.Manager.ActivePlugins;

      if (pluginList != null) {
        foreach (HivePluginInfo pluginInfo in pluginList) {
          // TODO: BuildDate deleted, not needed???
          // TODO: Split version to major, minor and revision number
          foreach (PluginInfo currPlugin in allActivePlugins) {
            if (currPlugin.Name != pluginInfo.Name
                || currPlugin.Version.ToString() != pluginInfo.Version)
              continue;

            CachedHivePluginInfo currCachedPlugin = new CachedHivePluginInfo {
              Name = currPlugin.Name,
              Version = currPlugin.Version.ToString(),
              BuildDate = currPlugin.BuildDate
            };

            // the content of every assembly file of the plugin is attached
            foreach (String assemblyPath in currPlugin.Assemblies) {
              currCachedPlugin.PluginFiles.Add(File.ReadAllBytes(assemblyPath));
            }
            response.Plugins.Add(currCachedPlugin);
          }
        }
      }
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;

      return response;
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.