source: trunk/sources/HeuristicLab.Hive.Server.Core/ClientCommunicator.cs @ 1120

Last change on this file since 1120 was 1120, checked in by svonolfe, 12 years ago

Added execution engine facade (#465)

File size: 8.8 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using HeuristicLab.Hive.Contracts.BusinessObjects;
6using HeuristicLab.Hive.Contracts.Interfaces;
7using HeuristicLab.Hive.Contracts;
8using HeuristicLab.Core;
9using HeuristicLab.Hive.Server.Core.InternalInterfaces.DataAccess;
10using System.Resources;
11using System.Reflection;
12using HeuristicLab.Hive.JobBase;
13using System.Runtime.CompilerServices;
14
namespace HeuristicLab.Hive.Server.Core {
  /// <summary>
  /// The ClientCommunicator manages the whole communication with the client:
  /// login and logout, heartbeats, handing out jobs and receiving job results.
  /// </summary>
  /// <remarks>
  /// All entry points (and the server heartbeat handler) are marked with
  /// <see cref="MethodImplOptions.Synchronized"/>, so only one of them runs at
  /// a time per ClientCommunicator instance.
  /// </remarks>
  public class ClientCommunicator: IClientCommunicator {
    // Number of placeholder jobs written by the constructor; 0 disables the seeding loop.
    private int nrOfJobs = 0;

    // UTC time of the last accepted login or heartbeat, keyed by client id.
    // UTC is used so that elapsed-time checks are not affected by DST or time zone changes.
    private readonly Dictionary<Guid, DateTime> lastHeartbeats =
      new Dictionary<Guid, DateTime>();

    private readonly IClientAdapter clientAdapter;
    private readonly IJobAdapter jobAdapter;
    private readonly IJobResultsAdapter jobResultAdapter;
    private readonly ILifecycleManager lifecycleManager;

    /// <summary>
    /// Resolves the data adapters and the lifecycle manager through the
    /// ServiceLocator and subscribes to the server heartbeat event.
    /// </summary>
    public ClientCommunicator() {
      clientAdapter = ServiceLocator.GetClientAdapter();
      jobAdapter = ServiceLocator.GetJobAdapter();
      jobResultAdapter = ServiceLocator.GetJobResultsAdapter();
      lifecycleManager = ServiceLocator.GetLifecycleManager();

      lifecycleManager.OnServerHeartbeat +=
        new EventHandler(lifecycleManager_OnServerHeartbeat);

      // Seeds nrOfJobs placeholder jobs in state offline (a no-op while nrOfJobs is 0).
      for (int i = 0; i < nrOfJobs; i++) {
        Job job = new Job();
        job.Id = i;
        job.State = State.offline;
        jobAdapter.Update(job);
      }
    }

    /// <summary>
    /// Server heartbeat handler: sets every client offline whose last heartbeat
    /// is missing or older than ApplicationConstants.HEARTBEAT_MAX_DIF seconds,
    /// and drops heartbeat entries of clients that are already offline.
    /// </summary>
    /// <param name="sender">Event source (unused).</param>
    /// <param name="e">Event arguments (unused).</param>
    [MethodImpl(MethodImplOptions.Synchronized)]
    void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
      List<ClientInfo> allClients = new List<ClientInfo>(clientAdapter.GetAll());
      List<Job> allJobs = new List<Job>(jobAdapter.GetAll());
      DateTime now = DateTime.UtcNow;

      foreach (ClientInfo client in allClients) {
        if (client.State == State.offline || client.State == State.nullState) {
          // Client is not online; a leftover heartbeat entry is stale.
          lastHeartbeats.Remove(client.ClientId);
          continue;
        }

        DateTime lastHbOfClient;
        if (!lastHeartbeats.TryGetValue(client.ClientId, out lastHbOfClient)) {
          // Stored as online but no heartbeat was ever recorded by this instance.
          client.State = State.offline;
          clientAdapter.Update(client);
          continue;
        }

        // Check if the time between the last heartbeat and now is greater than
        // HEARTBEAT_MAX_DIF. TotalSeconds is required here: TimeSpan.Seconds is
        // only the 0-59 seconds component and wraps around every minute.
        TimeSpan dif = now.Subtract(lastHbOfClient);
        if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
          // If the client was calculating jobs, those jobs must be reset.
          if (client.State == State.calculating) {
            ResetCalculatingJobsOf(client, allJobs);
          }

          // The client must be set offline.
          client.State = State.offline;
          clientAdapter.Update(client);
          lastHeartbeats.Remove(client.ClientId);
        }
      }
    }

    /// <summary>
    /// Detaches all jobs in state calculating that are assigned to the given
    /// client, resets their progress and persists the change.
    /// </summary>
    /// <param name="client">The client that stopped sending heartbeats.</param>
    /// <param name="jobs">The jobs to search.</param>
    private void ResetCalculatingJobsOf(ClientInfo client, List<Job> jobs) {
      foreach (Job job in jobs) {
        // Jobs without an assigned client must be skipped, otherwise job.Client.ClientId throws.
        if (job.Client == null || job.Client.ClientId != client.ClientId)
          continue;
        // Only jobs that were actually being calculated are reset; other states are left alone.
        if (job.State != State.calculating)
          continue;

        // TODO check for job results
        job.Client = null;
        job.Percentage = 0;
        // NOTE(review): PullJob only hands out jobs in State.offline, so a job
        // reset to State.idle is not picked up again by it - confirm intended state.
        job.State = State.idle;
        jobAdapter.Update(job);
      }
    }

    #region IClientCommunicator Members

    /// <summary>
    /// Registers a client as online (state idle).
    /// </summary>
    /// <param name="clientInfo">The client that wants to log in.</param>
    /// <returns>
    /// A failed response if a client with the same id is already stored in a
    /// state other than offline/nullState, otherwise a success response.
    /// </returns>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public Response Login(ClientInfo clientInfo) {
      Response response = new Response();

      ClientInfo client = clientAdapter.GetById(clientInfo.ClientId);
      if (client != null && client.State != State.offline && client.State != State.nullState) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE;
        return response;
      }

      // The heartbeat time is recorded only for accepted logins. A rejected
      // attempt must not keep a stale "online" entry alive, otherwise a client
      // that keeps retrying would never time out.
      lastHeartbeats[clientInfo.ClientId] = DateTime.UtcNow;

      clientInfo.State = State.idle;
      clientAdapter.Update(clientInfo);
      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS;

      return response;
    }

    /// <summary>
    /// Records a heartbeat of a logged-in client and tells it what to do next.
    /// </summary>
    /// <param name="hbData">Heartbeat data sent by the client.</param>
    /// <returns>
    /// A failed response with a NoMessage action if the client is unknown or
    /// not logged in. Otherwise a success response whose action is FetchJob
    /// when offline jobs exist and the client reports free cores, else NoMessage.
    /// </returns>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public ResponseHB SendHeartBeat(HeartBeatData hbData) {
      ResponseHB response = new ResponseHB();
      response.ActionRequest = new List<MessageContainer>();

      // Fetched once; an unknown client (null) is treated like a client that is not logged in.
      ClientInfo client = clientAdapter.GetById(hbData.ClientId);
      if (client == null || client.State == State.offline || client.State == State.nullState) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN;
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
        return response;
      }

      lastHeartbeats[hbData.ClientId] = DateTime.UtcNow;

      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HARDBEAT_RECEIVED;
      List<Job> allOfflineJobs = new List<Job>(jobAdapter.GetJobsByState(State.offline));
      if (allOfflineJobs.Count > 0 && hbData.freeCores > 0)
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
      else
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

      return response;
    }

    /// <summary>
    /// Hands out the first job in state offline and marks it as calculating.
    /// </summary>
    /// <param name="clientId">Id of the requesting client.</param>
    /// <returns>
    /// A success response carrying the job, or a success response without a
    /// job if no offline jobs are left.
    /// </returns>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public ResponseJob PullJob(Guid clientId) {
      ResponseJob response = new ResponseJob();
      response.Success = true;

      // No additional lock is needed: the Synchronized attribute already
      // serializes calls on this instance.
      List<Job> allOfflineJobs = new List<Job>(jobAdapter.GetJobsByState(State.offline));
      if (allOfflineJobs.Count == 0) {
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT;
        return response;
      }

      // NOTE(review): the job is not linked to clientId here, so the heartbeat
      // timeout cannot find it through job.Client - confirm whether it should be.
      Job job2Calculate = allOfflineJobs[0];
      job2Calculate.State = State.calculating;
      response.Job = job2Calculate;
      jobAdapter.Update(job2Calculate);
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED;
      return response;
    }

    /// <summary>
    /// Stores a (partial or final) result for a job that is being calculated.
    /// </summary>
    /// <param name="clientId">Id of the client that produced the result.</param>
    /// <param name="jobId">Id of the job the result belongs to.</param>
    /// <param name="result">The serialized result.</param>
    /// <param name="exception">Exception reported by the client, if any.</param>
    /// <param name="finished">
    /// True for the final result: the job is marked finished and all results
    /// stored so far for it are deleted before the new one is saved.
    /// </param>
    /// <returns>
    /// A failed response if the job does not exist or is not in state
    /// calculating, otherwise a success response carrying the job id.
    /// </returns>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public ResponseResultReceived SendJobResult(Guid clientId,
      long jobId,
      byte[] result,
      Exception exception,
      bool finished) {
      ResponseResultReceived response = new ResponseResultReceived();
      ClientInfo client = clientAdapter.GetById(clientId);

      Job job = jobAdapter.GetById(jobId);
      if (job == null) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JO_WITH_THIS_ID;
        return response;
      }
      if (job.State != State.calculating) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE;
        return response;
      }

      if (finished) {
        job.State = State.finished;
        jobAdapter.Update(job);

        // Earlier results of this job are replaced by the final result stored below.
        List<JobResult> jobResults = new List<JobResult>(jobResultAdapter.GetResultsOf(job));
        foreach (JobResult currentResult in jobResults)
          jobResultAdapter.Delete(currentResult);
      }

      JobResult jobResult = new JobResult();
      jobResult.Client = client;
      jobResult.Job = job;
      jobResult.Result = result;
      jobResult.Exception = exception;

      jobResultAdapter.Update(jobResult);

      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED;
      response.JobId = jobId;

      return response;
    }

    /// <summary>
    /// Sets a client offline and forgets its heartbeat entry.
    /// </summary>
    /// <param name="clientId">Id of the client that logs out.</param>
    /// <returns>
    /// A failed response if no client with this id is stored, otherwise a
    /// success response.
    /// </returns>
    [MethodImpl(MethodImplOptions.Synchronized)]
    public Response Logout(Guid clientId) {
      Response response = new Response();

      // Remove is a no-op when there is no entry for this client.
      lastHeartbeats.Remove(clientId);

      ClientInfo client = clientAdapter.GetById(clientId);
      if (client == null) {
        response.Success = false;
        response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED;
        return response;
      }
      client.State = State.offline;
      clientAdapter.Update(client);

      response.Success = true;
      response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS;

      return response;
    }

    #endregion
  }
}
Note: See TracBrowser for help on using the repository browser.