- Timestamp:
- 09/17/10 10:26:55 (14 years ago)
- Location:
- branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/ConfigurationManager/ConfigManager.cs
r4333 r4423 107 107 st.ConnectedSince = WcfService.Instance.ConnectedSince; 108 108 109 st.TotalCores = hardwareInfo.NrOfCores ;110 st.FreeCores = hardwareInfo.NrOfCores - GetUsedCores();109 st.TotalCores = hardwareInfo.NrOfCores.HasValue ? hardwareInfo.NrOfCores.Value : 0; 110 st.FreeCores = hardwareInfo.NrOfCores.HasValue ? hardwareInfo.NrOfCores.Value - GetUsedCores() : 0; 111 111 112 112 st.JobsAborted = SlaveStatusInfo.JobsAborted; … … 120 120 foreach (KeyValuePair<Guid, Executor> kvp in engines) { 121 121 Executor e = kvp.Value; 122 st.Jobs.Add(new JobStatus { JobId = e.JobId, Progress = e.Progress, Since = e.CreationTime });122 st.Jobs.Add(new JobStatus { JobId = e.JobId, ExecutionTime = e.ExecutionTime, Since = e.CreationTime }); 123 123 } 124 124 } … … 126 126 } 127 127 128 public Dictionary<Guid, double> GetProgressOfAllJobs() {129 Dictionary<Guid, double> prog = new Dictionary<Guid, double>();128 public Dictionary<Guid, TimeSpan> GetExecutionTimeOfAllJobs() { 129 Dictionary<Guid, TimeSpan> prog = new Dictionary<Guid, TimeSpan>(); 130 130 Dictionary<Guid, Executor> engines = Core.ExecutionEngines; 131 131 lock (engines) { 132 132 foreach (KeyValuePair<Guid, Executor> kvp in engines) { 133 133 Executor e = kvp.Value; 134 //if (!e.JobIsFinished) 135 prog[e.JobId] = e.Progress; 134 prog[e.JobId] = e.ExecutionTime; 136 135 } 137 136 } -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/Core.cs
r4368 r4423 36 36 using HeuristicLab.Hive.Slave.ExecutionEngine; 37 37 using HeuristicLab.Tracing; 38 using HeuristicLab.Common; 38 39 39 40 namespace HeuristicLab.Hive.Slave.Core { … … 44 45 public static bool abortRequested { get; set; } 45 46 47 private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>(); 48 private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>(); 49 private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>(); 50 51 private WcfService wcfService; 52 private HeartbeatManager heartbeatManager; 53 46 54 private bool currentlyFetching; 47 55 private bool CurrentlyFetching { … … 55 63 } 56 64 57 private Dictionary<Guid, Executor> engines = new Dictionary<Guid, Executor>(); 58 private Dictionary<Guid, AppDomain> appDomains = new Dictionary<Guid, AppDomain>(); 59 private Dictionary<Guid, JobDto> jobs = new Dictionary<Guid, JobDto>(); 60 61 private WcfService wcfService; 62 private HeartbeatManager beat; 65 public Dictionary<Guid, Executor> ExecutionEngines { 66 get { return engines; } 67 } 68 69 internal Dictionary<Guid, JobDto> Jobs { 70 get { return jobs; } 71 } 63 72 64 73 /// <summary> … … 70 79 SlaveConsoleServer server = new SlaveConsoleServer(); 71 80 server.Start(); 72 81 73 82 ConfigManager manager = ConfigManager.Instance; 74 83 manager.Core = this; … … 95 104 private void StartHeartbeats() { 96 105 //Initialize the heartbeat 97 beat= new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };98 beat.StartHeartbeat();106 heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) }; 107 heartbeatManager.StartHeartbeat(); 99 108 } 100 109 … … 111 120 wcfService.GetFinishedJobResultCompleted += new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted); 112 121 wcfService.ProcessSnapshotCompleted += new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted); 113 wcfService.ConnectionRestored += new EventHandler(wcfService_ConnectionRestored);114 //wcfService.ServerChanged += new EventHandler(wcfService_ServerChanged);115 122 wcfService.Connected += new EventHandler(wcfService_Connected); 116 123 } … … 120 127 wcfService.GetFinishedJobResultCompleted -= new EventHandler<StoreFinishedJobResultCompletedEventArgs>(wcfService_StoreFinishedJobResultCompleted); 121 128 wcfService.ProcessSnapshotCompleted -= new EventHandler<ProcessSnapshotCompletedEventArgs>(wcfService_ProcessSnapshotCompleted); 122 wcfService.ConnectionRestored -= new EventHandler(wcfService_ConnectionRestored);123 //wcfService.ServerChanged -= new EventHandler(wcfService_ServerChanged);124 129 wcfService.Connected -= new EventHandler(wcfService_Connected); 125 130 } … … 135 140 case MessageContainer.MessageType.AbortJob: 136 141 if (engines.ContainsKey(container.JobId)) 137 engines[container.JobId].Abort(); 142 try { 143 engines[container.JobId].Abort(); 144 } 145 catch (AppDomainUnloadedException) { 146 // appdomain already unloaded. Finishing job probably ongoing 147 } 138 148 else 139 149 Logger.Error("AbortJob: Engine doesn't exist"); … … 156 166 //Snapshot is ready and can be sent back to the Server 157 167 case MessageContainer.MessageType.SnapshotReady: 158 ThreadPool.QueueUserWorkItem(new WaitCallback(GetSnapshot),container.JobId);168 GetSnapshot(container.JobId); 159 169 break; 160 170 … … 168 178 break; 169 179 170 171 180 //A Job has finished and can be sent back to the server 172 181 case MessageContainer.MessageType.FinishedJob: 173 ThreadPool.QueueUserWorkItem(new WaitCallback(GetFinishedJob), container.JobId); 174 break; 175 182 SendFinishedJob(container.JobId); 183 break; 176 184 177 185 //When the timeslice is up 178 186 case MessageContainer.MessageType.UptimeLimitDisconnect: 179 187 Logger.Info("Uptime Limit reached, storing jobs and sending them back"); 180 181 188 ShutdownRunningJobsAndSubmitSnapshots(); 182 189 break; … … 201 208 Logger.Debug("Stopping heartbeat"); 202 209 abortRequested = true; 203 beat.StopHeartBeat();210 heartbeatManager.StopHeartBeat(); 204 211 Logger.Debug("Logging out"); 205 212 WcfService.Instance.Logout(ConfigManager.Instance.GetClientInfo().Id); … … 216 223 217 224 case MessageContainer.MessageType.GetChildJobs: 218 // send the job back to hive219 225 GetChildJobs((MessageContainerWithCallback<SerializedJobList>)container); 226 break; 227 228 case MessageContainer.MessageType.DeleteChildJobs: 229 wcfService.DeleteChildJobs(container.JobId); 220 230 break; 221 231 } … … 224 234 private void GetChildJobs(MessageContainerWithCallback<SerializedJobList> mc) { 225 235 ResponseObject<SerializedJobList> response = wcfService.GetChildJobs(mc.JobId); 226 if (response .StatusMessage != ResponseStatus.Ok) {227 Logger.Error("GetChildJobs failed: " + response.StatusMessage);236 if (response != null && response.StatusMessage == ResponseStatus.Ok) { 237 mc.Callback(response.Obj); 228 238 } else { 229 mc.Callback(response.Obj); 239 if (response != null) { 240 Logger.Error(string.Format("GetChildJobs failed: {0}", response.StatusMessage)); 241 } else { 242 Logger.Error("GetChildJobs failed."); 243 } 230 244 } 231 245 } … … 234 248 ResponseObject<JobDto> response = wcfService.PauseJob(mc.SerializedJob); 235 249 KillAppDomain(mc.JobId); 236 if (response .StatusMessage != ResponseStatus.Ok) {250 if (response == null || response.StatusMessage != ResponseStatus.Ok) { 237 251 Logger.Error("PauseJob failed: " + response.StatusMessage); 238 252 } … … 241 255 private ResponseObject<JobDto> AddChildJob(MessageContainerWithJob mc) { 242 256 ResponseObject<JobDto> response = wcfService.AddChildJob(mc.JobId, mc.SerializedJob); 243 if (response .StatusMessage != ResponseStatus.Ok) {257 if (response == null || response.StatusMessage != ResponseStatus.Ok) { 244 258 Logger.Error("AddChildJob failed: " + response.StatusMessage); 245 259 } … … 267 281 /// </summary> 268 282 /// <param name="jobId"></param> 269 private void GetFinishedJob(object jobId) { 270 Guid jId = (Guid)jobId; 271 Logger.Info("Getting the finished job with id: " + jId); 283 private void SendFinishedJob(object jobId) { 272 284 try { 285 Guid jId = (Guid)jobId; 286 Logger.Info("Getting the finished job with id: " + jId); 273 287 if (!engines.ContainsKey(jId)) { 274 288 Logger.Info("Engine doesn't exist"); … … 278 292 byte[] sJob = engines[jId].GetFinishedJob(); 279 293 280 if (WcfService.Instance.LoggedIn){294 try { 281 295 Logger.Info("Sending the finished job with id: " + jId); 282 wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, 1.0, engines[jId].CurrentException, true); 296 wcfService.GetFinishedJobResultAsync(ConfigManager.Instance.GetClientInfo().Id, jId, sJob, engines[jId].ExecutionTime, engines[jId].CurrentException, true); 297 } 298 catch (Exception e) { 299 Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")"); 300 JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment 301 } 302 finally { 303 KillAppDomain(jId); // kill app-domain in every case 304 } 305 } 306 catch (Exception e) { 307 OnExceptionOccured(e); 308 } 309 } 310 311 private void GetSnapshot(object jobId) { 312 try { 313 Logger.Info("Fetching a snapshot for job " + jobId); 314 Guid jId = (Guid)jobId; 315 byte[] obj = engines[jId].GetSnapshot(); 316 wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, jId, obj, engines[jId].ExecutionTime, null); 317 318 //Uptime Limit reached, now is a good time to destroy this jobs. 319 Logger.Debug("Checking if uptime limit is reached"); 320 if (!UptimeManager.Instance.IsAllowedToCalculate()) { 321 Logger.Debug("Uptime limit reached"); 322 Logger.Debug("Killing Appdomain"); 323 KillAppDomain(jId); 324 //Still anything running? 325 if (engines.Count == 0) { 326 Logger.Info("All jobs snapshotted and sent back, disconnecting"); 327 WcfService.Instance.Disconnect(); 328 } else { 329 Logger.Debug("There are still active Jobs in the Field, not disconnecting"); 330 } 283 331 } else { 284 Logger.Info("Storing the finished job with id: " + jId + " to hdd"); 285 JobStorageManager.PersistObjectToDisc(wcfService.ServerIp, 0, jId, sJob); // [chn] Port is not unique anymore (since we need two ports for http and net.tcp-streaming). also the port is now specified only in app.config. use port 0 for the moment 286 KillAppDomain(jId); 287 } 288 } 289 catch (InvalidStateException ise) { 290 Logger.Error("Invalid State while Snapshoting:", ise); 291 } 292 } 293 294 private void GetSnapshot(object jobId) { 295 Logger.Info("Fetching a snapshot for job " + jobId); 296 Guid jId = (Guid)jobId; 297 byte[] obj = engines[jId].GetSnapshot(); 298 Logger.Debug("BEGIN: Sending snapshot sync"); 299 wcfService.ProcessSnapshotSync(ConfigManager.Instance.GetClientInfo().Id, 300 jId, 301 obj, 302 engines[jId].Progress, 303 null); 304 Logger.Debug("END: Sended snapshot sync"); 305 //Uptime Limit reached, now is a good time to destroy this jobs. 306 Logger.Debug("Checking if uptime limit is reached"); 307 if (!UptimeManager.Instance.IsAllowedToCalculate()) { 308 Logger.Debug("Uptime limit reached"); 309 Logger.Debug("Killing Appdomain"); 310 KillAppDomain(jId); 311 //Still anything running? 312 if (engines.Count == 0) { 313 Logger.Info("All jobs snapshotted and sent back, disconnecting"); 314 WcfService.Instance.Disconnect(); 315 } else { 316 Logger.Debug("There are still active Jobs in the Field, not disconnecting"); 317 } 318 319 } else { 320 Logger.Debug("Restarting the job" + jobId); 321 engines[jId].StartOnlyJob(); 322 Logger.Info("Restarted the job" + jobId); 332 Logger.Debug("Restarting the job" + jobId); 333 engines[jId].StartOnlyJob(); 334 Logger.Info("Restarted the job" + jobId); 335 } 336 } 337 catch (Exception e) { 338 OnExceptionOccured(e); 323 339 } 324 340 } … … 349 365 if (e.Result.StatusMessage != ResponseStatus.GetJob_NoJobsAvailable) { 350 366 Logger.Info("Received new job with id " + e.Result.Obj.Id); 351 bool sandboxed = false;352 367 Logger.Debug("Fetching plugins for job " + e.Result.Obj.Id); 353 368 try { 354 355 369 PluginCache.Instance.PreparePlugins(e.Result.Obj.PluginsNeeded); 356 370 PluginCache.Instance.CopyPluginsForJob(e.Result.Obj.PluginsNeeded, e.Result.Obj.Id); 357 371 358 // foreach (CachedHivePluginInfoDto plugininfo in PluginCache.Instance.GetPlugins(e.Result.Job.PluginsNeeded))359 // files.AddRange(plugininfo.PluginFiles);360 372 Logger.Debug("Plugins fetched for job " + e.Result.Obj.Id); 361 373 String pluginDir = Path.Combine(PluginCache.Instance.PluginRepositoryDir, e.Result.Obj.Id.ToString()); … … 375 387 engine.Start(e.Data); 376 388 engines.Add(e.Result.Obj.Id, engine); 377 378 389 SlaveStatusInfo.JobsFetched++; 379 390 Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched); 380 391 } 381 392 } 382 beat.InterruptHeartBeatThread();393 heartbeatManager.AwakeHeartBeatThread(); 383 394 } 384 395 catch (Exception exception) { … … 387 398 CurrentlyFetching = false; 388 399 KillAppDomain(e.Result.Obj.Id); 389 wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, 1, exception, true);400 wcfService.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, e.Result.Obj.Id, new byte[] { }, e.Result.Obj.ExecutionTime, exception.ToString(), true); 390 401 } 391 402 } else { … … 406 417 SlaveStatusInfo.JobsProcessed++; 407 418 Logger.Info("Increased ProcessedJobs to:" + SlaveStatusInfo.JobsProcessed); 408 beat.InterruptHeartBeatThread();419 heartbeatManager.AwakeHeartBeatThread(); 409 420 } else { 410 421 Logger.Error("Sending of job " + e.Result.JobId + " failed, job has been wasted. Message: " + e.Result.StatusMessage); … … 433 444 FetchCalendarFromServer(); 434 445 } 435 //if the fetching from the server failed - still set the client online... maybe we get436 //a result within the next few heartbeats437 //if (!UptimeManager.Instance.CalendarAvailable || UptimeManager.Instance.IsOnline()) {438 446 Logger.Info("CalendarAvailable is " + UptimeManager.Instance.CalendarAvailable + " and IsOnline is: " + UptimeManager.Instance.IsAllowedToCalculate()); 439 Logger.Info("Setting client online");440 wcfService.Login(ConfigManager.Instance.GetClientInfo());447 CurrentlyFetching = false; 448 CheckRunningAppDomains(); 441 449 JobStorageManager.CheckAndSubmitJobsFromDisc(); 442 CurrentlyFetching = false;443 450 } 444 451 … … 459 466 } 460 467 461 //this is a little bit tricky - 462 void wcfService_ConnectionRestored(object sender, EventArgs e) { 463 Logger.Info("Reconnected to old server - checking currently running appdomains"); 464 468 private void CheckRunningAppDomains() { 465 469 foreach (KeyValuePair<Guid, Executor> execKVP in engines) { 466 470 if (execKVP.Value.ExecutionState != ExecutionState.Started && execKVP.Value.CurrentMessage == MessageContainer.MessageType.NoMessage) { 467 471 Logger.Info("Checking for JobId: " + execKVP.Value.JobId); 468 Thread finThread = new Thread(new ParameterizedThreadStart( GetFinishedJob));472 Thread finThread = new Thread(new ParameterizedThreadStart(SendFinishedJob)); 469 473 finThread.Start(execKVP.Value.JobId); 470 474 } … … 474 478 #endregion 475 479 476 public Dictionary<Guid, Executor> ExecutionEngines { 477 get { return engines; } 478 } 479 480 internal Dictionary<Guid, JobDto> Jobs { 481 get { return jobs; } 480 public event EventHandler<EventArgs<Exception>> ExceptionOccured; 481 private void OnExceptionOccured(Exception e) { 482 Logger.Error("Error: " + e.ToString()); 483 var handler = ExceptionOccured; 484 if (handler != null) handler(this, new EventArgs<Exception>(e)); 482 485 } 483 486 … … 498 501 if (appDomains.ContainsKey(id)) { 499 502 appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException); 500 AppDomain.Unload(appDomains[id]); 503 504 int repeat = 5; 505 while (repeat > 0) { 506 try { 507 AppDomain.Unload(appDomains[id]); 508 repeat = 0; 509 } 510 catch (CannotUnloadAppDomainException) { 511 Logger.Error("Could not unload AppDomain, will try again in 1 sec."); 512 Thread.Sleep(1000); 513 repeat--; 514 if (repeat == 0) { 515 throw; // rethrow and let app crash 516 } 517 } 518 } 501 519 appDomains.Remove(id); 502 520 } -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/HeartbeatManager.cs
r4368 r4423 34 34 using System.Threading; 35 35 using HeuristicLab.Hive.Slave.Communication.SlaveFacade; 36 using HeuristicLab.Common; 36 37 37 38 namespace HeuristicLab.Hive.Slave.Core { … … 40 41 /// </summary> 41 42 public class HeartbeatManager { 42 43 private bool offline; 44 43 private static object locker = new object(); 45 44 public TimeSpan Interval { get; set; } 46 47 45 private Thread heartBeatThread; 48 49 private static object locker = new object(); 46 private AutoResetEvent waitHandle; 50 47 51 48 public HeartbeatManager() { 52 Interval = new TimeSpan(0, 0,10);49 Interval = new TimeSpan(0, 0, 10); 53 50 } 54 51 … … 68 65 /// </summary> 69 66 public void StartHeartbeat() { 67 this.waitHandle = new AutoResetEvent(true); 70 68 wcfService = WcfService.Instance; 71 69 wcfService.ProcessHeartBeatCompleted += new EventHandler<ProcessHeartBeatCompletedEventArgs>(wcfService_ProcessHeartBeatCompleted); 72 70 abortThreadPending = false; 73 heartBeatThread = GetHeartBeatThread();71 heartBeatThread = new Thread(RunHeartBeatThread); 74 72 heartBeatThread.Start(); 75 73 } 76 74 77 private Thread GetHeartBeatThread() { 78 return new Thread(() => { 79 while (!abortThreadPending) { 80 try { 81 lock (locker) { 82 if (wcfService.ConnState != NetworkEnum.WcfConnState.Connected) { 83 wcfService.Connect(); // Login happens automatically upon successufl connection 84 } 85 if (!wcfService.LoggedIn) { 86 wcfService.Login(ConfigManager.Instance.GetClientInfo()); // if login faild previously try again 75 /// <summary> 76 /// Stop the heartbeat 77 /// </summary> 78 public void StopHeartBeat() { 79 abortThreadPending = true; 80 waitHandle.Set(); 81 heartBeatThread.Join(); 82 } 83 84 /// <summary> 85 /// use this method to singalize there is work to do (to avoid the waiting period if its clear that actions are required) 86 /// </summary> 87 public void AwakeHeartBeatThread() { 88 waitHandle.Set(); 89 } 90 91 private void RunHeartBeatThread() { 92 while (!abortThreadPending) { 93 try { 94 lock (locker) { 95 if (wcfService.ConnState != NetworkEnum.WcfConnState.Connected) { 96 wcfService.Connect(ConfigManager.Instance.GetClientInfo()); // Login happens automatically upon successufl connection 97 } 98 if(wcfService.ConnState == NetworkEnum.WcfConnState.Connected) { 99 SlaveDto info = ConfigManager.Instance.GetClientInfo(); 100 101 HeartBeatData heartBeatData = new HeartBeatData { 102 SlaveId = info.Id, 103 FreeCores = info.NrOfCores.HasValue ? info.NrOfCores.Value - ConfigManager.Instance.GetUsedCores() : 0, 104 FreeMemory = GetFreeMemory(), 105 JobProgress = ConfigManager.Instance.GetExecutionTimeOfAllJobs(), 106 IsAllowedToCalculate = UptimeManager.Instance.IsAllowedToCalculate() && UptimeManager.Instance.CalendarAvailable 107 }; 108 109 if (!heartBeatData.IsAllowedToCalculate) { 110 // stop all running jobs and send snapshots to server 111 MessageQueue.GetInstance().AddMessage(MessageContainer.MessageType.UptimeLimitDisconnect); 87 112 } 88 113 89 if (wcfService.LoggedIn) { 90 SlaveDto info = ConfigManager.Instance.GetClientInfo(); 91 92 HeartBeatData heartBeatData = new HeartBeatData { 93 SlaveId = info.Id, 94 FreeCores = info.NrOfCores - ConfigManager.Instance.GetUsedCores(), 95 FreeMemory = GetFreeMemory(), 96 JobProgress = ConfigManager.Instance.GetProgressOfAllJobs(), 97 IsAllowedToCalculate = UptimeManager.Instance.IsAllowedToCalculate() && UptimeManager.Instance.CalendarAvailable 98 }; 99 100 if (!heartBeatData.IsAllowedToCalculate) { 101 // stop all running jobs and send snapshots to server 102 MessageQueue.GetInstance().AddMessage(MessageContainer.MessageType.UptimeLimitDisconnect); 103 } 104 105 Logger.Debug("Sending Heartbeat: " + heartBeatData); 106 wcfService.ProcessHeartBeatSync(heartBeatData); 107 } 108 109 } // lock 110 } 111 catch (Exception e) { 112 Logger.Error("Heartbeat Thread failed badly: " + e.Message); 113 } 114 115 try { 116 heartBeatThreadIsSleepingLock.EnterWriteLock(); 117 heartBeatThreadIsSleeping = true; 118 heartBeatThreadIsSleepingLock.ExitWriteLock(); 119 120 Thread.Sleep(Interval); 121 122 heartBeatThreadIsSleepingLock.EnterWriteLock(); 123 heartBeatThreadIsSleeping = false; 124 heartBeatThreadIsSleepingLock.ExitWriteLock(); 125 } 126 catch (ThreadInterruptedException e) { 127 Logger.Debug("Heartbeat sleep interrupted"); 128 } 129 } // while 130 abortThreadPending = false; 131 Logger.Debug("Heartbeat thread stopped"); 132 }); 133 } 134 135 private int GetFreeMemory() { 136 PerformanceCounter counter = new PerformanceCounter("Memory", "Available Bytes", true); 137 int mb = (int)(counter.NextValue() / 1024 / 1024); 138 return mb; 114 Logger.Debug("Sending Heartbeat: " + heartBeatData); 115 wcfService.ProcessHeartBeatSync(heartBeatData); 116 } 117 } // lock 118 } 119 catch (Exception e) { 120 Logger.Error("Heartbeat Thread failed badly: " + e.Message); 121 OnExceptionOccured(e); 122 } 123 waitHandle.WaitOne(this.Interval); 124 } // while 125 waitHandle.Close(); 126 abortThreadPending = false; 127 Logger.Debug("Heartbeat thread stopped"); 139 128 } 140 129 … … 144 133 } 145 134 146 public void StopHeartBeat() { 147 abortThreadPending = true; 148 heartBeatThread.Interrupt(); 149 heartBeatThread.Join(); 135 #region Eventhandler 136 public event EventHandler<EventArgs<Exception>> ExceptionOccured; 137 private void OnExceptionOccured(Exception e) { 138 var handler = ExceptionOccured; 139 if (handler != null) handler(this, new EventArgs<Exception>(e)); 150 140 } 141 #endregion 151 142 152 /// <summary> 153 /// use this method to singalize there is work to do (to avoid the waiting period if its clear that actions are required) 154 /// </summary> 155 public void InterruptHeartBeatThread() { 156 // deal with ReadWriteLock to avoid interrupting the thread while it is in SleepOrWait due to another reason (like waiting for a resource in a lock-statement) 157 heartBeatThreadIsSleepingLock.EnterReadLock(); 158 Debug.WriteLine("InterruptHeartBeatThread"); 159 if (heartBeatThreadIsSleeping) { 160 Debug.WriteLine("will interrupt heartbeatthread"); 161 heartBeatThread.Interrupt(); 162 } else { 163 Debug.WriteLine("will not interrupt heartbeatthread"); 164 } 165 heartBeatThreadIsSleepingLock.ExitReadLock(); 143 #region Helpers 144 private int GetFreeMemory() { 145 PerformanceCounter counter = new PerformanceCounter("Memory", "Available Bytes", true); 146 int mb = (int)(counter.NextValue() / 1024 / 1024); 147 return mb; 166 148 } 149 #endregion 167 150 } 168 151 } -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/JobStorage/JobStorageManager.cs
r4337 r4423 45 45 public static void CheckAndSubmitJobsFromDisc() { 46 46 for (int index = storedJobsList.Count; index > 0; index--) { 47 if ( WcfService.Instance.LoggedIn && (storedJobsList[index - 1].ServerIP == WcfService.Instance.ServerIp)) {47 if (storedJobsList[index - 1].ServerIP == WcfService.Instance.ServerIp) { 48 48 String filename = storedJobsList[index - 1].ServerIP + "." + storedJobsList[index - 1].ServerPort + "." + storedJobsList[index - 1].JobID.ToString(); 49 49 Logger.Info("Sending stored job " + storedJobsList[index - 1].JobID + " to the server"); … … 51 51 byte[] job = File.ReadAllBytes(path + filename + ".dat"); 52 52 if (WcfService.Instance.IsJobStillNeeded(storedJobsList[index - 1].JobID).StatusMessage == ResponseStatus.Ok) { 53 ResponseResultReceived res = WcfService.Instance.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, storedJobsList[index - 1].JobID, job, 1.00, null, true);53 ResponseResultReceived res = WcfService.Instance.StoreFinishedJobResultsSync(ConfigManager.Instance.GetClientInfo().Id, storedJobsList[index - 1].JobID, job, TimeSpan.Zero, null, true); // unfortunately we do not have the correct ExecutionTime since we would need to unzip the job for that 54 54 Logger.Info("Sending of job " + storedJobsList[index - 1].JobID + " done"); 55 55 } -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/SlaveConsoleService/SlaveConsoleCommunicator.cs
r4337 r4423 20 20 public void SetConnection(ConnectionContainer container) { 21 21 ConfigManager.Instance.SetServerIP(container); 22 //WcfService.Instance.Connect(container.IPAdress);23 22 WcfService.Instance.ServerIp = container.IPAdress; 24 23 } 25 26 //public void Disconnect() {27 // WcfService.Instance.Disconnect();28 //}29 24 30 25 public ConnectionContainer GetCurrentConnection() { -
branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Core/3.3/SlaveConsoleService/TransferObjects/JobStatus.cs
r4320 r4423 43 43 /// </summary> 44 44 [DataMember] 45 public double Progress{ get; set; }45 public TimeSpan ExecutionTime { get; set; } 46 46 } 47 47 }
Note: See TracChangeset
for help on using the changeset viewer.