#region License Information /* HeuristicLab * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Collections.Generic; using System.Linq; using System.Text; using HeuristicLab.Hive.Contracts.BusinessObjects; using HeuristicLab.Hive.Contracts.Interfaces; using HeuristicLab.Hive.Contracts; using HeuristicLab.Core; using HeuristicLab.Hive.Server.DataAccess; using System.Resources; using System.Reflection; using HeuristicLab.Hive.JobBase; using HeuristicLab.Hive.Server.Core.InternalInterfaces; using System.Threading; using HeuristicLab.PluginInfrastructure; using HeuristicLab.DataAccess.Interfaces; namespace HeuristicLab.Hive.Server.Core { /// /// The ClientCommunicator manages the whole communication with the client /// public class ClientCommunicator: IClientCommunicator { private static Dictionary lastHeartbeats = new Dictionary(); private static ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); private ISessionFactory factory; private ILifecycleManager lifecycleManager; private IInternalJobManager jobManager; private IScheduler scheduler; /// /// Initialization of the Adapters to the database /// Initialization of Eventhandler for the lifecycle management /// Initialization of lastHearbeats Dictionary /// public ClientCommunicator() { factory = ServiceLocator.GetSessionFactory(); lifecycleManager = ServiceLocator.GetLifecycleManager(); jobManager = ServiceLocator.GetJobManager() as IInternalJobManager; scheduler = ServiceLocator.GetScheduler(); lifecycleManager.RegisterHeartbeat( new EventHandler(lifecycleManager_OnServerHeartbeat)); } /// /// Check if online clients send their hearbeats /// if not -> set them offline and check if they where calculating a job /// /// /// void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) { ISession session = factory.GetSessionForCurrentThread(); ITransaction tx = null; try { IClientAdapter clientAdapter = session.GetDataAdapter(); IJobAdapter jobAdapter = session.GetDataAdapter(); tx = session.BeginTransaction(); List allClients = new List(clientAdapter.GetAll()); foreach (ClientInfo client in allClients) { if (client.State != State.offline && client.State != State.nullState) { heartbeatLock.EnterUpgradeableReadLock(); if (!lastHeartbeats.ContainsKey(client.Id)) { client.State = State.offline; clientAdapter.Update(client); foreach (Job job in jobAdapter.GetActiveJobsOf(client)) { jobManager.ResetJobsDependingOnResults(job); } } else { DateTime lastHbOfClient = lastHeartbeats[client.Id]; TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient); // check if time between last hearbeat and now is greather than HEARTBEAT_MAX_DIF if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) { // if client calculated jobs, the job must be reset foreach (Job job in jobAdapter.GetActiveJobsOf(client)) { jobManager.ResetJobsDependingOnResults(job); } // client must be set offline client.State = State.offline; clientAdapter.Update(client); heartbeatLock.EnterWriteLock(); lastHeartbeats.Remove(client.Id); heartbeatLock.ExitWriteLock(); } } heartbeatLock.ExitUpgradeableReadLock(); } else { heartbeatLock.EnterWriteLock(); if (lastHeartbeats.ContainsKey(client.Id)) lastHeartbeats.Remove(client.Id); heartbeatLock.ExitWriteLock(); } } tx.Commit(); } catch (Exception ex) { if (tx != null) tx.Rollback(); throw ex; } finally { if (session != null) session.EndSession(); } } #region IClientCommunicator Members /// /// Login process for the client /// A hearbeat entry is created as well (login is the first hearbeat) /// /// /// public Response Login(ClientInfo clientInfo) { ISession session = factory.GetSessionForCurrentThread(); ITransaction tx = null; try { IClientAdapter clientAdapter = session.GetDataAdapter(); tx = session.BeginTransaction(); Response response = new Response(); heartbeatLock.EnterWriteLock(); if (lastHeartbeats.ContainsKey(clientInfo.Id)) { lastHeartbeats[clientInfo.Id] = DateTime.Now; } else { lastHeartbeats.Add(clientInfo.Id, DateTime.Now); } heartbeatLock.ExitWriteLock(); ClientInfo client = clientAdapter.GetById(clientInfo.Id); if (client != null && client.State != State.offline && client.State != State.nullState) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_USER_ALLREADY_ONLINE; return response; } clientInfo.State = State.idle; clientAdapter.Update(clientInfo); response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGIN_SUCCESS; tx.Commit(); return response; } catch (Exception ex) { if (tx != null) tx.Rollback(); throw ex; } finally { if (session != null) session.EndSession(); } } /// /// The client has to send regulary heartbeats /// this hearbeats will be stored in the heartbeats dictionary /// check if there is work for the client and send the client a response if he should pull a job /// /// /// public ResponseHB ProcessHeartBeat(HeartBeatData hbData) { ISession session = factory.GetSessionForCurrentThread(); ITransaction tx = null; try { IClientAdapter clientAdapter = session.GetDataAdapter(); IJobAdapter jobAdapter = session.GetDataAdapter(); tx = session.BeginTransaction(); ResponseHB response = new ResponseHB(); response.ActionRequest = new List(); ClientInfo client = clientAdapter.GetById(hbData.ClientId); // check if the client is logged in if (client.State == State.offline || client.State == State.nullState) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_USER_NOT_LOGGED_IN; response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage)); return response; } client.NrOfFreeCores = hbData.FreeCores; client.FreeMemory = hbData.FreeMemory; // save timestamp of this heartbeat heartbeatLock.EnterWriteLock(); if (lastHeartbeats.ContainsKey(hbData.ClientId)) { lastHeartbeats[hbData.ClientId] = DateTime.Now; } else { lastHeartbeats.Add(hbData.ClientId, DateTime.Now); } heartbeatLock.ExitWriteLock(); // check if client has a free core for a new job // if true, ask scheduler for a new job for this client if (hbData.FreeCores > 0 && scheduler.ExistsJobForClient(hbData)) response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob)); else response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage)); response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_HEARTBEAT_RECEIVED; processJobProcess(hbData, jobAdapter, clientAdapter, response); clientAdapter.Update(client); tx.Commit(); return response; } catch (Exception ex) { if (tx != null) tx.Rollback(); throw ex; } finally { if (session != null) session.EndSession(); } } /// /// Process the Job progress sent by a client /// /// /// /// /// private void processJobProcess(HeartBeatData hbData, IJobAdapter jobAdapter, IClientAdapter clientAdapter, ResponseHB response) { if (hbData.JobProgress != null) { List jobsOfClient = new List(jobAdapter.GetActiveJobsOf(clientAdapter.GetById(hbData.ClientId))); if (jobsOfClient == null || jobsOfClient.Count == 0) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED; return; } foreach (KeyValuePair jobProgress in hbData.JobProgress) { Job curJob = jobAdapter.GetById(jobProgress.Key); if (curJob.Client == null || curJob.Client.Id != hbData.ClientId) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED; } else if (curJob.State == State.abort) { // a request to abort the job has been set response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id)); curJob.State = State.offline; } else { // save job progress curJob.Percentage = jobProgress.Value; jobAdapter.Update(curJob); if (curJob.State == State.requestSnapshot) { // a request for a snapshot has been set response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id)); curJob.State = State.calculating; } } } } } /// /// if the client was told to pull a job he calls this method /// the server selects a job and sends it to the client /// /// /// public ResponseJob SendJob(Guid clientId) { ResponseJob response = new ResponseJob(); Job job2Calculate = scheduler.GetNextJobForClient(clientId); if (job2Calculate != null) { response.Job = job2Calculate; response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_PULLED; } else { response.Success = false; response.Job = null; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOBS_LEFT; } return response; } private ResponseResultReceived ProcessJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) { ISession session = factory.GetSessionForCurrentThread(); ITransaction tx = null; try { IClientAdapter clientAdapter = session.GetDataAdapter(); IJobAdapter jobAdapter = session.GetDataAdapter(); IJobResultsAdapter jobResultAdapter = session.GetDataAdapter(); tx = session.BeginTransaction(); ResponseResultReceived response = new ResponseResultReceived(); ClientInfo client = clientAdapter.GetById(clientId); Job job = jobAdapter.GetById(jobId); if (job == null) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_NO_JOB_WITH_THIS_ID; response.JobId = jobId; return response; } if (job.Client == null) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_IS_NOT_BEEING_CALCULATED; response.JobId = jobId; return response; } if (job.Client.Id != clientId) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_CLIENT_FOR_JOB; response.JobId = jobId; return response; } if (job.State == State.finished) { response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED; response.JobId = jobId; return response; } if (job.State != State.calculating) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_WRONG_JOB_STATE; response.JobId = jobId; return response; } job.SerializedJob = result; job.Percentage = percentage; if (finished) { job.State = State.finished; jobAdapter.Update(job); client.State = State.idle; clientAdapter.Update(client); List jobResults = new List(jobResultAdapter.GetResultsOf(job)); foreach (JobResult currentResult in jobResults) jobResultAdapter.Delete(currentResult); } JobResult jobResult = new JobResult(); jobResult.Client = client; jobResult.Job = job; jobResult.Result = result; jobResult.Percentage = percentage; jobResult.Exception = exception; jobResult.DateFinished = DateTime.Now; jobResultAdapter.Update(jobResult); jobAdapter.Update(job); response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOBRESULT_RECEIVED; response.JobId = jobId; response.finished = finished; tx.Commit(); return response; } catch (Exception ex) { if (tx != null) tx.Rollback(); throw ex; } finally { if (session != null) session.EndSession(); } } /// /// the client can send job results during calculating /// and will send a final job result when he finished calculating /// these job results will be stored in the database /// /// /// /// /// /// /// public ResponseResultReceived StoreFinishedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) { return ProcessJobResult(clientId, jobId, result, percentage, exception, true); } public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) { return ProcessJobResult(clientId, jobId, result, percentage, exception, false); } /// /// when a client logs out the state will be set /// and the entry in the last hearbeats dictionary will be removed /// /// /// public Response Logout(Guid clientId) { ISession session = factory.GetSessionForCurrentThread(); ITransaction tx = null; try { IClientAdapter clientAdapter = session.GetDataAdapter(); IJobAdapter jobAdapter = session.GetDataAdapter(); tx = session.BeginTransaction(); Response response = new Response(); heartbeatLock.EnterWriteLock(); if (lastHeartbeats.ContainsKey(clientId)) lastHeartbeats.Remove(clientId); heartbeatLock.ExitWriteLock(); ClientInfo client = clientAdapter.GetById(clientId); if (client == null) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_CLIENT_NOT_REGISTERED; return response; } List allJobs = new List(jobAdapter.GetAll()); if (client.State == State.calculating) { // check wich job the client was calculating and reset it foreach (Job job in allJobs) { if (job.Client.Id == client.Id) { jobManager.ResetJobsDependingOnResults(job); } } } client.State = State.offline; clientAdapter.Update(client); response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_LOGOUT_SUCCESS; tx.Commit(); return response; } catch (Exception ex) { if (tx != null) tx.Rollback(); throw ex; } finally { if (session != null) session.EndSession(); } } /// /// If a client goes offline and restores a job he was calculating /// he can ask the client if he still needs the job result /// /// /// public Response IsJobStillNeeded(Guid jobId) { ISession session = factory.GetSessionForCurrentThread(); ITransaction tx = null; try { IJobAdapter jobAdapter = session.GetDataAdapter(); tx = session.BeginTransaction(); Response response = new Response(); Job job = jobAdapter.GetById(jobId); if (job == null) { response.Success = false; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_DOESNT_EXIST; return response; } if (job.State == State.finished) { response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_JOB_ALLREADY_FINISHED; return response; } job.State = State.finished; jobAdapter.Update(job); response.Success = true; response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_SEND_JOBRESULT; tx.Commit(); return response; } catch (Exception ex) { if (tx != null) tx.Rollback(); throw ex; } finally { if (session != null) session.EndSession(); } } public ResponsePlugin SendPlugins(List pluginList) { throw new NotImplementedException(); } #endregion } }