Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1120

Last change on this file since 1120 was 1120, checked in by svonolfe, 15 years ago

Added execution engine facade (#465)

File size: 8.8 KB
RevLine 
[741]1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
[751]5using HeuristicLab.Hive.Contracts.BusinessObjects;
[780]6using HeuristicLab.Hive.Contracts.Interfaces;
7using HeuristicLab.Hive.Contracts;
[823]8using HeuristicLab.Core;
[842]9using HeuristicLab.Hive.Server.Core.InternalInterfaces.DataAccess;
10using System.Resources;
11using System.Reflection;
[1001]12using HeuristicLab.Hive.JobBase;
[1096]13using System.Runtime.CompilerServices;
[741]14
15namespace HeuristicLab.Hive.Server.Core {
[780]16  /// <summary>
17  /// The ClientCommunicator manages the whole communication with the client
18  /// </summary>
19  public class ClientCommunicator: IClientCommunicator {
[1022]20    int nrOfJobs = 0;
[1099]21    Dictionary<Guid, DateTime> lastHeartbeats =
22      new Dictionary<Guid,DateTime>();
[783]23
[842]24    IClientAdapter clientAdapter;
[970]25    IJobAdapter jobAdapter;
[1004]26    IJobResultsAdapter jobResultAdapter;
[1088]27    ILifecycleManager lifecycleManager;
[842]28
[783]29    public ClientCommunicator() {
[970]30      clientAdapter = ServiceLocator.GetClientAdapter();
31      jobAdapter = ServiceLocator.GetJobAdapter();
[1011]32      jobResultAdapter = ServiceLocator.GetJobResultsAdapter();
[1088]33      lifecycleManager = ServiceLocator.GetLifecycleManager();
[842]34
[1088]35      lifecycleManager.OnServerHeartbeat +=
36        new EventHandler(lifecycleManager_OnServerHeartbeat);
37
[1004]38      for (int i = 0; i < nrOfJobs; i++) {
[977]39        Job job = new Job();
[995]40        job.Id = i;
[977]41        job.State = State.offline;
[995]42        jobAdapter.Update(job);
[977]43      }
[1088]44      lastHeartbeats = new Dictionary<Guid, DateTime>();
[977]45
[783]46    }
47
[1096]48    [MethodImpl(MethodImplOptions.Synchronized)]
[1088]49    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
50      List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
[1096]51      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
[1088]52
53      foreach (ClientInfo client in allClients) {
[1096]54        if (client.State != State.offline && client.State != State.nullState) {
55          if (!lastHeartbeats.ContainsKey(client.ClientId)) {
56            client.State = State.offline;
57            clientAdapter.Update(client);
58          } else {
59            DateTime lastHbOfClient = lastHeartbeats[client.ClientId];
[1099]60            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
[1118]61            // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF
62            if (dif.Seconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
63              // if client calculated jobs, the job must be reset
64              if (client.State == State.calculating) {
65                // check wich job the client was calculating and reset it
66                foreach (Job job in allJobs) {
67                  if (job.Client.ClientId == client.ClientId) {
68                    // TODO check for job results
69                    job.Client = null;
70                    job.Percentage = 0;
71                    job.State = State.idle;
72                  }
73                }
74              }
75             
76              // client must be set offline
77              client.State = State.offline;
78              clientAdapter.Update(client);
79              lastHeartbeats.Remove(client.ClientId);
80            }
[1096]81          }
82        } else {
83          if (lastHeartbeats.ContainsKey(client.ClientId))
84            lastHeartbeats.Remove(client.ClientId);
85        }
[1088]86      }
87    }
88
[741]89    #region IClientCommunicator Members
90
[1096]91    [MethodImpl(MethodImplOptions.Synchronized)]
[791]92    public Response Login(ClientInfo clientInfo) {
[741]93      Response response = new Response();
94
[1096]95      if (lastHeartbeats.ContainsKey(clientInfo.ClientId)) {
96        lastHeartbeats[clientInfo.ClientId] = DateTime.Now;
97      } else {
98        lastHeartbeats.Add(clientInfo.ClientId, DateTime.Now);
99      }
100
[995]101      ICollection<ClientInfo> allClients = clientAdapter.GetAll();
102      ClientInfo client = clientAdapter.GetById(clientInfo.ClientId);
[1096]103      if (client != null && client.State != State.offline && client.State != State.nullState) {
[1004]104        response.Success = false;
105        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
106        return response;
[842]107      }
[1096]108      clientInfo.State = State.idle;
[1004]109      clientAdapter.Update(clientInfo);
110      response.Success = true;
111      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;
[842]112
[741]113      return response;
114    }
115
[1096]116    [MethodImpl(MethodImplOptions.Synchronized)]
[780]117    public ResponseHB SendHeartBeat(HeartBeatData hbData) {
[783]118      ResponseHB response = new ResponseHB();
119
[1096]120      response.ActionRequest = new List<MessageContainer>();
121      if (clientAdapter.GetById(hbData.ClientId).State == State.offline ||
122          clientAdapter.GetById(hbData.ClientId).State == State.nullState) {
123        response.Success = false;
124        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
125        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
126        return response;
127      }
128
[1088]129      if (lastHeartbeats.ContainsKey(hbData.ClientId)) {
130        lastHeartbeats[hbData.ClientId] = DateTime.Now;
131      } else {
132        lastHeartbeats.Add(hbData.ClientId, DateTime.Now);
133      }
134
[783]135      response.Success = true;
[929]136      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HARDBEAT_RECEIVED;
[1004]137      List<Job> allOfflineJobs = new List<Job>(jobAdapter.GetJobsByState(State.offline));
138      if (allOfflineJobs.Count > 0 && hbData.freeCores > 0)
[783]139        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
140      else
141        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
142
143      return response;
[780]144    }
145
[1099]146    [MethodImpl(MethodImplOptions.Synchronized)]
[780]147    public ResponseJob PullJob(Guid clientId) {
[783]148      ResponseJob response = new ResponseJob();
[805]149      lock (this) {
[1004]150        LinkedList<Job> allOfflineJobs = new LinkedList<Job>(jobAdapter.GetJobsByState(State.offline));
151        if (allOfflineJobs != null && allOfflineJobs.Count > 0) {
152          Job job2Calculate = allOfflineJobs.First.Value;
153          job2Calculate.State = State.calculating;
[1005]154          response.Job = job2Calculate;
[1120]155          jobAdapter.Update(job2Calculate);
[940]156          response.Success = true;
157          response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
158          return response;
159        }
[805]160      }
[783]161      response.Success = true;
[940]162      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
[941]163      return response;
[780]164    }
165
[1099]166    [MethodImpl(MethodImplOptions.Synchronized)]
[1103]167    public ResponseResultReceived SendJobResult(Guid clientId,
168      long jobId,
169      byte[] result,
170      Exception exception, 
171      bool finished) {
[838]172      ResponseResultReceived response = new ResponseResultReceived();
[1103]173      ClientInfo client =
174        clientAdapter.GetById(clientId);
175
176      Job job =
177        jobAdapter.GetById(jobId);
[1004]178      if (job == null) {
179        response.Success = false;
180        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JO_WITH_THIS_ID;
181        return response;
182      }
183      if (job.State != State.calculating) {
184        response.Success = false;
185        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
186        return response;
187      }
188      if (finished) {
189        job.State = State.finished;
190        jobAdapter.Update(job);
191
192        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
193        foreach (JobResult currentResult in jobResults)
194          jobResultAdapter.Delete(currentResult);
195      }
196
[1103]197      JobResult jobResult =
198        new JobResult();
199      jobResult.Client = client;
200      jobResult.Job = job;
201      jobResult.Result = result;
202      jobResult.Exception = exception;
203
204      jobResultAdapter.Update(jobResult);   
205
[783]206      response.Success = true;
[929]207      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
[1103]208      response.JobId = jobId;
[783]209
210      return response;
[780]211    }
[1096]212
213    [MethodImpl(MethodImplOptions.Synchronized)]                       
[780]214    public Response Logout(Guid clientId) {
[786]215      Response response = new Response();
[1096]216
217      if (lastHeartbeats.ContainsKey(clientId))
218        lastHeartbeats.Remove(clientId);
219
[995]220      ClientInfo client = clientAdapter.GetById(clientId);
[902]221      if (client == null) {
[783]222        response.Success = false;
[929]223        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
[902]224        return response;
[783]225      }
[902]226      client.State = State.offline;
[995]227      clientAdapter.Update(client);
[902]228
229      response.Success = true;
[929]230      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;
[902]231     
[783]232      return response;
[780]233    }
234
[741]235    #endregion
236  }
237}
Note: See TracBrowser for help on using the repository browser.