Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1468

Last change on this file since 1468 was 1468, checked in by svonolfe, 16 years ago

Added transaction management (#527)

File size: 18.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38
39namespace HeuristicLab.Hive.Server.Core {
40  /// <summary>
41  /// The ClientCommunicator manages the whole communication with the client
42  /// </summary>
43  public class ClientCommunicator: IClientCommunicator {
/// <summary>
/// Time of the most recent heartbeat (or login) per client id.
/// Static, hence shared by all ClientCommunicator instances.
/// </summary>
private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
  new Dictionary<Guid, DateTime>();

/// <summary>
/// Guards every read and write of <see cref="lastHeartbeats"/>.
/// Readonly so that the lock instance can never be swapped while in use.
/// </summary>
private static readonly ReaderWriterLockSlim heartbeatLock =
  new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

// Collaborators, assigned exactly once in the constructor.
private readonly ISessionFactory factory;
private readonly ILifecycleManager lifecycleManager;
private readonly IInternalJobManager jobManager;
private readonly IScheduler scheduler;
54
/// <summary>
/// Resolves the session factory, lifecycle manager, job manager and
/// scheduler through the ServiceLocator and registers
/// lifecycleManager_OnServerHeartbeat as handler for the server heartbeat.
/// </summary>
/// <exception cref="InvalidCastException">
/// The job manager returned by the ServiceLocator does not implement
/// IInternalJobManager.
/// </exception>
public ClientCommunicator() {
  factory = ServiceLocator.GetSessionFactory();
  lifecycleManager = ServiceLocator.GetLifecycleManager();

  // Direct cast instead of "as": a job manager of the wrong type is reported
  // right here instead of leaving jobManager null and failing much later
  // with a NullReferenceException when jobs have to be reset.
  jobManager = (IInternalJobManager)ServiceLocator.GetJobManager();
  scheduler = ServiceLocator.GetScheduler();

  lifecycleManager.RegisterHeartbeat(
    new EventHandler(lifecycleManager_OnServerHeartbeat));
}
71
/// <summary>
/// Server heartbeat handler: checks whether every client that is neither
/// offline nor in nullState is still sending heartbeats.
/// A client without a heartbeat entry, or whose last heartbeat is older than
/// ApplicationConstants.HEARTBEAT_MAX_DIF seconds, is set offline and its
/// active jobs are reset. Heartbeat entries of clients that are already
/// offline (or in nullState) are removed.
/// </summary>
/// <param name="sender">Not used.</param>
/// <param name="e">Not used.</param>
private void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  ISession session = factory.GetSessionForCurrentThread();

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    // Copy the result so that updates issued below cannot disturb the iteration.
    List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

    foreach (ClientInfo client in allClients) {
      if (client.State != State.offline && client.State != State.nullState) {
        heartbeatLock.EnterUpgradeableReadLock();
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            // The client is marked as online but never sent a heartbeat.
            client.State = State.offline;
            clientAdapter.Update(client);
            foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if the time between the last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if the client was calculating, its jobs must be reset
              if (client.State == State.calculating) {
                foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
                  jobManager.ResetJobsDependingOnResults(job);
                }
              }

              // client must be set offline
              client.State = State.offline;
              clientAdapter.Update(client);

              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              }
              finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        }
        finally {
          // Released even if an adapter or the job manager throws; otherwise
          // every later Login/ProcessHeartBeat/Logout would block forever.
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        heartbeatLock.EnterWriteLock();
        try {
          // Remove simply returns false if there is no entry for the client.
          lastHeartbeats.Remove(client.Id);
        }
        finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
137
138    #region IClientCommunicator Members
139
/// <summary>
/// Login process for a client.
/// The login counts as the first heartbeat, so a heartbeat entry for the
/// client is written before the client state is checked.
/// </summary>
/// <param name="clientInfo">Description of the client that logs in.</param>
/// <returns>
/// A failed response if a client with the same id is already stored in a
/// state other than offline/nullState; otherwise the client is stored with
/// state idle and a successful response is returned.
/// </returns>
public Response Login(ClientInfo clientInfo) {
  ISession session = factory.GetSessionForCurrentThread();

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // The indexer setter adds the entry or overwrites an existing one.
      lastHeartbeats[clientInfo.Id] = DateTime.Now;
    }
    finally {
      // Always release the lock, otherwise one failure blocks all clients.
      heartbeatLock.ExitWriteLock();
    }

    // todo: allClients legacy ?
    ClientInfo client = clientAdapter.GetById(clientInfo.Id);
    if (client != null && client.State != State.offline && client.State != State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
      return response;
    }

    clientInfo.State = State.idle;
    clientAdapter.Update(clientInfo);
    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

    return response;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
182
/// <summary>
/// Processes a heartbeat, which clients have to send regularly.
/// The time of the heartbeat is stored in the heartbeat dictionary, the
/// scheduler is asked whether there is work for the client, and the job
/// progress reported by the client is saved.
/// </summary>
/// <param name="hbData">Heartbeat data sent by the client.</param>
/// <returns>
/// A response whose ActionRequest list tells the client what to do
/// (FetchJob, AbortJob or NoMessage). Success is false if the client is
/// unknown or not logged in, or if reported jobs are not calculated by it.
/// </returns>
public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
  ISession session = factory.GetSessionForCurrentThread();

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();

    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    ResponseHB response = new ResponseHB();
    response.ActionRequest = new List<MessageContainer>();

    // Fetch the client once. An unknown client (GetById yields null) is
    // answered like a client that is not logged in instead of crashing.
    ClientInfo client = clientAdapter.GetById(hbData.ClientId);
    if (client == null ||
        client.State == State.offline ||
        client.State == State.nullState) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
      return response;
    }

    // save timestamp of this heartbeat
    heartbeatLock.EnterWriteLock();
    try {
      lastHeartbeats[hbData.ClientId] = DateTime.Now;
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
    // check if client has a free core for a new job
    // if true, ask scheduler for a new job for this client
    if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
    else
      response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

    if (hbData.JobProgress != null) {
      List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(client));
      if (jobsOfClient.Count == 0) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        return response;
      }

      foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
        Job curJob = jobAdapter.GetById(jobProgress.Key);
        // A job id unknown to the server is handled like a job that is not
        // calculated by this client.
        if (curJob == null || curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
        } else if (curJob.State == State.finished) {
          // the job is already finished, so the client can abort it
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        } else {
          // save job progress
          curJob.Percentage = jobProgress.Value;
          jobAdapter.Update(curJob);
        }
      }
    }

    return response;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
