Changeset 4091 for branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/ClientCommunicator.cs
- Timestamp:
- 07/23/10 09:37:57 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Server.Core/3.3/ClientCommunicator.cs
r4060 r4091 22 22 using System; 23 23 using System.Collections.Generic; 24 using System.IO; 24 25 using System.Linq; 25 using System.Text; 26 using System.Runtime.Serialization.Formatters.Binary; 27 using System.Threading; 28 using System.Transactions; 29 using HeuristicLab.Hive.Contracts; 26 30 using HeuristicLab.Hive.Contracts.BusinessObjects; 27 31 using HeuristicLab.Hive.Contracts.Interfaces; 28 using HeuristicLab.Hive.Contracts;29 using HeuristicLab.Core;30 using HeuristicLab.Hive.Server.DataAccess;31 using System.Resources;32 using System.Reflection;33 using HeuristicLab.Hive.JobBase;34 32 using HeuristicLab.Hive.Server.Core.InternalInterfaces; 35 using System.Threading;36 33 using HeuristicLab.PluginInfrastructure; 37 using HeuristicLab.DataAccess.Interfaces;38 using System.IO;39 using System.Runtime.Serialization.Formatters.Binary;40 34 using HeuristicLab.Tracing; 41 using Linq = HeuristicLab.Hive.Server.LINQDataAccess;42 using System.Transactions;43 using HeuristicLab.Hive.Server.LINQDataAccess;44 35 45 36 namespace HeuristicLab.Hive.Server.Core { … … 75 66 76 67 lifecycleManager = ServiceLocator.GetLifecycleManager(); 77 jobManager = ServiceLocator.GetJobManager() as 78 IInternalJobManager; 68 jobManager = ServiceLocator.GetJobManager() as IInternalJobManager; 79 69 scheduler = ServiceLocator.GetScheduler(); 80 70 81 lifecycleManager.RegisterHeartbeat( 82 new EventHandler(lifecycleManager_OnServerHeartbeat)); 71 lifecycleManager.RegisterHeartbeat(new EventHandler(lifecycleManager_OnServerHeartbeat)); 83 72 } 84 73 … … 109 98 foreach (JobDto job in DaoLocator.JobDao.FindActiveJobsOfClient(client)) { 110 99 //maybe implementa n additional Watchdog? Till then, just set them offline.. 111 DaoLocator.JobDao.SetJobOffline(job); 100 DaoLocator.JobDao.SetJobOffline(job); 112 101 } 113 102 } else { … … 168 157 if (pendingJobs[currJob.Id] <= 0) { 169 158 currJob.State = State.offline; 170 DaoLocator.JobDao.Update(currJob); 159 DaoLocator.JobDao.Update(currJob); 171 160 } else { 172 161 pendingJobs[currJob.Id]--; … … 216 205 217 206 ClientDto client = DaoLocator.ClientDao.FindById(clientId); 218 if (client == null) {207 if (client == null) { 219 208 response.Success = false; 220 209 response.StatusMessage = ApplicationConstants.RESPONSE_CLIENT_RESOURCE_NOT_FOUND; 221 210 return response; 222 211 } 223 212 224 213 response.ForceFetch = (client.CalendarSyncStatus == CalendarState.ForceFetch); 225 214 226 215 IEnumerable<AppointmentDto> appointments = DaoLocator.UptimeCalendarDao.GetCalendarForClient(client); 227 216 if (appointments.Count() == 0) { … … 239 228 240 229 public Response SetCalendarStatus(Guid clientId, CalendarState state) { 241 Response response = new Response(); 230 Response response = new Response(); 242 231 ClientDto client = DaoLocator.ClientDao.FindById(clientId); 243 232 if (client == null) { … … 246 235 return response; 247 236 } 248 237 249 238 client.CalendarSyncStatus = state; 250 239 DaoLocator.ClientDao.Update(client); 251 240 252 241 response.Success = true; 253 242 response.StatusMessage = ApplicationConstants.RESPONSE_UPTIMECALENDAR_STATUS_UPDATED; 254 243 255 244 return response; 256 245 } … … 298 287 299 288 Logger.Debug("BEGIN Processing Heartbeat Jobs"); 300 processJobProcess(hbData, response);289 ProcessJobProcess(hbData, response); 301 290 Logger.Debug("END Processed Heartbeat Jobs"); 302 291 … … 306 295 response.StatusMessage = ApplicationConstants.RESPONSE_COMMUNICATOR_FETCH_OR_FORCEFETCH_CALENDAR; 307 296 response.ActionRequest.Add(new MessageContainer(MessageContainer.MessageType.FetchOrForceFetchCalendar)); 308 297 309 298 //client.CalendarSyncStatus = CalendarState.Fetching; 310 311 Logger.Info("fetch or forcefetch sent"); 299 300 Logger.Info("fetch or forcefetch sent"); 312 301 } 313 302 … … 338 327 /// <param name="clientAdapter"></param> 339 328 /// <param name="response"></param> 340 private void processJobProcess(HeartBeatData hbData, ResponseHB response) {329 private void ProcessJobProcess(HeartBeatData hbData, ResponseHB response) { 341 330 Logger.Debug("Started for Client " + hbData.ClientId); 342 331 List<JobDto> jobsOfClient = new List<JobDto>(DaoLocator.JobDao.FindActiveJobsOfClient(DaoLocator.ClientDao.FindById(hbData.ClientId))); 343 if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) { 332 if (hbData.JobProgress != null && hbData.JobProgress.Count > 0) { 344 333 if (jobsOfClient == null || jobsOfClient.Count == 0) { 345 334 response.Success = false; … … 372 361 DaoLocator.JobDao.Update(curJob); 373 362 } 374 363 } 375 364 foreach (JobDto currJob in jobsOfClient) { 376 365 bool found = false; 377 if (hbData.JobProgress != null) {366 if (hbData.JobProgress != null) { 378 367 foreach (Guid jobId in hbData.JobProgress.Keys) { 379 368 if (jobId == currJob.Id) { … … 428 417 JobDto job2Calculate = scheduler.GetNextJobForClient(clientId); 429 418 if (job2Calculate != null) { 430 response.Job = job2Calculate; 419 response.Job = job2Calculate; 431 420 response.Job.PluginsNeeded = DaoLocator.PluginInfoDao.GetPluginDependenciesForJob(response.Job); 432 421 response.Success = true; … … 437 426 newAssignedJobs.Add(job2Calculate.Id, ApplicationConstants.JOB_TIME_TO_LIVE); 438 427 } 439 } else { 428 } else { 440 429 response.Success = false; 441 430 response.Job = null; … … 449 438 } 450 439 451 public ResponseResultReceived ProcessJobResult( 452 Stream stream, 453 bool finished) { 454 440 public ResponseResultReceived ProcessJobResult(Stream stream, bool finished) { 455 441 Logger.Info("BEGIN Job received for Storage - main method:"); 456 442 457 458 Stream jobResultStream = null; 459 Stream jobStream = null; 443 //Stream jobResultStream = null; 444 //Stream jobStream = null; 460 445 461 446 //try { 462 BinaryFormatter formatter = 463 new BinaryFormatter(); 464 465 JobResult result = 466 (JobResult)formatter.Deserialize(stream); 447 BinaryFormatter formatter = new BinaryFormatter(); 448 449 JobResult result = (JobResult)formatter.Deserialize(stream); 467 450 468 451 //important - repeatable read isolation level is required here, … … 487 470 List<byte> serializedJob = new List<byte>(); 488 471 int read = 0; 489 int i = 0; 490 while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) { 472 int i = 0; 473 while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) { 491 474 for (int j = 0; j < read; j++) { 492 475 serializedJob.Add(buffer[j]); 493 476 } 494 if (i % 100 == 0)495 Logger.Debug("Writing to stream: " + i); 477 if (i % 100 == 0) 478 Logger.Debug("Writing to stream: " + i); 496 479 //jobStream.Write(buffer, 0, read); 497 480 i++; 498 481 } 499 Logger.Debug("Done Writing, closing the stream!"); 482 Logger.Debug("Done Writing, closing the stream!"); 500 483 //jobStream.Close(); 501 484 … … 518 501 519 502 ResponseResultReceived response = new ResponseResultReceived(); 520 ClientDto client = 521 DaoLocator.ClientDao.FindById(clientId); 522 523 SerializedJob job = 524 new SerializedJob(); 503 ClientDto client = DaoLocator.ClientDao.FindById(clientId); 504 505 SerializedJob job = new SerializedJob(); 525 506 526 507 if (job != null) { 527 job.JobInfo = 528 DaoLocator.JobDao.FindById(jobId); 508 job.JobInfo = DaoLocator.JobDao.FindById(jobId); 529 509 job.JobInfo.Client = job.JobInfo.Client = DaoLocator.ClientDao.GetClientForJob(jobId); 530 510 } … … 650 630 651 631 Logger.Info("Client logged out " + clientId); 652 632 653 633 Response response = new Response(); 654 634 … … 680 660 681 661 return response; 682 } 662 } 683 663 684 664 /// <summary>
Note: See TracChangeset
for help on using the changeset viewer.