[4253] | 1 | #region License Information
|
---|
| 2 | /* HeuristicLab
|
---|
| 3 | * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
|
---|
| 4 | *
|
---|
| 5 | * This file is part of HeuristicLab.
|
---|
| 6 | *
|
---|
| 7 | * HeuristicLab is free software: you can redistribute it and/or modify
|
---|
| 8 | * it under the terms of the GNU General Public License as published by
|
---|
| 9 | * the Free Software Foundation, either version 3 of the License, or
|
---|
| 10 | * (at your option) any later version.
|
---|
| 11 | *
|
---|
| 12 | * HeuristicLab is distributed in the hope that it will be useful,
|
---|
| 13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
---|
| 14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
---|
| 15 | * GNU General Public License for more details.
|
---|
| 16 | *
|
---|
| 17 | * You should have received a copy of the GNU General Public License
|
---|
| 18 | * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
|
---|
| 19 | */
|
---|
| 20 | #endregion
|
---|
| 21 |
|
---|
| 22 | using System;
|
---|
| 23 | using System.Collections.Generic;
|
---|
| 24 | using System.IO;
|
---|
| 25 | using System.Linq;
|
---|
| 26 | using System.Runtime.Serialization.Formatters.Binary;
|
---|
| 27 | using System.Threading;
|
---|
| 28 | using System.Transactions;
|
---|
| 29 | using HeuristicLab.Hive.Contracts;
|
---|
| 30 | using HeuristicLab.Hive.Contracts.BusinessObjects;
|
---|
| 31 | using HeuristicLab.Hive.Contracts.Interfaces;
|
---|
| 32 | using HeuristicLab.Hive.Server.Core.InternalInterfaces;
|
---|
| 33 | using HeuristicLab.PluginInfrastructure;
|
---|
| 34 | using HeuristicLab.Tracing;
|
---|
[4263] | 35 | using HeuristicLab.Hive.Contracts.ResponseObjects;
|
---|
[4253] | 36 |
|
---|
| 37 | namespace HeuristicLab.Hive.Server.Core {
|
---|
| 38 | /// <summary>
|
---|
| 39 | /// The SlaveCommunicator manages the whole communication with the slaves (clients).
|
---|
| 40 | /// </summary>
|
---|
| 41 | public class SlaveCommunicator : ISlaveCommunicator,
|
---|
| 42 | IInternalSlaveCommunicator {
|
---|
// Time of the most recent heartbeat (or login) per slave id; guarded by heartbeatLock.
private static readonly Dictionary<Guid, DateTime> lastHeartbeats = new Dictionary<Guid, DateTime>();
// Remaining time-to-live of freshly handed-out jobs, per job id. Initialised in GetJob and
// decremented once per slave heartbeat that does not report the job. Accessed under lock (newAssignedJobs).
private static readonly Dictionary<Guid, int> newAssignedJobs = new Dictionary<Guid, int>();
// Remaining server-heartbeat ticks before a Pending job is set back to Offline, per job id.
// Accessed under lock (pendingJobs).
private static readonly Dictionary<Guid, int> pendingJobs = new Dictionary<Guid, int>();

// Reader/writer lock protecting lastHeartbeats.
// The dictionaries above and this lock are readonly because they are used as lock targets/guards
// and must never be swapped for another instance.
private static readonly ReaderWriterLockSlim heartbeatLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

private ILifecycleManager lifecycleManager;
private IInternalJobManager jobManager;
private IScheduler scheduler;

// Initial countdown (in server heartbeat ticks, see CheckForPendingJobs) for jobs put into state Pending.
private static readonly int PENDING_TIMEOUT = 100;
|
---|
| 55 |
|
---|
/// <summary>
/// Creates the communicator. The lifecycle manager, job manager and scheduler are
/// resolved through the ServiceLocator, and lifecycleManager_OnServerHeartbeat is
/// registered with the lifecycle manager so it runs on every server heartbeat.
/// </summary>
public SlaveCommunicator() {
  this.lifecycleManager = ServiceLocator.GetLifecycleManager();
  // NOTE(review): "as" yields null if the registered job manager does not implement
  // IInternalJobManager; no null check is performed here.
  this.jobManager = ServiceLocator.GetJobManager() as IInternalJobManager;
  this.scheduler = ServiceLocator.GetScheduler();

  EventHandler onServerHeartbeat = new EventHandler(lifecycleManager_OnServerHeartbeat);
  this.lifecycleManager.RegisterHeartbeat(onServerHeartbeat);
}
|
---|
| 70 |
|
---|
/// <summary>
/// Server-side watchdog, run on every server heartbeat.
/// For each slave that is neither Offline nor NullState:
///   - no heartbeat recorded: the slave is set Offline and its active jobs are set offline;
///   - last heartbeat older than ApplicationConstants.HEARTBEAT_MAX_DIF seconds: its active jobs
///     are set offline (and dropped from newAssignedJobs), the slave is set Offline and its
///     heartbeat entry is removed.
/// For each Offline/NullState slave, a leftover heartbeat entry is removed and its active jobs
/// are set offline. Afterwards the pending-job countdown is advanced (CheckForPendingJobs).
/// All database work runs inside one TransactionScope.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event arguments (unused).</param>
void lifecycleManager_OnServerHeartbeat(object sender, EventArgs e) {
  Logger.Debug("Server Heartbeat ticked");

  // [chn] why is transaction management done here
  using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = ApplicationConstants.ISOLATION_LEVEL_SCOPE })) {
    List<ClientDto> allClients = new List<ClientDto>(DaoLocator.ClientDao.FindAll());

    foreach (ClientDto client in allClients) {
      if (client.State != SlaveState.Offline && client.State != SlaveState.NullState) {
        // Locks are released in finally blocks: a throwing DAO call must not leave
        // heartbeatLock held, or every later heartbeat/login/logout would block forever.
        heartbeatLock.EnterUpgradeableReadLock();
        try {
          DateTime lastHbOfClient;
          if (!lastHeartbeats.TryGetValue(client.Id, out lastHbOfClient)) {
            Logger.Info("Client " + client.Id +
              " wasn't offline but hasn't sent heartbeats - setting offline");
            client.State = SlaveState.Offline;
            DaoLocator.ClientDao.Update(client);
            Logger.Info("Client " + client.Id +
              " wasn't offline but hasn't sent heartbeats - Resetting all his jobs");
            foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
              // maybe implement an additional watchdog? Till then, just set them offline.
              DaoLocator.JobDao.SetJobOffline(job);
            }
          } else {
            TimeSpan dif = DateTime.Now.Subtract(lastHbOfClient);
            // check if time between last heartbeat and now is greater than HEARTBEAT_MAX_DIF
            if (dif.TotalSeconds > ApplicationConstants.HEARTBEAT_MAX_DIF) {
              // if client calculated jobs, the job must be reset
              Logger.Info("Client timed out and is on RESET");
              foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
                DaoLocator.JobDao.SetJobOffline(job);
                lock (newAssignedJobs) {
                  newAssignedJobs.Remove(job.Id); // no-op if absent
                }
              }
              Logger.Debug("setting client offline");
              // client must be set offline
              client.State = SlaveState.Offline;
              DaoLocator.ClientDao.Update(client);

              Logger.Debug("removing it from the heartbeats list");
              heartbeatLock.EnterWriteLock();
              try {
                lastHeartbeats.Remove(client.Id);
              } finally {
                heartbeatLock.ExitWriteLock();
              }
            }
          }
        } finally {
          heartbeatLock.ExitUpgradeableReadLock();
        }
      } else {
        // Offline/NullState slave: drop any stale heartbeat and reset its jobs.
        heartbeatLock.EnterWriteLock();
        try {
          lastHeartbeats.Remove(client.Id); // no-op if absent
          foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
            DaoLocator.JobDao.SetJobOffline(job);
          }
        } finally {
          heartbeatLock.ExitWriteLock();
        }
      }
    }
    CheckForPendingJobs();
    scope.Complete();
  }
}
|
---|
| 147 |
|
---|
/// <summary>
/// Advances the timeout countdown of jobs that are in state Pending in the database.
/// Pending jobs without an entry in pendingJobs are ignored. If a job's remaining count
/// has reached zero, the job is set back to Offline and its entry is removed; otherwise
/// the count is decremented by one.
/// </summary>
private void CheckForPendingJobs() {
  IList<JobDto> pendingJobsInDB = new List<JobDto>(DaoLocator.JobDao.GetJobsByState(JobState.Pending));

  foreach (JobDto currJob in pendingJobsInDB) {
    lock (pendingJobs) {
      int remainingTicks;
      if (!pendingJobs.TryGetValue(currJob.Id, out remainingTicks)) {
        continue;
      }
      if (remainingTicks <= 0) {
        currJob.State = JobState.Offline;
        DaoLocator.JobDao.Update(currJob);
        // Drop the expired entry. Otherwise it stays forever, and a later
        // pendingJobs.Add for the same job id throws ArgumentException.
        pendingJobs.Remove(currJob.Id);
      } else {
        pendingJobs[currJob.Id] = remainingTicks - 1;
      }
    }
  }
}
|
---|
| 164 |
|
---|
| 165 | #region IClientCommunicator Members
|
---|
| 166 |
|
---|
/// <summary>
/// Login of a slave. The login time is recorded as the slave's first heartbeat, the slave
/// is marked Idle and inserted into the database, or updated if a record already exists.
/// An existing record's calendar sync status is kept; new slaves get
/// CalendarState.NotAllowedToFetch.
/// </summary>
/// <param name="slaveInfo">Description of the slave that logs in.</param>
/// <returns>A fresh Response; this method sets no status on it.</returns>
public Response Login(ClientDto slaveInfo) {
  Response response = new Response();

  // Read the id before taking the lock so that a faulting argument cannot throw while
  // the write lock is held (that would leave heartbeatLock permanently acquired).
  Guid slaveId = slaveInfo.Id;
  heartbeatLock.EnterWriteLock();
  try {
    lastHeartbeats[slaveId] = DateTime.Now; // insert or overwrite
  } finally {
    heartbeatLock.ExitWriteLock();
  }

  ClientDto dbClient = DaoLocator.ClientDao.FindById(slaveId);

  // NOTE: the slave is always reset to Idle on login; reconnecting while jobs are still
  // being calculated is not handled here.
  slaveInfo.State = SlaveState.Idle;
  slaveInfo.CalendarSyncStatus = dbClient != null ? dbClient.CalendarSyncStatus : CalendarState.NotAllowedToFetch;

  if (dbClient == null) {
    DaoLocator.ClientDao.Insert(slaveInfo);
  } else {
    DaoLocator.ClientDao.Update(slaveInfo);
  }

  return response;
}
|
---|
| 198 |
|
---|
/// <summary>
/// Returns the uptime calendar of a slave and marks the slave's calendar as fetched.
/// </summary>
/// <param name="clientId">Id of the slave.</param>
/// <returns>
/// StatusMessage GetCalendar_ResourceNotFound if the slave is unknown (nothing is updated),
/// GetCalendar_NoCalendarFound if it has no appointments, otherwise Appointments is filled.
/// ForceFetch is true when the slave's sync status was CalendarState.ForceFetch.
/// </returns>
public ResponseCalendar GetCalendar(Guid clientId) {
  ResponseCalendar response = new ResponseCalendar();

  ClientDto client = DaoLocator.ClientDao.FindById(clientId);
  if (client == null) {
    response.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
    return response;
  }

  response.ForceFetch = (client.CalendarSyncStatus == CalendarState.ForceFetch);

  // Materialize once: the DAO result may be a deferred query. It is counted here and
  // presumably enumerated again when the response is serialized.
  List<AppointmentDto> appointments = new List<AppointmentDto>(DaoLocator.UptimeCalendarDao.GetCalendarForClient(client));
  if (appointments.Count == 0) {
    response.StatusMessage = ResponseStatus.GetCalendar_NoCalendarFound;
  } else {
    response.Appointments = appointments;
  }

  client.CalendarSyncStatus = CalendarState.Fetched;
  DaoLocator.ClientDao.Update(client);
  return response;
}
|
---|
| 224 |
|
---|
/// <summary>
/// Sets the calendar synchronisation state of a slave and persists it.
/// </summary>
/// <param name="clientId">Id of the slave.</param>
/// <param name="state">New calendar state.</param>
/// <returns>Response with StatusMessage GetCalendar_ResourceNotFound if the slave is unknown.</returns>
public Response SetCalendarStatus(Guid clientId, CalendarState state) {
  Response result = new Response();
  ClientDto slave = DaoLocator.ClientDao.FindById(clientId);
  if (slave != null) {
    slave.CalendarSyncStatus = state;
    DaoLocator.ClientDao.Update(slave);
  } else {
    // NOTE(review): reuses the GetCalendar status code for an unknown slave.
    result.StatusMessage = ResponseStatus.GetCalendar_ResourceNotFound;
  }
  return result;
}
|
---|
| 239 |
|
---|
/// <summary>
/// Handles a periodic heartbeat of a slave.
/// The heartbeat timestamp is stored in lastHeartbeats, the reported job progress is
/// reconciled (ProcessJobProcess), and the response is filled with action requests:
/// a calendar fetch if the slave's sync status is Fetch or ForceFetch, and FetchJob if the
/// slave has free cores and the scheduler has a job for it (NoMessage otherwise).
/// The slave's free cores and free memory are persisted.
/// </summary>
/// <param name="hbData">Heartbeat data sent by the slave.</param>
/// <returns>
/// The heartbeat response. For unknown, Offline or NullState slaves the StatusMessage is
/// ProcessHeartBeat_UserNotLoggedIn with a single NoMessage action.
/// </returns>
public ResponseHeartBeat ProcessHeartBeat(HeartBeatData hbData) {
  Logger.Debug("BEGIN Processing Heartbeat for Client " + hbData.SlaveId);

  ResponseHeartBeat response = new ResponseHeartBeat();
  response.ActionRequest = new List<MessageContainer>();

  Logger.Debug("BEGIN Started Client Fetching");
  ClientDto client = DaoLocator.ClientDao.FindById(hbData.SlaveId);
  Logger.Debug("END Finished Client Fetching");
  // Check if the client is logged in. An unknown slave id is treated like a slave that is
  // not logged in instead of failing with a NullReferenceException.
  if (client == null || client.State == SlaveState.Offline || client.State == SlaveState.NullState) {
    response.StatusMessage = ResponseStatus.ProcessHeartBeat_UserNotLoggedIn;
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));

    Logger.Error("ProcessHeartBeat: Client state null or offline: " + client);

    return response;
  }

  client.NrOfFreeCores = hbData.FreeCores;
  client.FreeMemory = hbData.FreeMemory;

  // save timestamp of this heartbeat
  Logger.Debug("BEGIN Locking for Heartbeats");
  heartbeatLock.EnterWriteLock();
  try {
    Logger.Debug("END Locked for Heartbeats");
    lastHeartbeats[hbData.SlaveId] = DateTime.Now; // insert or overwrite
  } finally {
    heartbeatLock.ExitWriteLock();
  }

  Logger.Debug("BEGIN Processing Heartbeat Jobs");
  ProcessJobProcess(hbData, response);
  Logger.Debug("END Processed Heartbeat Jobs");

  // check if a new calendar must be loaded
  if (client.CalendarSyncStatus == CalendarState.Fetch || client.CalendarSyncStatus == CalendarState.ForceFetch) {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar));
    Logger.Info("fetch or forcefetch sent");
  }

  // check if client has a free core for a new job
  // if true, ask scheduler for a new job for this client
  Logger.Debug(" BEGIN Looking for Client Jobs");
  if (hbData.FreeCores > 0 && scheduler.ExistsJobForSlave(hbData)) {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchJob));
  } else {
    response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.NoMessage));
  }
  Logger.Debug(" END Looked for Client Jobs");

  DaoLocator.ClientDao.Update(client);

  Logger.Debug(" END Processed Heartbeat for Client " + hbData.SlaveId);
  return response;
}
|
---|
| 310 |
|
---|
/// <summary>
/// Reconciles the job progress reported in a heartbeat with the jobs the database considers
/// active for this slave, and appends the resulting action requests to the response.
///
/// (1) If the slave reports progress but has no active jobs in the database, every reported
///     job gets an AbortJob message and the method returns.
/// (2) For each reported job, the outcome depends on its state:
///     - unknown to the server: AbortJob is sent;
///     - assigned to another (or no) slave: AbortJob is sent;
///     - in state Aborted: AbortJob is sent and the job is set Finished;
///     - otherwise: its percentage is updated, and a job in SnapshotRequested gets
///       RequestSnapshot and is set to SnapshotSent.
///     Every existing reported job is written back to the database.
/// (3) For each active job of the slave that was NOT reported:
///     - if it is in newAssignedJobs, its time-to-live is decremented; at zero the job is set
///       Offline, AbortJob is sent and the entry is removed;
///     - if it is not in newAssignedJobs, the job is set Offline immediately.
/// (4) For each active job that WAS reported, its newAssignedJobs entry (if any) is removed.
///
/// [chn] this method needs to be refactored, because it is a performance hog.
/// Quirk: state Aborted results in Finished; this should rather be AbortRequested -> Aborted.
/// </summary>
/// <param name="hbData">Heartbeat sent by the slave (JobProgress maps job id to progress).</param>
/// <param name="response">Response whose ActionRequest list receives the generated messages.</param>
private void ProcessJobProcess(HeartBeatData hbData, ResponseHeartBeat response) {
  Logger.Debug("Started for Client " + hbData.SlaveId);
  List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfSlave(DaoLocator.ClientDao.FindById(hbData.SlaveId)));

  if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) {
    // (1) the slave calculates something, but the server assigned nothing to it
    if (jobsOfClient.Count == 0) {
      foreach (Guid jobId in hbData.JobProgress.Keys) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobId));
      }

      Logger.Error("There is no job calculated by this user " + hbData.SlaveId + ", advise him to abort all");
      return;
    }

    // (2) check and apply each reported job
    foreach (KeyValuePair<Guid, double> jobProgress in hbData.JobProgress) {
      JobDto curJob = DaoLocator.JobDao.FindById(jobProgress.Key);
      if (curJob == null) {
        // Unknown job id (e.g. deleted on the server); this used to throw a NullReferenceException.
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, jobProgress.Key));
        Logger.Error("Job reported by user " + hbData.SlaveId + " does not exist: " + jobProgress.Key);
        continue;
      }

      curJob.Client = DaoLocator.ClientDao.GetClientForJob(curJob.Id);
      if (curJob.Client == null || curJob.Client.Id != hbData.SlaveId) {
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        Logger.Error("There is no job calculated by this user " + hbData.SlaveId + " Job: " + curJob);
      } else if (curJob.State == JobState.Aborted) {
        // a request to abort the job has been set
        response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, curJob.Id));
        curJob.State = JobState.Finished;
      } else {
        // save job progress
        curJob.Percentage = jobProgress.Value;

        if (curJob.State == JobState.SnapshotRequested) {
          // a request for a snapshot has been set
          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.RequestSnapshot, curJob.Id));
          curJob.State = JobState.SnapshotSent;
        }
      }
      DaoLocator.JobDao.Update(curJob);
    }
  }

  // (3)/(4) compare the jobs assigned in the database with what the slave reported
  foreach (JobDto currJob in jobsOfClient) {
    // dictionary lookup instead of a linear scan over all reported job ids
    bool found = hbData.JobProgress != null && hbData.JobProgress.ContainsKey(currJob.Id);

    lock (newAssignedJobs) {
      if (found) {
        if (newAssignedJobs.Remove(currJob.Id)) {
          Logger.Info("Job is sending a heart beat, removing it from the newAssignedJobList: " + currJob);
        }
        continue;
      }

      int ttl;
      if (newAssignedJobs.TryGetValue(currJob.Id, out ttl)) {
        ttl--;
        newAssignedJobs[currJob.Id] = ttl;
        Logger.Error("Job TTL Reduced by one for job: " + currJob + "and is now: " + ttl + ". User that sucks: " + currJob.Client);
        if (ttl <= 0) {
          Logger.Error("Job TTL reached Zero, Job gets removed: " + currJob + " and set back to offline. User that sucks: " + currJob.Client);

          currJob.State = JobState.Offline;
          DaoLocator.JobDao.Update(currJob);

          response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.AbortJob, currJob.Id));

          newAssignedJobs.Remove(currJob.Id);
        }
      } else {
        Logger.Error("Job ID wasn't with the heartbeats: " + currJob);
        currJob.State = JobState.Offline;
        DaoLocator.JobDao.Update(currJob);
      }
    }
  }
}
|
---|
| 420 |
|
---|
/// <summary>
/// Called by a slave after a heartbeat response told it to fetch a job. Asks the scheduler
/// for the next job for this slave and returns it together with its plugin dependencies.
/// A handed-out job is registered in newAssignedJobs with ApplicationConstants.JOB_TIME_TO_LIVE,
/// unless an entry for it already exists.
/// </summary>
/// <param name="clientId">Id of the requesting slave.</param>
/// <returns>The job in Obj, or Obj = null with StatusMessage GetJob_NoJobsAvailable.</returns>
public ResponseObject<JobDto> GetJob(Guid clientId) {
  ResponseObject<JobDto> response = new ResponseObject<JobDto>();

  JobDto nextJob = scheduler.GetNextJobForSlave(clientId);
  if (nextJob == null) {
    response.Obj = null;
    response.StatusMessage = ResponseStatus.GetJob_NoJobsAvailable;
    Logger.Info("No more Jobs left for " + clientId);
    return response;
  }

  response.Obj = nextJob;
  response.Obj.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Obj);

  Logger.Info("Job pulled: " + nextJob + " for user " + clientId);
  lock (newAssignedJobs) {
    bool alreadyTracked = newAssignedJobs.ContainsKey(nextJob.Id);
    if (!alreadyTracked) {
      newAssignedJobs.Add(nextJob.Id, ApplicationConstants.JOB_TIME_TO_LIVE);
    }
  }
  return response;
}
|
---|
| 448 |
|
---|
/// <summary>
/// Receives a job result streamed by a slave. The stream starts with a BinaryFormatter-
/// serialized JobResult header, followed by the raw serialized job. The header is validated
/// and applied by the private ProcessJobResult overload. If that answers Ok, the rest of the
/// stream is stored via JobDao.SetBinaryJobFile. The stream is always disposed, also when
/// deserialization or processing throws.
/// </summary>
/// <param name="stream">Incoming data from the slave; owned and disposed by this method.</param>
/// <param name="finished">true for a final result, false for a snapshot.</param>
/// <returns>The response produced by the private ProcessJobResult overload.</returns>
public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) {
  Logger.Info("BEGIN Job received for Storage - main method:");

  using (stream) {
    // SECURITY NOTE(review): BinaryFormatter on data received over the network can execute
    // arbitrary code during deserialization; a safe serializer should replace it.
    BinaryFormatter formatter = new BinaryFormatter();
    JobResult result = (JobResult)formatter.Deserialize(stream);

    ResponseResultReceived response = ProcessJobResult(result.ClientId, result.JobId, new byte[] { }, result.Percentage, result.Exception, finished);

    // NOTE(review): the private overload also answers Ok for a job that is already Finished,
    // so the stored binary of a finished job is overwritten here — confirm this is intended.
    if (response.StatusMessage == ResponseStatus.Ok) {
      Logger.Debug("Trying to aquire WCF Job Stream");
      using (MemoryStream serializedJob = new MemoryStream()) {
        byte[] buffer = new byte[3024];
        int read;
        int i = 0;
        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
          serializedJob.Write(buffer, 0, read);
          if (i % 100 == 0)
            Logger.Debug("Writing to stream: " + i);
          i++;
        }
        Logger.Debug("Done Writing, closing the stream!");

        DaoLocator.JobDao.SetBinaryJobFile(result.JobId, serializedJob.ToArray());
      }
    }
    Logger.Info("END Job received for Storage:");
    return response;
  }
}
|
---|
| 493 |
|
---|
/// <summary>
/// Validates a result (final or snapshot) reported by a slave and updates the job record.
/// The checks run in this order:
///   - the job exists;
///   - it is not Aborted;
///   - it is assigned to some slave;
///   - that slave is <paramref name="clientId"/>;
///   - it is not already Finished (this case answers Ok without changing anything).
/// A job in SnapshotSent is treated as Calculating; any state other than Calculating or
/// Pending is rejected. On success the percentage is stored. If an exception text is given,
/// the job is marked Failed; otherwise, if <paramref name="finished"/> is true, it is marked
/// Finished. Both of these set DateFinished.
/// NOTE(review): <paramref name="result"/> is not persisted by this method (the previous code
/// only assigned it to a local SerializedJob that was never saved). The stream-based overload
/// stores the binary itself via SetBinaryJobFile. TODO confirm whether byte[] results from
/// StoreFinishedJobResult/ProcessSnapshot should be stored here.
/// </summary>
/// <param name="clientId">Id of the sending slave.</param>
/// <param name="jobId">Id of the job the result belongs to.</param>
/// <param name="result">Serialized job data (currently not stored, see note).</param>
/// <param name="percentage">Reported progress.</param>
/// <param name="exception">Exception text if the job failed, otherwise null/empty.</param>
/// <param name="finished">true for a final result, false for a snapshot.</param>
/// <returns>Response with a StatusMessage describing the outcome and the JobId.</returns>
private ResponseResultReceived ProcessJobResult(Guid clientId, Guid jobId, byte[] result, double? percentage, string exception, bool finished) {
  Logger.Info("BEGIN Job received for Storage - SUB method: " + jobId);

  ResponseResultReceived response = new ResponseResultReceived();

  JobDto jobInfo = DaoLocator.JobDao.FindById(jobId);
  if (jobInfo == null) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobDoesNotExist;
    response.JobId = jobId;
    Logger.Error("No job with Id " + jobId);
    return response;
  }
  jobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId);

  if (jobInfo.State == JobState.Aborted) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobAborted;
    // JobId is now set here as well, like in every other error branch.
    response.JobId = jobId;
    Logger.Error("Job was aborted! " + jobInfo);
    return response;
  }
  if (jobInfo.Client == null) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_JobIsNotBeeingCalculated;
    response.JobId = jobId;
    Logger.Error("Job is not being calculated (client = null)! " + jobInfo);
    return response;
  }
  if (jobInfo.Client.Id != clientId) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_WrongClientForJob;
    response.JobId = jobId;
    Logger.Error("Wrong Client for this Job! " + jobInfo + ", Sending Client is: " + clientId);
    return response;
  }
  if (jobInfo.State == JobState.Finished) {
    // Deliberately answered with Ok so the slave does not retry.
    response.StatusMessage = ResponseStatus.Ok;
    response.JobId = jobId;
    Logger.Error("Job already finished! " + jobInfo + ", Sending Client is: " + clientId);
    return response;
  }
  //Todo: RequestsnapshotSent => calculating?
  if (jobInfo.State == JobState.SnapshotSent) {
    jobInfo.State = JobState.Calculating;
  }
  if (jobInfo.State != JobState.Calculating && jobInfo.State != JobState.Pending) {
    response.StatusMessage = ResponseStatus.ProcessJobResult_InvalidJobState;
    response.JobId = jobId;
    Logger.Error("Wrong Job State, job is: " + jobInfo);
    return response;
  }
  jobInfo.Percentage = percentage;

  if (!string.IsNullOrEmpty(exception)) {
    jobInfo.State = JobState.Failed;
    jobInfo.Exception = exception;
    jobInfo.DateFinished = DateTime.Now;
  } else if (finished) {
    jobInfo.State = JobState.Finished;
    jobInfo.DateFinished = DateTime.Now;
  }

  DaoLocator.JobDao.Update(jobInfo);

  response.StatusMessage = ResponseStatus.Ok;
  response.JobId = jobId;
  response.Finished = finished;

  Logger.Info("END Job received for Storage - SUB method: " + jobId);
  return response;
}
|
---|
| 593 |
|
---|
/// <summary>
/// Final job result sent by a slave as a byte array. Delegates to the private
/// ProcessJobResult overload with finished = true. That overload validates the job and
/// updates its state, percentage and exception.
/// </summary>
/// <param name="clientId">Id of the sending slave.</param>
/// <param name="jobId">Id of the finished job.</param>
/// <param name="result">Serialized job data.</param>
/// <param name="percentage">Reported progress.</param>
/// <param name="exception">Exception text if the job failed, otherwise null/empty.</param>
/// <returns>The response produced by ProcessJobResult.</returns>
public ResponseResultReceived StoreFinishedJobResult(Guid clientId,
  Guid jobId,
  byte[] result,
  double percentage,
  string exception) {
  const bool isFinalResult = true;
  ResponseResultReceived response = ProcessJobResult(clientId, jobId, result, percentage, exception, isFinalResult);
  return response;
}
|
---|
| 613 |
|
---|
/// <summary>
/// Intermediate (snapshot) result sent by a slave. Delegates to the private
/// ProcessJobResult overload with finished = false.
/// </summary>
/// <param name="clientId">Id of the sending slave.</param>
/// <param name="jobId">Id of the job.</param>
/// <param name="result">Serialized snapshot data.</param>
/// <param name="percentage">Reported progress.</param>
/// <param name="exception">Exception text if the job failed, otherwise null/empty.</param>
/// <returns>The response produced by ProcessJobResult.</returns>
public ResponseResultReceived ProcessSnapshot(Guid clientId, Guid jobId, byte[] result, double percentage, string exception) {
  const bool isFinalResult = false;
  ResponseResultReceived response = ProcessJobResult(clientId, jobId, result, percentage, exception, isFinalResult);
  return response;
}
|
---|
| 617 |
|
---|
/// <summary>
/// Logout of a slave: its heartbeat entry is removed, and the slave is set Offline in the
/// database. If the slave was Calculating, its active jobs that are not Finished are set
/// offline first.
/// </summary>
/// <param name="clientId">Id of the slave that logs out.</param>
/// <returns>Response with StatusMessage Logout_SlaveNotRegistered if the slave is unknown.</returns>
public Response Logout(Guid clientId) {
  Logger.Info("Client logged out " + clientId);

  Response response = new Response();

  heartbeatLock.EnterWriteLock();
  try {
    lastHeartbeats.Remove(clientId); // no-op if absent
  } finally {
    heartbeatLock.ExitWriteLock();
  }

  ClientDto client = DaoLocator.ClientDao.FindById(clientId);
  if (client == null) {
    response.StatusMessage = ResponseStatus.Logout_SlaveNotRegistered;
    return response;
  }
  if (client.State == SlaveState.Calculating) {
    // check which jobs the client was calculating and reset them
    foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfSlave(client)) {
      if (job.State != JobState.Finished) {
        DaoLocator.JobDao.SetJobOffline(job);
      }
    }
  }

  client.State = SlaveState.Offline;
  DaoLocator.ClientDao.Update(client);

  return response;
}
|
---|
| 654 |
|
---|
/// <summary>
/// If a client goes offline and later restores a job it was calculating,
/// it can ask the server whether the job result is still needed.
/// Unless the job is unknown or already finished, the job is set back to
/// Pending, registered in <c>pendingJobs</c> with <c>PENDING_TIMEOUT</c>, and persisted.
/// </summary>
/// <param name="jobId">Id of the job the client wants to resume.</param>
/// <returns>
/// A <see cref="Response"/> whose StatusMessage is <c>IsJobStillNeeded_JobDoesNotExist</c>
/// if the job is unknown, or <c>IsJobStillNeeded_JobAlreadyFinished</c> if it is already
/// finished; otherwise no status message is set by this method.
/// </returns>
public Response IsJobStillNeeded(Guid jobId) {
  Response response = new Response();
  JobDto job = DaoLocator.JobDao.FindById(jobId);
  if (job == null) {
    response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobDoesNotExist;
    Logger.Error("Job doesn't exist (anymore)! " + jobId);
    return response;
  }
  if (job.State == JobState.Finished) {
    response.StatusMessage = ResponseStatus.IsJobStillNeeded_JobAlreadyFinished;
    Logger.Error("already finished! " + job);
    return response;
  }

  job.State = JobState.Pending;
  lock (pendingJobs) {
    // Indexer assignment instead of Add: if the job is already pending (e.g. the client
    // asks twice), its timeout is refreshed instead of Add throwing on the duplicate key
    // and leaving the job state changed but not persisted.
    // NOTE(review): assumes pendingJobs is a Dictionary<Guid, ...> — confirm.
    pendingJobs[job.Id] = PENDING_TIMEOUT;
  }

  DaoLocator.JobDao.Update(job);

  return response;
}
|
---|
| 685 |
|
---|
/// <summary>
/// Resolves the plugins a client requests against the plugins installed on the server.
/// Candidates must match the requested name, major and minor version.
/// <list type="bullet">
/// <item>Entries with <c>Update</c> set: the newest candidate with a strictly higher
/// revision is added, if there is one; otherwise the entry is skipped.</item>
/// <item>Other entries: the newest candidate with at least the requested revision is
/// added; if none exists, StatusMessage is set to <c>GetPlugins_PluginsNotAvailable</c>
/// and the response is returned immediately with the plugins collected so far.</item>
/// </list>
/// </summary>
/// <param name="pluginList">The plugins requested by the client.</param>
/// <returns>A response whose <c>List</c> holds the resolved plugins including their file contents.</returns>
public ResponseList<CachedHivePluginInfoDto> GetPlugins(List<HivePluginInfoDto> pluginList) {
  ResponseList<CachedHivePluginInfoDto> response = new ResponseList<CachedHivePluginInfoDto>();
  response.List = new List<CachedHivePluginInfoDto>();
  foreach (HivePluginInfoDto pluginInfo in pluginList) {
    // An update request only accepts strictly newer revisions; a normal request also
    // accepts the requested revision itself.
    bool newerOnly = pluginInfo.Update;

    // Order by revision and take the first instead of SingleOrDefault: several installed
    // revisions may match, and SingleOrDefault would throw InvalidOperationException then.
    IPluginDescription ipd = ApplicationManager.Manager.Plugins
      .Where(pd => pd.Name == pluginInfo.Name
                && pd.Version.Major == pluginInfo.Version.Major
                && pd.Version.Minor == pluginInfo.Version.Minor
                && (newerOnly
                      ? pd.Version.Revision > pluginInfo.Version.Revision
                      : pd.Version.Revision >= pluginInfo.Version.Revision))
      .OrderByDescending(pd => pd.Version.Revision)
      .FirstOrDefault();

    if (ipd != null) {
      response.List.Add(ConvertPluginDescriptorToDto(ipd));
    } else if (!newerOnly) {
      // A required plugin is missing on the server.
      response.StatusMessage = ResponseStatus.GetPlugins_PluginsNotAvailable;
      return response;
    }
  }
  return response;
}
|
---|
| 709 |
|
---|
/// <summary>
/// Packs an installed plugin into a <see cref="CachedHivePluginInfoDto"/>. It copies the
/// plugin's name and version and reads every file listed in its <c>Files</c> from disk
/// into a <c>HivePluginFile</c>. Exceptions from <c>File.ReadAllBytes</c> are not caught here.
/// </summary>
/// <param name="currPlugin">The installed plugin to convert.</param>
/// <returns>The DTO holding the plugin's name, version and file contents.</returns>
private CachedHivePluginInfoDto ConvertPluginDescriptorToDto(IPluginDescription currPlugin) {
  CachedHivePluginInfoDto cachedPlugin = new CachedHivePluginInfoDto();
  cachedPlugin.Name = currPlugin.Name;
  cachedPlugin.Version = currPlugin.Version;

  foreach (var pluginFile in currPlugin.Files) {
    string path = pluginFile.Name;
    byte[] content = File.ReadAllBytes(path);
    cachedPlugin.PluginFiles.Add(new HivePluginFile(content, path));
  }

  return cachedPlugin;
}
|
---|
| 721 |
|
---|
| 722 | #endregion
|
---|
| 723 | }
|
---|
| 724 | }
|
---|