262   
/// <summary>
/// Called by a client that wants to pull a job: the scheduler selects the
/// next job for the client and the job is returned in the response.
/// </summary>
/// <param name="clientId">Id of the client that asks for a job.</param>
/// <returns>
/// A successful response carrying the job, or a failed response without a
/// job if the scheduler has none for this client.
/// </returns>
public ResponseJob SendJob(Guid clientId) {
  ResponseJob answer = new ResponseJob();

  Job nextJob = scheduler.GetNextJobForClient(clientId);
  bool jobAvailable = nextJob != null;

  // nextJob is null exactly when no job is available.
  answer.Job = nextJob;
  answer.Success = jobAvailable;
  answer.StatusMessage = jobAvailable
    ? ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED
    : ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;

  return answer;
}
284
/// <summary>
/// Stores a job result (snapshot or final result) sent by a client.
/// </summary>
/// <param name="clientId">Id of the client that sends the result.</param>
/// <param name="jobId">Id of the job the result belongs to.</param>
/// <param name="result">Serialized job; stored in the job and in the job result.</param>
/// <param name="percentage">Progress of the job.</param>
/// <param name="exception">Exception stored together with the job result.</param>
/// <param name="finished">
/// True for a final result: the job is set to finished, the client to idle
/// and the results stored so far for the job are deleted before the new one
/// is saved.
/// </param>
/// <returns>
/// A failed response if the job does not exist, is not assigned to a client,
/// is assigned to another client or is not in state calculating; a
/// successful response if the job was already finished or the result was
/// stored.
/// </returns>
private ResponseResultReceived ProcessJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception,
  bool finished) {
  ISession session = factory.GetSessionForCurrentThread();

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();
    IJobResultsAdapter jobResultAdapter =
      session.GetDataAdapter<JobResult, IJobResultsAdapter>();

    ResponseResultReceived response = new ResponseResultReceived();
    ClientInfo client =
      clientAdapter.GetById(clientId);

    Job job =
      jobAdapter.GetById(jobId);

    // The existence check has to come first: the checks below dereference
    // job and would otherwise throw for an unknown job id.
    if (job == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
      return response;
    }
    if (job.Client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
      return response;
    }
    if (job.Client.Id != clientId) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
      return response;
    }
    if (job.State == State.finished) {
      // Nothing to store: the job is already finished.
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
      return response;
    }
    if (job.State != State.calculating) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
      return response;
    }
    job.SerializedJob = result;
    job.Percentage = percentage;

    if (finished) {
      job.State = State.finished;
      jobAdapter.Update(job);

      client.State = State.idle;
      clientAdapter.Update(client);

      // Copy first, the results are deleted while iterating.
      List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
      foreach (JobResult currentResult in jobResults)
        jobResultAdapter.Delete(currentResult);
    }

    JobResult jobResult =
      new JobResult();
    jobResult.Client = client;
    jobResult.Job = job;
    jobResult.Result = result;
    jobResult.Percentage = percentage;
    jobResult.Exception = exception;
    jobResult.DateFinished = DateTime.Now;

    jobResultAdapter.Update(jobResult);
    jobAdapter.Update(job);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
    response.JobId = jobId;
    response.finished = finished;

    return response;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
