Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1478

Last change on this file since 1478 was 1478, checked in by msteinbi, 15 years ago

new properties for Job needed by Scheduler (#507)

File size: 17.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.Hive.Contracts.BusinessObjects;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Core;
30using HeuristicLab.Hive.Server.DataAccess;
31using System.Resources;
32using System.Reflection;
33using HeuristicLab.Hive.JobBase;
34using HeuristicLab.Hive.Server.Core.InternalInterfaces;
35using System.Threading;
36using HeuristicLab.PluginInfrastructure;
37using HeuristicLab.DataAccess.Interfaces;
38
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// Server-side implementation of <see cref="IClientCommunicator"/>: handles
  /// login and logout, heartbeats, job hand-out and job results of the clients.
  /// </summary>
  /// <remarks>
  /// The time of the last heartbeat of each client is kept in a static
  /// dictionary that is shared by all instances. Every access to it is guarded
  /// by <c>heartbeatLock</c>, and the lock is always released in a
  /// <c>finally</c> block so that a failing database call cannot leave it held.
  /// </remarks>
  public class ClientCommunicator: IClientCommunicator {
    // Last heartbeat per client id, stored as UTC so that daylight-saving or
    // time-zone changes on the server cannot distort the timeout check.
    private static readonly Dictionary<Guid, DateTime> lastHeartbeats =
      new Dictionary<Guid, DateTime>();

    // Recursion is enabled because the server heartbeat handler upgrades its
    // upgradeable read lock to a write lock on the same thread.
    private static readonly ReaderWriterLockSlim heartbeatLock =
      new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly ISessionFactory factory;
    private readonly ILifecycleManager lifecycleManager;
    private readonly IInternalJobManager jobManager;
    private readonly IScheduler scheduler;

    /// <summary>
    /// Resolves the session factory, lifecycle manager, job manager and
    /// scheduler through the ServiceLocator and registers this instance for
    /// the server heartbeat of the lifecycle manager.
    /// </summary>
    public ClientCommunicator() {
      factory = ServiceLocator.GetSessionFactory();

      lifecycleManager = ServiceLocator.GetLifecycleManager();
      // NOTE(review): 'as' yields null if the job manager does not implement
      // IInternalJobManager; that would only surface later as a
      // NullReferenceException when jobs have to be reset.
      jobManager = ServiceLocator.GetJobManager() as
        IInternalJobManager;
      scheduler = ServiceLocator.GetScheduler();

      lifecycleManager.RegisterHeartbeat(
        new EventHandler(lifecycleManager_OnServerHeartbeat));
    }

    /// <summary>
    /// Stores "now" as the time of the last heartbeat of the given client,
    /// replacing an existing entry if there is one.
    /// </summary>
    /// <param name="clientId">id of the client that sent the heartbeat</param>
    private static void RecordHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        // the indexer adds the entry or overwrites the existing one
        lastHeartbeats[clientId] = DateTime.UtcNow;
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Removes the heartbeat entry of the given client, if there is one.
    /// </summary>
    /// <param name="clientId">id of the client whose entry is dropped</param>
    private static void ForgetHeartbeat(Guid clientId) {
      heartbeatLock.EnterWriteLock();
      try {
        // Remove simply returns false when the key is not present
        lastHeartbeats.Remove(clientId);
      }
      finally {
        heartbeatLock.ExitWriteLock();
      }
    }

    /// <summary>
    /// Hands every active job of the given client to the job manager so that
    /// it can be reset.
    /// </summary>
    /// <param name="jobAdapter">adapter used to look up the active jobs</param>
    /// <param name="client">client whose active jobs are reset</param>
    private void ResetActiveJobsOf(IJobAdapter jobAdapter, ClientInfo client) {
      foreach (Job job in jobAdapter.GetActiveJobsOf(client)) {
        jobManager.ResetJobsDependingOnResults(job);
      }
    }

    /// <summary>
    /// Checks whether the clients that are marked as online still send their
    /// heartbeats. A client without a heartbeat entry, or whose last heartbeat
    /// is older than HEARTBEAT_MAX_DIF seconds, is set offline and its active
    /// jobs are reset. Heartbeat entries of clients that are already offline
    /// are removed.
    /// </summary>
    /// <param name="sender">not used</param>
    /// <param name="e">not used</param>
    private void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
      ISession session = factory.GetSessionForCurrentThread();

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());

        foreach (ClientInfo client in allClients) {
          bool markedOnline =
            client.State != State.offline && client.State != State.nullState;

          if (!markedOnline) {
            // an offline client must not keep a stale heartbeat entry
            ForgetHeartbeat(client.Id);
            continue;
          }

          // The upgradeable lock keeps the entry stable between reading it
          // and (possibly) removing it further down.
          heartbeatLock.EnterUpgradeableReadLock();
          try {
            DateTime lastHbOfClient;
            if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
              // marked online, but no heartbeat has been recorded at all
              client.State = State.offline;
              clientAdapter.Update(client);
              ResetActiveJobsOf(jobAdapter, client);
            } else {
              TimeSpan dif = DateTime.UtcNow.Subtract(lastHbOfClient);
              // check if the time between the last heartbeat and now is
              // greater than HEARTBEAT_MAX_DIF
              if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
                // if the client calculated jobs, the jobs must be reset
                ResetActiveJobsOf(jobAdapter, client);

                // client must be set offline
                client.State = State.offline;
                clientAdapter.Update(client);

                ForgetHeartbeat(client.Id);
              }
            }
          }
          finally {
            heartbeatLock.ExitUpgradeableReadLock();
          }
        }
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    #region IClientCommunicator Members

    /// <summary>
    /// Login process for the client. The login also counts as the first
    /// heartbeat, so a heartbeat entry is recorded before the state of the
    /// client is checked.
    /// </summary>
    /// <param name="clientInfo">description of the client that logs in</param>
    /// <returns>
    /// A failed response if a client with this id is already stored with a
    /// state other than offline/nullState; otherwise the client is stored
    /// with state idle and a successful response is returned.
    /// </returns>
    public Response Login(ClientInfo clientInfo) {
      ISession session = factory.GetSessionForCurrentThread();

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        Response response = new Response();

        RecordHeartbeat(clientInfo.Id);

        ClientInfo client = clientAdapter.GetById(clientInfo.Id);
        if (client != null && client.State != State.offline && client.State != State.nullState) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
          return response;
        }
        clientInfo.State = State.idle;
        clientAdapter.Update(clientInfo);
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

        return response;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Processes a regular heartbeat of a client: records its timestamp,
    /// asks the scheduler whether the client should fetch a job and stores
    /// the progress the client reports for its jobs.
    /// </summary>
    /// <param name="hbData">heartbeat data sent by the client</param>
    /// <returns>
    /// The response with the action requests for the client. The response is
    /// unsuccessful if the client is unknown or not logged in, if progress is
    /// reported although the client has no active jobs, or if progress is
    /// reported for a job that is unknown or not assigned to this client.
    /// </returns>
    public ResponseHB ProcessHeartBeat(HeartBeatData hbData) {
      ISession session = factory.GetSessionForCurrentThread();

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();

        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        ResponseHB response = new ResponseHB();
        response.ActionRequest = new List<MessageContainer>();

        // check if the client is logged in; a client that is not stored at
        // all is handled like one that is not logged in
        ClientInfo client = clientAdapter.GetById(hbData.ClientId);
        if (client == null ||
            client.State == State.offline ||
            client.State == State.nullState) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
          return response;
        }

        // save timestamp of this heartbeat
        RecordHeartbeat(hbData.ClientId);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED;
        // check if client has a free core for a new job
        // if true, ask scheduler for a new job for this client
        if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData))
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
        else
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

        if (hbData.JobProgress != null) {
          List<Job> jobsOfClient = new List<Job>(jobAdapter.GetActiveJobsOf(client));
          if (jobsOfClient.Count == 0) {
            response.Success = false;
            response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
            return response;
          }

          foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
            Job curJob = jobAdapter.GetById(jobProgress.Key);
            if (curJob == null || curJob.Client == null || curJob.Client.Id != hbData.ClientId) {
              // unknown job, or job that is not assigned to this client
              response.Success = false;
              response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
            } else if (curJob.State == State.finished) {
              // the job is already finished, the client can abort it
              response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
            } else {
              // save job progress
              curJob.Percentage = jobProgress.Value;
              jobAdapter.Update(curJob);
            }
          }
        }

        return response;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Called by a client that was asked to pull a job; the scheduler selects
    /// the job that is sent to the client.
    /// </summary>
    /// <param name="clientId">id of the client that pulls a job</param>
    /// <returns>
    /// A successful response carrying the job, or an unsuccessful response
    /// without a job if the scheduler returned none.
    /// </returns>
    public ResponseJob SendJob(Guid clientId) {
      ResponseJob response = new ResponseJob();

      Job job2Calculate = scheduler.GetNextJobForClient(clientId);
      if (job2Calculate != null) {
        response.Job = job2Calculate;
        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      } else {
        response.Success = false;
        response.Job = null;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
      }
      return response;
    }

    /// <summary>
    /// Stores a snapshot or the final result of a job that was sent by a
    /// client.
    /// </summary>
    /// <param name="clientId">id of the client that sends the result</param>
    /// <param name="jobId">id of the job the result belongs to</param>
    /// <param name="result">serialized job; stored in the job and the job result</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception that is stored with the job result</param>
    /// <param name="finished">
    /// true for the final result: the job is set to finished, the client to
    /// idle and the previously stored results of the job are deleted
    /// </param>
    /// <returns>
    /// An unsuccessful response if the job does not exist, is not assigned to
    /// any client, is assigned to another client or is not in state
    /// calculating; a successful response if the job is already finished
    /// (nothing is stored in that case) or after the result has been stored.
    /// </returns>
    private ResponseResultReceived ProcessJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception,
      bool finished) {
      ISession session = factory.GetSessionForCurrentThread();

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();
        IJobResultsAdapter jobResultAdapter =
          session.GetDataAdapter<JobResult, IJobResultsAdapter>();

        ResponseResultReceived response = new ResponseResultReceived();
        ClientInfo client =
          clientAdapter.GetById(clientId);

        Job job =
          jobAdapter.GetById(jobId);

        // the existence check has to come first, all other checks
        // dereference the job
        if (job == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID;
          return response;
        }
        if (job.Client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED;
          return response;
        }
        if (job.Client.Id != clientId) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB;
          return response;
        }
        if (job.State == State.finished) {
          // already finished: acknowledge without storing anything
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
          return response;
        }
        if (job.State != State.calculating) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
          return response;
        }
        job.SerializedJob = result;
        job.Percentage = percentage;

        if (finished) {
          job.State = State.finished;
          jobAdapter.Update(job);

          client.State = State.idle;
          clientAdapter.Update(client);

          // the results stored so far are replaced by the final result below
          List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
          foreach (JobResult currentResult in jobResults)
            jobResultAdapter.Delete(currentResult);
        }

        JobResult jobResult =
          new JobResult();
        jobResult.Client = client;
        jobResult.Job = job;
        jobResult.Result = result;
        jobResult.Percentage = percentage;
        jobResult.Exception = exception;
        jobResult.DateFinished = DateTime.Now;

        jobResultAdapter.Update(jobResult);
        jobAdapter.Update(job);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
        response.JobId = jobId;
        response.finished = finished;

        return response;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }


    /// <summary>
    /// Stores the final result a client sends when it has finished
    /// calculating a job.
    /// </summary>
    /// <param name="clientId">id of the client that sends the result</param>
    /// <param name="jobId">id of the finished job</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception that is stored with the job result</param>
    /// <returns>the response of the result processing</returns>
    public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
      Guid jobId,
      byte[] result,
      double percentage,
      Exception exception) {

      return ProcessJobResult(clientId, jobId, result, percentage, exception, true);
    }


    /// <summary>
    /// Stores an intermediate result (snapshot) a client sends while it is
    /// still calculating a job.
    /// </summary>
    /// <param name="clientId">id of the client that sends the snapshot</param>
    /// <param name="jobId">id of the job the snapshot belongs to</param>
    /// <param name="result">serialized job</param>
    /// <param name="percentage">progress of the job</param>
    /// <param name="exception">exception that is stored with the job result</param>
    /// <returns>the response of the result processing</returns>
    public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
      return ProcessJobResult(clientId, jobId, result, percentage, exception, false);
    }

    /// <summary>
    /// Logs a client out: its heartbeat entry is removed, the jobs assigned
    /// to it are reset if it was calculating, and its state is set to
    /// offline.
    /// </summary>
    /// <param name="clientId">id of the client that logs out</param>
    /// <returns>
    /// An unsuccessful response if no client with this id is stored,
    /// otherwise a successful response.
    /// </returns>
    public Response Logout(Guid clientId) {
      ISession session = factory.GetSessionForCurrentThread();

      try {
        IClientAdapter clientAdapter =
          session.GetDataAdapter<ClientInfo, IClientAdapter>();
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        Response response = new Response();

        ForgetHeartbeat(clientId);

        ClientInfo client = clientAdapter.GetById(clientId);
        if (client == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
          return response;
        }
        if (client.State == State.calculating) {
          // check which jobs the client was calculating and reset them;
          // the jobs are only loaded when they are actually needed
          List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
          foreach (Job job in allJobs) {
            // jobs that are not assigned to any client have no Client set
            if (job.Client != null && job.Client.Id == client.Id) {
              jobManager.ResetJobsDependingOnResults(job);
            }
          }
        }

        client.State = State.offline;
        clientAdapter.Update(client);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

        return response;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// If a client went offline and restores a job it was calculating, it can
    /// ask the server whether the result of that job is still needed.
    /// </summary>
    /// <param name="jobId">id of the restored job</param>
    /// <returns>
    /// An unsuccessful response if the job does not exist; a successful
    /// response saying that the job is already finished, or a successful
    /// response asking the client to send the job result.
    /// </returns>
    public Response IsJobStillNeeded(Guid jobId) {
      ISession session = factory.GetSessionForCurrentThread();

      try {
        IJobAdapter jobAdapter =
          session.GetDataAdapter<Job, IJobAdapter>();

        Response response = new Response();
        Job job = jobAdapter.GetById(jobId);
        if (job == null) {
          response.Success = false;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST;
          return response;
        }
        if (job.State == State.finished) {
          response.Success = true;
          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED;
          return response;
        }
        // NOTE(review): the job is marked as finished before its result has
        // arrived. ProcessJobResult acknowledges results for finished jobs
        // without storing them, so the result requested here would not be
        // saved - confirm that this is intended.
        job.State = State.finished;
        jobAdapter.Update(job);

        response.Success = true;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT;
        return response;
      }
      finally {
        if (session != null)
          session.EndSession();
      }
    }

    /// <summary>
    /// Not implemented yet.
    /// </summary>
    /// <param name="pluginList">not used</param>
    /// <returns>never returns</returns>
    /// <exception cref="NotImplementedException">always thrown</exception>
    public ResponsePlugin SendPlugins(List<PluginInfo> pluginList) {
      throw new NotImplementedException();
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.