Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/3.2/ClientCommunicator.cs @ 1869

Last change on this file since 1869 was 1831, checked in by msteinbi, 16 years ago

corrected job state handling (#466)

File size: 23.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38using System.IO;
39
40namespace HeuristicLab.Hive.Server.Core {
41  /// <summary>
42  /// The ClientCommunicator manages the whole communication with the client
43  /// </summary>
44  public class ClientCommunicator: IClientCommunicator {
45    private static Dictionary<Guid, DateTime> lastHeartbeats =
46      new Dictionary<Guid,DateTime>();
47    private static Dictionary<Guid, int> newAssignedJobs =
48      new Dictionary<Guid, int>();
49
50    private static ReaderWriterLockSlim heartbeatLock =
51      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
52
53    private ISessionFactory factory;
54    private ILifecycleManager lifecycleManager;
55    private IInternalJobManager jobManager;
56    private IScheduler scheduler;
57
/// <summary>
/// Resolves the collaborators of this communicator (session factory,
/// lifecycle manager, job manager and scheduler) through the ServiceLocator
/// and subscribes to the server heartbeat of the lifecycle manager.
/// </summary>
public ClientCommunicator() {
  factory = ServiceLocator.GetSessionFactory();
  lifecycleManager = ServiceLocator.GetLifecycleManager();

  // 'as' yields null if the registered job manager does not implement
  // IInternalJobManager
  jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  scheduler = ServiceLocator.GetScheduler();

  EventHandler onServerHeartbeat =
    new EventHandler(lifecycleManager_OnServerHeartbeat);
  lifecycleManager.RegisterHeartbeat(onServerHeartbeat);
}
74
/// <summary>
/// Handler for the server heartbeat raised by the lifecycle manager.
/// Walks over all clients known to the database:
/// - a client that is neither offline nor in nullState is set offline (and
///   its active jobs are reset) when no heartbeat is recorded for it or when
///   its last heartbeat is older than ApplicationConstants.HEARTBEAT_MAX_DIF
///   seconds;
/// - for a client that is offline or in nullState, a stale heartbeat entry is
///   removed and its active jobs are reset.
/// All database changes happen inside one transaction that is rolled back if
/// anything fails.
/// </summary>
/// <param name="sender">event source (not used)</param>
/// <param name="e">event arguments (not used)</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

    foreach (ClientInfo client in allClients) {
      if (client.State != State.offline && client.State != State.nullState) {
        heartbeatLock.EnterUpgradeableReadLock();
        // try/finally: the lock is static, so an exception thrown by one of
        // the adapters below must not leave it held forever
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            // client claims to be online but never sent a heartbeat
            client.State = State.offline;
            clientAdapter.Update(client);
            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if the client calculated jobs, the jobs must be reset
              foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
                jobManager.ResetJobsDependingOnResults(job);
                lock (newAssignedJobs) {
                  // Remove is a no-op when the key is not present
                  newAssignedJobs.Remove(job.Id);
                }
              }

              // client must be set offline
              client.State = State.offline;
              clientAdapter.Update(client);

              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              }
              finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        }
        finally {
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        heartbeatLock.EnterWriteLock();
        try {
          lastHeartbeats.Remove(client.Id);
          foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
            jobManager.ResetJobsDependingOnResults(job);
          }
        }
        finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
    tx.Commit();
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' would reset it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
153
154    #region IClientCommunicator Members
155
/// <summary>
/// Login process for the client.
/// The login counts as the first heartbeat: the current time is stored in the
/// heartbeat dictionary before the stored client state is inspected.
/// </summary>
/// <param name="clientInfo">description of the client that wants to log in</param>
/// <returns>
/// A failure response if a client with this id is stored and is neither
/// offline nor in nullState; otherwise the client is stored with state idle
/// and a success response is returned.
/// </returns>
public Response Login(ClientInfo clientInfo) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // the indexer adds the entry or overwrites an existing one
      lastHeartbeats[clientInfo.Id] = DateTime.Now;
    }
    finally {
      // released even if clientInfo is null and the line above throws
      heartbeatLock.ExitWriteLock();
    }

    ClientInfo client = clientAdapter.GetById(clientInfo.Id);
    if (client != null && client.State != State.offline && client.State != State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
      return response;
    }
    clientInfo.State = State.idle;
    clientAdapter.Update(clientInfo);
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
206
/// <summary>
/// Processes a heartbeat sent by a client.
/// Stores the free cores / free memory reported by the client, records the
/// time of the heartbeat, asks the scheduler whether a job exists for the
/// client and evaluates the reported job progress.
/// </summary>
/// <param name="hbData">heartbeat data sent by the client</param>
/// <returns>
/// A response whose ActionRequest list tells the client what to do
/// (FetchJob, NoMessage and, added while the job progress is processed,
/// AbortJob / RequestSnapshot). A client that is unknown, offline or in
/// nullState gets a failure response with a single NoMessage entry.
/// </returns>
public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    ResponseHB response = new ResponseHB();
    response.ActionRequest = new List<MessageContainer>();

    ClientInfo client = clientAdapter.GetById(hbData.ClientId);

    // check if the client is logged in; a client id the adapter does not
    // know (null) is treated like a client that is not logged in
    if (client == null || client.State == State.offline || client.State == State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
      return response;
    }

    client.NrOfFreeCores = hbData.FreeCores;
    client.FreeMemory = hbData.FreeMemory;

    // save timestamp of this heartbeat
    heartbeatLock.EnterWriteLock();
    try {
      // the indexer adds the entry or overwrites an existing one
      lastHeartbeats[hbData.ClientId] = DateTime.Now;
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    // check if client has a free core for a new job
    // if true, ask scheduler for a new job for this client
    if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
    else
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;

    // may overwrite Success / StatusMessage and append further action requests
    processJobProcess(hbData, jobAdapter, clientAdapter, response);
    clientAdapter.Update(client);

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
278
/// <summary>
/// Processes the job progress a client reported with its heartbeat.
/// Does nothing when the heartbeat carries no progress information.
/// For every reported job the progress is stored and pending abort /
/// snapshot requests are added to the response. Afterwards every active job
/// of the client that was NOT reported is set to State.offline, unless it was
/// handed out recently and still has heartbeats left in newAssignedJobs.
/// </summary>
/// <param name="hbData">heartbeat data sent by the client</param>
/// <param name="jobAdapter">adapter used to load and update jobs</param>
/// <param name="clientAdapter">adapter used to load the client</param>
/// <param name="response">response that is filled with status and action requests</param>
private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) {
  if (hbData.JobProgress == null)
    return;

  List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId)));
  if (jobsOfClient.Count == 0) {
    response.Success = false;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
    return;
  }

  foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
    Job curJob = jobAdapter.GetById(jobProgress.Key);
    if (curJob == null) {
      // the client reports progress for a job id the adapter does not know;
      // report it instead of failing on a null reference
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
      continue;
    }

    if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
    } else if (curJob.State == State.abort) {
      // a request to abort the job has been set
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
      curJob.State = State.finished;
    } else {
      // save job progress
      curJob.Percentage = jobProgress.Value;

      if (curJob.State == State.requestSnapshot) {
        // a request for a snapshot has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
        curJob.State = State.requestSnapshotSent;
      }
    }
    jobAdapter.Update(curJob);
  }

  foreach (Job currJob in jobsOfClient) {
    bool reported = hbData.JobProgress.ContainsKey(currJob.Id);

    lock (newAssignedJobs) {
      if (reported) {
        // the client knows the job, so the grace counter is no longer needed
        newAssignedJobs.Remove(currJob.Id);
        continue;
      }

      int heartbeatsLeft;
      if (newAssignedJobs.TryGetValue(currJob.Id, out heartbeatsLeft)) {
        // recently assigned job: count down before giving up on it
        heartbeatsLeft--;
        if (heartbeatsLeft <= 0) {
          currJob.State = State.offline;
          jobAdapter.Update(currJob);
          newAssignedJobs.Remove(currJob.Id);
        } else {
          newAssignedJobs[currJob.Id] = heartbeatsLeft;
        }
      } else {
        currJob.State = State.offline;
        jobAdapter.Update(currJob);
      }
    }
  }
}
348   
/// <summary>
/// Called by a client that was told to pull a job.
/// Asks the scheduler for the next job of the client; a job that is handed
/// out is registered in newAssignedJobs with
/// ApplicationConstants.JOB_TIME_TO_LIVE as its initial counter.
/// </summary>
/// <param name="clientId">id of the client that asks for a job</param>
/// <returns>
/// A success response carrying the job, or a failure response with Job set
/// to null when the scheduler returned no job.
/// </returns>
public ResponseJob SendJob(Guid clientId) {
  ResponseJob response = new ResponseJob();

  Job job2Calculate = scheduler.GetNextJobForClient(clientId);
  if (job2Calculate != null) {
    response.Job = job2Calculate;
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
    lock (newAssignedJobs) {
      // indexer instead of Add: a job that is handed out again while its old
      // entry still exists restarts its counter instead of throwing
      // ArgumentException
      newAssignedJobs[job2Calculate.Id] = ApplicationConstants.JOB_TIME_TO_LIVE;
    }
  } else {
    response.Success = false;
    response.Job = null;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
  }
  return response;
}
373
/// <summary>
/// Stores a job result (snapshot or final result) sent by a client.
/// The result is rejected when the job does not exist, was aborted, has no
/// client, belongs to another client or is not in state calculating
/// (requestSnapshotSent is switched back to calculating first). A result for
/// a job that is already finished is acknowledged without being stored.
/// On acceptance all previous results of the job are deleted and replaced by
/// the new one; a final result additionally sets the job to finished and the
/// client to idle.
/// </summary>
/// <param name="clientId">id of the client that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">serialized job / result data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception stored together with the result</param>
/// <param name="finished">true for a final result, false for a snapshot</param>
/// <returns>response describing whether the result was accepted</returns>
private ResponseResultReceived ProcessJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception,
  bool finished) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    IJobResultsAdapter jobResultAdapter =
      session.GetDataAdapter<JobResult, IJobResultsAdapter>();

    tx = session.BeginTransaction();

    ResponseResultReceived response = new ResponseResultReceived();
    ClientInfo client = clientAdapter.GetById(clientId);
    Job job = jobAdapter.GetById(jobId);

    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
      response.JobId = jobId;
      return response;
    }
    if (job.State == State.abort) {
      // return right away: without the return the abort message was always
      // overwritten by one of the checks below
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_WAS_ABORTED;
      response.JobId = jobId;
      return response;
    }
    if (job.Client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      response.JobId = jobId;
      return response;
    }
    if (job.Client.Id != clientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
      response.JobId = jobId;
      return response;
    }
    if (job.State == State.finished) {
      // already finished: acknowledge, but do not store the result again
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
      response.JobId = jobId;
      return response;
    }
    if (job.State == State.requestSnapshotSent) {
      job.State = State.calculating;
    }
    if (job.State != State.calculating) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
      response.JobId = jobId;
      return response;
    }
    job.SerializedJob = result;
    job.Percentage = percentage;

    if (finished) {
      job.State = State.finished;
      jobAdapter.Update(job);

      client.State = State.idle;
      clientAdapter.Update(client);
    }

    // only the most recent result of a job is kept
    List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
    foreach (JobResult currentResult in jobResults)
      jobResultAdapter.Delete(currentResult);

    JobResult jobResult = new JobResult();
    jobResult.Client = client;
    jobResult.Job = job;
    jobResult.Result = result;
    jobResult.Percentage = percentage;
    jobResult.Exception = exception;
    jobResult.DateFinished = DateTime.Now;

    jobResultAdapter.Update(jobResult);
    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
    response.JobId = jobId;
    response.finished = finished;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