372
373
/// <summary>
/// Stores the final result a client sends when it has finished calculating
/// a job. Delegates to ProcessJobResult with finished set to true.
/// </summary>
/// <param name="clientId">Id of the client that sends the result.</param>
/// <param name="jobId">Id of the job the result belongs to.</param>
/// <param name="result">Serialized result of the job.</param>
/// <param name="percentage">Progress of the job.</param>
/// <param name="exception">Exception passed on together with the result.</param>
/// <returns>The response produced by ProcessJobResult.</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  Exception exception) {
  const bool isFinalResult = true;

  return ProcessJobResult(
    clientId, jobId, result, percentage, exception, isFinalResult);
}
393
394
/// <summary>
/// Stores an intermediate result (snapshot) a client sends while it is
/// still calculating a job. Delegates to ProcessJobResult with finished
/// set to false.
/// </summary>
/// <param name="clientId">Id of the client that sends the snapshot.</param>
/// <param name="jobId">Id of the job the snapshot belongs to.</param>
/// <param name="result">Serialized snapshot of the job.</param>
/// <param name="percentage">Progress of the job.</param>
/// <param name="exception">Exception passed on together with the snapshot.</param>
/// <returns>The response produced by ProcessJobResult.</returns>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
  const bool isFinalResult = false;

  return ProcessJobResult(
    clientId, jobId, result, percentage, exception, isFinalResult);
}
398
/// <summary>
/// Logs a client out: its entry in the heartbeat dictionary is removed, the
/// jobs it was calculating are reset and its state is set to offline.
/// </summary>
/// <param name="clientId">Id of the client that logs out.</param>
/// <returns>
/// A failed response if no client with this id is stored, otherwise a
/// successful response.
/// </returns>
public Response Logout(Guid clientId) {
  ISession session = factory.GetSessionForCurrentThread();

  try {
    IClientAdapter clientAdapter =
      session.GetDataAdapter<ClientInfo, IClientAdapter>();
    IJobAdapter jobAdapter =
      session.GetDataAdapter<Job, IJobAdapter>();

    Response response = new Response();

    heartbeatLock.EnterWriteLock();
    try {
      // Remove simply returns false if there is no entry for the client.
      lastHeartbeats.Remove(clientId);
    }
    finally {
      heartbeatLock.ExitWriteLock();
    }

    ClientInfo client = clientAdapter.GetById(clientId);
    if (client == null) {
      response.Success = false;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
      return response;
    }

    if (client.State == State.calculating) {
      // Only load the jobs when they are needed. The copy keeps the
      // iteration stable while jobs are being reset.
      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());

      // check which jobs the client was calculating and reset them
      foreach (Job job in allJobs) {
        // Jobs that are not assigned to any client have no Client.
        if (job.Client != null && job.Client.Id == client.Id) {
          jobManager.ResetJobsDependingOnResults(job);
        }
      }
    }

    client.State = State.offline;
    clientAdapter.Update(client);

    response.Success = true;
    response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

    return response;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
450
/// <summary>
/// Lets a client that went offline and restored a job it was calculating
/// ask the server whether the result of that job is still needed.
/// </summary>
/// <param name="jobId">Id of the restored job.</param>
/// <returns>
/// A failed response if the job does not exist; a successful response with
/// the "already finished" message if the job is finished; otherwise a
/// successful response asking the client to send the job result.
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  ISession session = factory.GetSessionForCurrentThread();

  try {
    IJobAdapter jobs =
      session.GetDataAdapter<Job, IJobAdapter>();

    Response answer = new Response();
    Job requested = jobs.GetById(jobId);

    if (requested == null) {
      answer.Success = false;
      answer.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
    } else if (requested.State == State.finished) {
      answer.Success = true;
      answer.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
    } else {
      // NOTE(review): the job is marked finished before its result has
      // arrived - presumably to keep it from being handed out again; confirm.
      requested.State = State.finished;
      jobs.Update(requested);

      answer.Success = true;
      answer.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
    }

    return answer;
  }
  finally {
    if (session != null)
      session.EndSession();
  }
}
488
/// <summary>
/// Not implemented yet.
/// </summary>
/// <param name="pluginList">Not used.</param>
/// <returns>Never returns.</returns>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public ResponsePlugin SendPlugins(List<PluginInfo> pluginList) {
  throw new NotImplementedException();
}
492
493    #endregion
494  }
495}
Note: See TracBrowser for help on using the repository browser.