481
482
/// <summary>
/// Stores the final result a client sends when it has finished calculating
/// a job. Forwards to ProcessJobResult with the finished flag set.
/// </summary>
/// <param name="clientId">id of the client that sends the result</param>
/// <param name="jobId">id of the job the result belongs to</param>
/// <param name="result">serialized result data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception passed along with the result</param>
/// <returns>the response produced by ProcessJobResult</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception) {
  const bool jobIsFinished = true;

  ResponseResultReceived received =
    ProcessJobResult(clientId, jobId, result, percentage, exception, jobIsFinished);
  return received;
}
502
503
/// <summary>
/// Stores an intermediate result (snapshot) a client sends while it is still
/// calculating a job. Forwards to ProcessJobResult with the finished flag
/// cleared.
/// </summary>
/// <param name="clientId">id of the client that sends the snapshot</param>
/// <param name="jobId">id of the job the snapshot belongs to</param>
/// <param name="result">serialized snapshot data</param>
/// <param name="percentage">progress of the job</param>
/// <param name="exception">exception passed along with the snapshot</param>
/// <returns>the response produced by ProcessJobResult</returns>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
  const bool jobIsFinished = false;

  ResponseResultReceived received =
    ProcessJobResult(clientId, jobId, result, percentage, exception, jobIsFinished);
  return received;
}
507
/// <summary>
/// Logs a client out.
/// The heartbeat entry of the client is removed, jobs assigned to the client
/// are reset if the client was calculating, and the client is set offline.
/// </summary>
/// <param name="clientId">id of the client that logs out</param>
/// <returns>
/// A failure response when no client with this id is stored (the heartbeat
/// entry is removed nevertheless), otherwise a success response.
/// </returns>
public Response Logout(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    tx = session.BeginTransaction();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // Remove is a no-op when no heartbeat is recorded for the client
      lastHeartbeats.Remove(clientId);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    ClientInfo client = clientAdapter.GetById(clientId);
    if (client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
      return response;
    }

    if (client.State == State.calculating) {
      // check which jobs the client was calculating and reset them;
      // the job list is only loaded when it is actually needed
      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
      foreach (Job job in allJobs) {
        if (job.Client != null && job.Client.Id == client.Id) {
          jobManager.ResetJobsDependingOnResults(job);
        }
      }
    }

    client.State = State.offline;
    clientAdapter.Update(client);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
570
/// <summary>
/// If a client went offline and restores a job it was calculating, it can
/// ask the server whether the job result is still needed.
/// </summary>
/// <param name="jobId">id of the job the client asks about</param>
/// <returns>
/// A failure response when the job does not exist; a success response with
/// the "already finished" message when the job is finished; otherwise the
/// job is set to State.finished and a success response with the
/// "send job result" message is returned.
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  ISession session = factory.GetSessionForCurrentThread();
  ITransaction tx = null;

  try {
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    tx = session.BeginTransaction();

    Response response = new Response();
    Job job = jobAdapter.GetById(jobId);
    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
      return response;
    }
    if (job.State == State.finished) {
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
      return response;
    }
    // NOTE(review): the job is marked finished before the client has
    // delivered the result - confirm this is intended
    job.State = State.finished;
    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
    tx.Commit();
    return response;
  }
  catch (Exception) {
    if (tx != null)
      tx.Rollback();
    // plain 'throw' keeps the original stack trace ('throw ex' would reset it)
    throw;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
616
/// <summary>
/// Collects the plugins a client asks for.
/// Every requested plugin is compared by name and version string with the
/// active plugins of the PluginManager; for each match the contents of all
/// assembly files of the plugin are read and added to the response.
/// </summary>
/// <param name="pluginList">plugins (name and version) requested by the client</param>
/// <returns>
/// A response marked as successful that contains one entry per matching
/// active plugin.
/// </returns>
public ResponsePlugin SendPlugins(List<HivePluginInfo> pluginList) {
  ResponsePlugin response = new ResponsePlugin();
  PluginManager.Manager.Initialize();
  ICollection<PluginInfo> installedPlugins = PluginManager.Manager.ActivePlugins;

  foreach (HivePluginInfo requested in pluginList) {
    // TODO: BuildDate deleted, not needed???
    // TODO: Split version to major, minor and revision number
    foreach (PluginInfo installed in installedPlugins) {
      bool sameName = installed.Name == requested.Name;
      if (!sameName || !(installed.Version.ToString() == requested.Version))
        continue;

      CachedHivePluginInfo cached = new CachedHivePluginInfo();
      cached.Name = installed.Name;
      cached.Version = installed.Version.ToString();
      cached.BuildDate = installed.BuildDate;

      foreach (String assemblyPath in installed.Assemblies) {
        byte[] assemblyBytes = File.ReadAllBytes(assemblyPath);
        cached.PluginFiles.Add(assemblyBytes);
      }
      response.Plugins.Add(cached);
    }
  }

  response.Success = true;
  response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_PLUGINS_SENT;
  return response;
}
647
648    #endregion
649  }
650}
Note: See TracBrowser for help on using the repository browser.