Changeset 5137
- Timestamp:
- 12/20/10 19:58:01 (14 years ago)
- Location:
- branches/HeuristicLab.Hive-3.4/sources
- Files:
-
- 1 added
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources
- Property svn:ignore
-
old new 1 *.suo 1 2 HeuristicLab.Hive-3.4.suo 2 3 TestResults 3 HeuristicLab.Hive 3.4.suo
-
- Property svn:ignore
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Tests/Mocks/MockHiveService.cs
r5106 r5137 1 1 using System; 2 2 using System.Collections.Generic; 3 using System.IO; 3 4 using System.Linq; 4 using System.Text; 5 using HeuristicLab.PluginInfrastructure; 6 using HeuristicLab.PluginInfrastructure.Manager; 7 using HeuristicLab.Services.Hive.Common; 8 using HeuristicLab.Services.Hive.Common.DataTransfer; 5 9 using HeuristicLab.Services.Hive.Common.ServiceContracts; 6 using HeuristicLab.Services.Hive.Common.DataTransfer;7 using HeuristicLab.Services.Hive.Common;8 using HeuristicLab.PluginInfrastructure.Manager;9 using System.IO;10 using System.Reflection;11 using HeuristicLab.PluginInfrastructure;12 10 13 11 namespace HeuristicLab.Clients.Hive.Slave.Tests { … … 15 13 private static List<Job> jobs; 16 14 private static List<JobData> jobDatas; 17 private static List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment> hiveExperiments; 15 private static List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment> hiveExperiments; 18 16 private static List<Plugin> plugins = null; 17 public List<Job> ResultJobs { get; set; } 18 public List<JobData> ResultJobDatas { get; set; } 19 19 private static IEnumerable<PluginDescription> pDescs = null; 20 20 private static int cnt = 0; … … 28 28 ServerConfigFilePath = Path.Combine(ServerPluginCachePath, "HeuristicLab 3.3.exe.config"); 29 29 30 byte[] data = PersistenceUtil.Serialize(new MockJob(4000, false));31 30 if (plugins == null) 32 31 plugins = ReadPluginsFromServerCache(); 33 32 34 jobs = new List<Job>(); 33 34 //TODO: reuse as fallback? 35 /*jobs = new List<Job>(); 35 36 jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 }); 36 37 jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Offline, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 100, ParentJobId = jobs.First().Id }); 37 38 jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Offline, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 1024, ParentJobId = jobs.First().Id }); 38 39 jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Offline, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 4096, ParentJobId = jobs.First().Id }); 39 40 byte[] data = PersistenceUtil.Serialize(new MockJob(400, false)); 40 41 jobDatas = new List<JobData>(); 41 42 foreach (var job in jobs) { 42 43 job.PluginsNeededIds = new List<Guid>(); 43 44 job.PluginsNeededIds.AddRange(plugins.Select(a => a.Id)); 44 jobDatas.Add(new JobData() { JobId = job.Id, Data = data }); 45 } 46 45 jobDatas.Add(new JobData() { JobId = job.Id, Data = data }); 46 } 47 48 hiveExperiments = new List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment>(); 49 hiveExperiments.Add(new HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment() { Id = Guid.NewGuid(), Name = "Hive Exp 1", Description = "", RootJobId = jobs[0].Id });*/ 50 } 51 52 public void updateJobs(List<Job> js, MockJob m) { 53 jobs = js; 54 byte[] data = PersistenceUtil.Serialize(m); 55 56 jobDatas = new List<JobData>(); 57 foreach (var job in jobs) { 58 job.PluginsNeededIds = new List<Guid>(); 59 job.PluginsNeededIds.AddRange(plugins.Select(a => a.Id)); 60 jobDatas.Add(new JobData() { JobId = job.Id, Data = data }); 61 } 62 47 63 hiveExperiments = new List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment>(); 48 64 hiveExperiments.Add(new HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment() { Id = Guid.NewGuid(), Name = "Hive Exp 1", Description = "", RootJobId = jobs[0].Id }); 65 66 ResultJobDatas = new List<JobData>(); 67 ResultJobs = new List<Job>(); 49 68 } 50 69 … … 60 79 p.Id = Guid.NewGuid(); 61 80 plugins.Add(p); 81 62 82 } 63 83 return plugins; … … 87 107 pd.Data = cf; 88 108 return pd; 109 89 110 } 90 111 91 112 public Job GetJob(Guid jobId) { 113 //MockHBM.SendHeartbeat(); 92 114 return jobs.Where(j => j.Id == jobId).SingleOrDefault(); 93 115 } … … 106 128 107 129 public JobData GetJobData(Guid jobId) { 108 return jobDatas.Where(jd => jd.JobId == jobId).SingleOrDefault(); 130 JobData ret = jobDatas.Where(jd => jd.JobId == jobId).SingleOrDefault(); 131 //MockHBM.SendHeartbeat(); 132 return ret; 109 133 } 110 134 111 135 public void UpdateJob(Job jobDto, JobData jobDataDto) { 112 136 Console.WriteLine("Update Job called!"); 137 ResultJobDatas.Add(jobDataDto); 138 ResultJobs.Add(jobDto); 113 139 } 114 140 … … 123 149 124 150 #region JobControl Methods 151 int curJobIdx = 0; 125 152 public Job AquireJob(Guid slaveId) { 126 var job = jobs.First(); 127 job.SlaveId = slaveId; 128 return job; 153 if (curJobIdx < jobs.Count) { 154 var job = jobs[curJobIdx]; 155 job.SlaveId = slaveId; 156 curJobIdx++; 157 return job; 158 } 159 throw new Exception("No Jobs left on MockHiveService!"); 129 160 } 130 161 … … 169 200 public void GoodBye() { 170 201 // do nothing 171 } 172 202 } 203 173 204 public List<MessageContainer> Heartbeat(Heartbeat heartbeat) { 174 205 if (Messages != null && cnt < Messages.Count) { … … 178 209 } 179 210 } 180 211 181 212 #endregion 182 213 183 214 #region Plugin Methods 184 public Guid AddPlugin(Plugin plugin, List<PluginData> pluginData) { 185 // todo 186 return Guid.NewGuid(); 215 public Guid AddPlugin(Plugin p, List<PluginData> pds) { 216 /*p.Id = Guid.NewGuid(); 217 plugins.Add(p); 218 foreach (var pluginData in pds) { 219 pluginData.PluginId = p.Id; 220 pluginDatas.Add(pluginData); 221 222 } 223 return p.Id;*/ 224 throw new NotImplementedException(); 225 187 226 } 188 227 … … 190 229 if (plugins == null) 191 230 plugins = ReadPluginsFromServerCache(); 192 return plugins; 231 return plugins; 193 232 } 194 233 … … 207 246 pluginDatas.Add(data); 208 247 } 209 } 210 } 248 } 249 } 211 250 } 212 251 return pluginDatas; -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Tests/Mocks/MockJob.cs
r5106 r5137 114 114 OnJobStopped(); 115 115 } 116 catch (ThreadAbortException) { 117 //this happens when the appdomain is killed (e.g. abort from server) 118 Stop(); 119 } 116 120 catch (Exception e) { 117 121 this.ExecutionState = Core.ExecutionState.Stopped; 118 OnJobFailed( );122 OnJobFailed(e); 119 123 } 120 124 } … … 170 174 171 175 public event EventHandler JobFailed; 172 protected virtual void OnJobFailed( ) {176 protected virtual void OnJobFailed(Exception e) { 173 177 EventHandler handler = JobFailed; 174 if (handler != null) handler(this, EventArgs.Empty); 178 EventArgs<Exception> ev = new EventArgs<Exception>(e); 179 if (handler != null) handler(this, ev); 175 180 } 176 181 -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Tests/SlaveTest.cs
r5104 r5137 1 1 using System; 2 using System.Text;3 2 using System.Collections.Generic; 4 3 using System.Linq; 4 using HeuristicLab.Clients.Common; 5 using HeuristicLab.Services.Hive.Common; 6 using HeuristicLab.Services.Hive.Common.DataTransfer; 7 using HeuristicLab.Services.Hive.Common.ServiceContracts; 5 8 using Microsoft.VisualStudio.TestTools.UnitTesting; 6 using HeuristicLab.Clients.Hive.Salve;7 using HeuristicLab.Services.Hive.Common;8 using HeuristicLab.Services.Hive.Common.ServiceContracts;9 using HeuristicLab.Clients.Common;10 9 11 10 namespace HeuristicLab.Clients.Hive.Slave.Tests { … … 16 15 [ClassInitialize] 17 16 public static void MyClassInitialize(TestContext testContext) { 18 PluginLoader.pluginAssemblies.Any(); 17 PluginLoader.pluginAssemblies.Any(); 19 18 ServiceLocator.Instance = new MockServiceLocator(); 20 19 } 21 20 22 p ublic List<List<MessageContainer>> CreateMsgs() {21 private List<List<MessageContainer>> CreateMsgsForSingleJob() { 23 22 List<List<MessageContainer>> allMsgs = new List<List<MessageContainer>>(); 24 23 //get slave to fetch job … … 28 27 29 28 //do nothing 29 msg = new List<MessageContainer>(); 30 allMsgs.Add(msg); 31 msg = new List<MessageContainer>(); 32 allMsgs.Add(msg); 33 msg = new List<MessageContainer>(); 34 allMsgs.Add(msg); 30 35 msg = new List<MessageContainer>(); 31 36 allMsgs.Add(msg); … … 38 43 39 44 [TestMethod] 40 public void TestMethod1() { 41 using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) { 42 ((MockHiveService)service.Obj).Messages = CreateMsgs(); 45 public void TestSingleJob() { 46 Job testJob = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 }; 47 List<Job> jobList = new List<Job>(); 48 jobList.Add(testJob); 49 MockJob job = new MockJob(400, false); 50 51 using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) { 52 MockHiveService ms = (MockHiveService)service.Obj; 53 ((MockHiveService)service.Obj).Messages = CreateMsgsForSingleJob(); 54 ((MockHiveService)service.Obj).updateJobs(jobList, job); 55 56 57 HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core(); 58 core.Start(); 59 60 Assert.AreEqual<int>(1, ms.ResultJobs.Count); 61 Assert.AreEqual<Guid>(testJob.Id, ms.ResultJobs[0].Id); 43 62 } 44 45 HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core();46 core.Start();47 63 } 64 65 private List<List<MessageContainer>> CreateMsgsForShutdownSlaveWhileJobRunning() { 66 List<List<MessageContainer>> allMsgs = new List<List<MessageContainer>>(); 67 //get slave to fetch job 68 List<MessageContainer> msg = new List<MessageContainer>(); 69 msg.Add(new MessageContainer(MessageContainer.MessageType.AquireJob)); 70 allMsgs.Add(msg); 71 72 msg = new List<MessageContainer>(); 73 allMsgs.Add(msg); 74 75 msg = new List<MessageContainer>(); 76 msg.Add(new MessageContainer(MessageContainer.MessageType.ShutdownSlave)); 77 allMsgs.Add(msg); 78 return allMsgs; 79 } 80 81 [TestMethod] 82 public void TestShutdownSlaveWhileJobRunning() { 83 Job testJob = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 }; 84 List<Job> jobList = new List<Job>(); 85 jobList.Add(testJob); 86 MockJob job = new MockJob(10000, false); 87 88 using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) { 89 MockHiveService ms = (MockHiveService)service.Obj; 90 ((MockHiveService)service.Obj).Messages = CreateMsgsForShutdownSlaveWhileJobRunning(); 91 ((MockHiveService)service.Obj).updateJobs(jobList, job); 92 93 94 HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core(); 95 core.Start(); 96 97 } 98 } 99 100 101 private List<List<MessageContainer>> CreateMsgsForTwoJobs() { 102 List<List<MessageContainer>> allMsgs = new List<List<MessageContainer>>(); 103 104 //get slave to fetch jobs 105 List<MessageContainer> msg = new List<MessageContainer>(); 106 msg.Add(new MessageContainer(MessageContainer.MessageType.AquireJob)); 107 allMsgs.Add(msg); 108 109 msg = new List<MessageContainer>(); 110 allMsgs.Add(msg); 111 112 msg = new List<MessageContainer>(); 113 msg.Add(new MessageContainer(MessageContainer.MessageType.AquireJob)); 114 allMsgs.Add(msg); 115 116 117 msg = new List<MessageContainer>(); 118 allMsgs.Add(msg); 119 msg = new List<MessageContainer>(); 120 allMsgs.Add(msg); 121 msg = new List<MessageContainer>(); 122 allMsgs.Add(msg); 123 msg = new List<MessageContainer>(); 124 allMsgs.Add(msg); 125 msg = new List<MessageContainer>(); 126 allMsgs.Add(msg); 127 msg = new List<MessageContainer>(); 128 allMsgs.Add(msg); 129 msg = new List<MessageContainer>(); 130 allMsgs.Add(msg); 131 msg = new List<MessageContainer>(); 132 allMsgs.Add(msg); 133 msg = new List<MessageContainer>(); 134 allMsgs.Add(msg); 135 136 msg = new List<MessageContainer>(); 137 msg.Add(new MessageContainer(MessageContainer.MessageType.ShutdownSlave)); 138 allMsgs.Add(msg); 139 return allMsgs; 140 } 141 142 [TestMethod] 143 public void TestTwoJobs() { 144 Job testJob1 = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 }; 145 Job testJob2 = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 }; 146 List<Job> jobList = new List<Job>(); 147 jobList.Add(testJob1); 148 jobList.Add(testJob2); 149 MockJob job = new MockJob(2000, false); 150 151 using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) { 152 MockHiveService ms = (MockHiveService)service.Obj; 153 ((MockHiveService)service.Obj).Messages = CreateMsgsForTwoJobs(); 154 ((MockHiveService)service.Obj).updateJobs(jobList, job); 155 156 157 HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core(); 158 core.Start(); 159 Assert.AreEqual<int>(2, ms.ResultJobs.Count); 160 161 } 162 } 163 164 165 166 167 168 169 170 171 48 172 } 49 173 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs
r5105 r5137 23 23 using System.Collections.Generic; 24 24 using System.IO; 25 using System.Runtime.CompilerServices; 25 26 using System.Threading; 27 using HeuristicLab.Clients.Hive.Slave; 26 28 using HeuristicLab.Common; 27 29 using HeuristicLab.Core; 30 using HeuristicLab.Services.Hive.Common; 28 31 using HeuristicLab.Services.Hive.Common.DataTransfer; 29 using HeuristicLab.Services.Hive.Common;30 using HeuristicLab.Clients.Hive.Slave;31 32 32 33 … … 37 38 public class Core : MarshalByRefObject { 38 39 public static bool abortRequested { get; set; } 39 40 40 public static ILog Log { get; set; } 41 41 … … 45 45 46 46 private WcfService wcfService; 47 private HeartbeatManager heartbeatManager; 48 49 private bool currentlyFetching; 50 private bool CurrentlyFetching { 51 get { 52 return currentlyFetching; 53 } 54 set { 55 currentlyFetching = value; 56 Logger.Debug("Set CurrentlyFetching to " + currentlyFetching); 57 } 58 } 47 public HeartbeatManager heartbeatManager; 48 49 private int coreThreadId; 59 50 60 51 public Dictionary<Guid, Executor> ExecutionEngines { … … 62 53 } 63 54 64 internal Dictionary<Guid, Job> Jobs { 55 internal Dictionary<Guid, Job> Jobs { 65 56 get { return jobs; } 57 } 58 59 public Core() { 60 coreThreadId = Thread.CurrentThread.ManagedThreadId; 66 61 } 67 62 … … 86 81 87 82 DeRegisterServiceEvents(); 88 83 89 84 //server.Close(); 90 85 Logger.Info("Program shutdown"); … … 92 87 93 88 private void StartHeartbeats() { 94 //Initialize the heartbeat 89 //Initialize the heartbeat 95 90 heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) }; 96 91 heartbeatManager.StartHeartbeat(); … … 119 114 private void DetermineAction(MessageContainer container) { 120 115 Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId); 121 switch (container.Message) { 122 //Server requests to abort a job 123 case MessageContainer.MessageType.AbortJob: 124 if (engines.ContainsKey(container.JobId)) 125 try { 126 engines[container.JobId].Abort(); 127 } 128 catch (AppDomainUnloadedException) { 129 // appdomain already unloaded. Finishing job probably ongoing 130 } 131 else 132 Logger.Error("AbortJob: Engine doesn't exist"); 133 break; 134 135 //Pull a Job from the Server 136 case MessageContainer.MessageType.AquireJob: 137 if (!CurrentlyFetching) { 116 //TODO: find a better solution 117 if (container is ExecutorMessageContainer<Guid>) { 118 ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container; 119 c.execute(); 120 } else if (container is MessageContainer) { 121 switch (container.Message) { 122 //Server requests to abort a job 123 case MessageContainer.MessageType.AbortJob: 124 if (engines.ContainsKey(container.JobId)) 125 try { 126 engines[container.JobId].Abort(); 127 } 128 catch (AppDomainUnloadedException) { 129 // appdomain already unloaded. Finishing job probably ongoing 130 } else 131 Logger.Error("AbortJob: Engine doesn't exist"); 132 break; 133 134 //Pull a Job from the Server 135 case MessageContainer.MessageType.AquireJob: 136 Job myJob = wcfService.AquireJob(); 138 137 //TODO: handle in own thread!! 139 Job myJob = wcfService.AquireJob();140 138 JobData jobData = wcfService.GetJobData(myJob.Id); 141 wcfService_GetJobCompleted(myJob, jobData); 142 CurrentlyFetching = true; 143 } else 144 Logger.Info("Currently fetching, won't fetch this time!"); 145 break; 146 147 //A Job has finished and can be sent back to the server 148 case MessageContainer.MessageType.JobStopped: 149 SendFinishedJob(container.JobId); 150 break; 151 152 case MessageContainer.MessageType.JobFailed: 153 SendFinishedJob(container.JobId); 154 break; 155 156 //Hard shutdown of the client 157 case MessageContainer.MessageType.ShutdownSlave: 158 ShutdownCore(); 159 break; 139 StartJobInAppDomain(myJob, jobData); 140 break; 141 142 //Hard shutdown of the client 143 case MessageContainer.MessageType.ShutdownSlave: 144 ShutdownCore(); 145 break; 146 } 147 } else { 148 Logger.Warn("Unknown MessageContainer: " + container); 160 149 } 161 150 } … … 163 152 public void ShutdownCore() { 164 153 Logger.Info("Shutdown Signal received"); 165 166 154 Logger.Debug("Stopping heartbeat"); 167 155 heartbeatManager.StopHeartBeat(); 168 abortRequested = true; 156 abortRequested = true; 169 157 Logger.Debug("Logging out"); 170 158 … … 177 165 } 178 166 } 179 180 WcfService.Instance.Disconnect(); 181 } 182 183 //TODO: make synchronized 184 //TODO: start own thread? 185 //only called when waiting for child jobs? 186 public void PauseJob(JobData data ) { 187 if(!Jobs.ContainsKey(data.JobId)) { 188 //TODO: error handling 189 } 190 Job job = Jobs[data.JobId]; 191 job.JobState = JobState.WaitingForChildJobs; 192 193 wcfService.UpdateJob(job, data); 194 KillAppDomain(data.JobId); 195 } 196 197 167 WcfService.Instance.Disconnect(); 168 } 169 170 /// <summary> 171 /// Pauses a job, which means sending it to the server and killing it locally; 172 /// atm only used when executor is waiting for child jobs 173 /// </summary> 174 /// <param name="data"></param> 175 [MethodImpl(MethodImplOptions.Synchronized)] 176 public void PauseJob(JobData data) { 177 if (!Jobs.ContainsKey(data.JobId)) { 178 Logger.Error("Can't find job with id " + data.JobId); 179 } else { 180 Job job = Jobs[data.JobId]; 181 job.JobState = JobState.WaitingForChildJobs; 182 wcfService.UpdateJob(job, data); 183 } 184 KillAppDomain(data.JobId); 185 } 186 198 187 /// <summary> 199 188 /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk. … … 201 190 /// </summary> 202 191 /// <param name="jobId"></param> 192 [MethodImpl(MethodImplOptions.Synchronized)] 203 193 public void SendFinishedJob(object jobId) { 204 194 try { … … 208 198 Logger.Info("Engine doesn't exist"); 209 199 return; 210 } 200 } 211 201 if (!jobs.ContainsKey(jId)) { 212 202 Logger.Info("Job doesn't exist"); … … 218 208 cJob.Exception = engines[jId].CurrentException; 219 209 cJob.ExecutionTime = engines[jId].ExecutionTime; 220 210 221 211 try { 222 212 Logger.Info("Sending the finished job with id: " + jId); 223 213 wcfService.UpdateJob(cJob, sJob); 224 SlaveStatusInfo.JobsProcessed++; 214 SlaveStatusInfo.JobsProcessed++; 225 215 } 226 216 catch (Exception e) { 227 Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")"); 217 Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")"); 228 218 } 229 219 finally { … … 236 226 } 237 227 } 238 228 239 229 /// <summary> 240 230 /// A new Job from the wcfService has been received and will be started within a AppDomain. … … 242 232 /// <param name="sender"></param> 243 233 /// <param name="e"></param> 244 void wcfService_GetJobCompleted(Job myJob, JobData jobData) { 245 Logger.Info("Received new job with id " + myJob.Id); 246 String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString()); 247 bool pluginsPrepared = false; 248 249 try { 250 PluginCache.Instance.PreparePlugins(myJob, jobData); 251 Logger.Debug("Plugins fetched for job " + myJob.Id); 252 pluginsPrepared = true; 234 private void StartJobInAppDomain(Job myJob, JobData jobData) { 235 Logger.Info("Received new job with id " + myJob.Id); 236 String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString()); 237 bool pluginsPrepared = false; 238 239 try { 240 PluginCache.Instance.PreparePlugins(myJob, jobData); 241 Logger.Debug("Plugins fetched for job " + myJob.Id); 242 pluginsPrepared = true; 243 } 244 catch (Exception exception) { 245 Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception)); 246 } 247 248 if (pluginsPrepared) { 249 try { 250 AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName)); 251 appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException); 252 lock (engines) { 253 if (!jobs.ContainsKey(myJob.Id)) { 254 jobs.Add(myJob.Id, myJob); 255 appDomains.Add(myJob.Id, appDomain); 256 Logger.Debug("Creating AppDomain"); 257 Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName); 258 Logger.Debug("Created AppDomain"); 259 engine.JobId = myJob.Id; 260 engine.core = this; 261 Logger.Debug("Starting Engine for job " + myJob.Id); 262 engines.Add(myJob.Id, engine); 263 engine.Start(jobData.Data); 264 SlaveStatusInfo.JobsFetched++; 265 Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched); 266 } 267 } 268 heartbeatManager.AwakeHeartBeatThread(); 253 269 } 254 270 catch (Exception exception) { 255 Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception)); 256 } 257 258 if (pluginsPrepared) { 259 try { 260 AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName)); 261 appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException); 262 lock (engines) { 263 if (!jobs.ContainsKey(myJob.Id)) { 264 jobs.Add(myJob.Id, myJob); 265 appDomains.Add(myJob.Id, appDomain); 266 Logger.Debug("Creating AppDomain"); 267 Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName); 268 Logger.Debug("Created AppDomain"); 269 engine.JobId = myJob.Id; 270 engine.core = this; 271 Logger.Debug("Starting Engine for job " + myJob.Id); 272 engines.Add(myJob.Id, engine); 273 engine.Start(jobData.Data); 274 SlaveStatusInfo.JobsFetched++; 275 Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched); 276 } 277 } 278 heartbeatManager.AwakeHeartBeatThread(); 279 } 280 catch (Exception exception) { 281 Logger.Error("Creating the Appdomain and loading the job failed for job " + myJob.Id); 282 Logger.Error("Error thrown is: ", exception); 283 CurrentlyFetching = false; 284 KillAppDomain(myJob.Id); 285 } 286 } 287 CurrentlyFetching = false; 288 } 289 271 Logger.Error("Creating the Appdomain and loading the job failed for job " + myJob.Id); 272 Logger.Error("Error thrown is: ", exception); 273 KillAppDomain(myJob.Id); 274 } 275 } 276 } 277 290 278 public event EventHandler<EventArgs<Exception>> ExceptionOccured; 291 279 private void OnExceptionOccured(Exception e) { … … 295 283 } 296 284 297 void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {285 private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) { 298 286 Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString()); 299 287 KillAppDomain(new Guid(e.ExceptionObject.ToString())); … … 301 289 302 290 /// <summary> 291 /// Enqueues messages from the executor to the message queue. 292 /// This is necessary if the core thread has to execute certain actions, e.g. 293 /// killing of an app domain. 294 /// </summary> 295 /// <typeparam name="T"></typeparam> 296 /// <param name="action"></param> 297 /// <param name="parameter"></param> 298 /// <returns>true if the calling method can continue execution, else false</returns> 299 private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) { 300 if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) { 301 ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>(); 302 container.Callback = action; 303 container.CallbackParameter = parameter; 304 MessageQueue.GetInstance().AddMessage(container); 305 return false; 306 } else { 307 return true; 308 } 309 } 310 311 /// <summary> 303 312 /// Kill a appdomain with a specific id. 304 313 /// </summary> 305 314 /// <param name="id">the GUID of the job</param> 315 [MethodImpl(MethodImplOptions.Synchronized)] 306 316 public void KillAppDomain(Guid id) { 307 Logger.Debug("Shutting down Appdomain for Job " + id); 308 lock (engines) { 309 try { 310 if (engines.ContainsKey(id)) 311 engines[id].Dispose(); 312 if (appDomains.ContainsKey(id)) { 313 appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException); 314 315 int repeat = 5; 316 while (repeat > 0) { 317 try { 318 AppDomain.Unload(appDomains[id]); 319 repeat = 0; 320 } 321 catch (CannotUnloadAppDomainException) { 322 Logger.Error("Could not unload AppDomain, will try again in 1 sec."); 323 Thread.Sleep(1000); 324 repeat--; 325 if (repeat == 0) { 326 throw; // rethrow and let app crash 317 if (EnqueueExecutorMessage<Guid>(KillAppDomain, id)) { 318 Logger.Debug("Shutting down Appdomain for Job " + id); 319 lock (engines) { 320 try { 321 if (engines.ContainsKey(id)) { 322 engines[id].Dispose(); 323 engines.Remove(id); 324 } 325 326 if (appDomains.ContainsKey(id)) { 327 appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException); 328 329 int repeat = 5; 330 while (repeat > 0) { 331 try { 332 AppDomain.Unload(appDomains[id]); 333 repeat = 0; 334 } 335 catch (CannotUnloadAppDomainException) { 336 Logger.Error("Could not unload AppDomain, will try again in 1 sec."); 337 Thread.Sleep(1000); 338 repeat--; 339 if (repeat == 0) { 340 throw; // rethrow and let app crash 341 } 327 342 } 328 343 } 344 appDomains.Remove(id); 329 345 } 330 appDomains.Remove(id); 346 347 jobs.Remove(id); 348 PluginCache.Instance.DeletePluginsForJob(id); 349 GC.Collect(); 331 350 } 332 333 engines.Remove(id); 334 jobs.Remove(id); 335 PluginCache.Instance.DeletePluginsForJob(id); 336 GC.Collect(); 337 } 338 catch (Exception ex) { 339 Logger.Error("Exception when unloading the appdomain: ", ex); 340 } 341 } 342 GC.Collect(); 343 } 344 } 345 346 351 catch (Exception ex) { 352 Logger.Error("Exception when unloading the appdomain: ", ex); 353 } 354 } 355 GC.Collect(); 356 } 357 } 358 } 347 359 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs
r5106 r5137 21 21 22 22 using System; 23 using System.Collections.Generic; 23 24 using System.Linq; 25 using HeuristicLab.Clients.Hive.Slave; 24 26 using HeuristicLab.Common; 25 27 using HeuristicLab.Core; 26 using HeuristicLab.Services.Hive.Common;27 28 using HeuristicLab.Hive; 29 using HeuristicLab.PluginInfrastructure; 28 30 using HeuristicLab.Services.Hive.Common.DataTransfer; 29 using System.Collections.Generic; 30 using HeuristicLab.Clients.Hive.Slave; 31 using HeuristicLab.PluginInfrastructure; 32 33 31 32 34 33 namespace HeuristicLab.Clients.Hive.Salve { 35 34 public class Executor : MarshalByRefObject, IDisposable { 36 35 public Guid JobId { get; set; } 37 36 public IJob Job { get; set; } 38 p ublic MessageContainer.MessageType CurrentMessage { get; set; }39 public Core core { get; set; } 40 37 private bool wasJobAborted = false; 38 public Core core { get; set; } 39 41 40 private Exception currentException; 42 41 public String CurrentException { … … 49 48 } 50 49 } 51 50 52 51 public ExecutionState ExecutionState { 53 52 get { … … 73 72 RegisterJobEvents(); 74 73 75 if (Job.CollectChildJobs) { 74 if (Job.CollectChildJobs) { 76 75 IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId); 77 76 Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data))); … … 82 81 } 83 82 catch (Exception e) { 84 this.currentException = e; 83 this.currentException = e; 85 84 } 86 85 } … … 91 90 } 92 91 catch (Exception e) { 93 this.currentException = e; 92 this.currentException = e; 94 93 } 95 94 } 96 95 97 96 public void Abort() { 98 CurrentMessage = MessageContainer.MessageType.AbortJob;97 wasJobAborted = true; 99 98 if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) { 100 99 Job.Stop(); … … 105 104 106 105 private void RegisterJobEvents() { 107 //TODO: warum gibt es jobStopped und jobfailed nicht mehr?108 106 Job.JobStopped += new EventHandler(Job_JobStopped); 109 107 Job.JobFailed += new EventHandler(Job_JobFailed); … … 123 121 private List<Guid> FindPluginsNeeded(IJob obj) { 124 122 List<Guid> guids = new List<Guid>(); 125 foreach (IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) {123 foreach (IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) { 126 124 } 127 125 throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob"); … … 147 145 // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished 148 146 this.Job.CollectChildJobs = true; 149 147 150 148 JobData jdata = new JobData(); 151 149 jdata.Data = PersistenceUtil.Serialize(Job); 152 150 jdata.JobId = this.JobId; 153 151 154 152 core.PauseJob(jdata); 155 153 } … … 167 165 168 166 private void Job_JobStopped(object sender, EventArgs e) { 169 if (CurrentMessage == MessageContainer.MessageType.AbortJob) {170 core.KillAppDomain(JobId);167 if (wasJobAborted) { 168 core.KillAppDomain(JobId); 171 169 } else { 172 170 core.SendFinishedJob(JobId); … … 176 174 public JobData GetFinishedJob() { 177 175 if (Job == null) { 178 throw new InvalidStateException("Job is null"); 179 } 176 throw new InvalidStateException("Job is null"); 177 } 180 178 181 179 if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) { … … 185 183 186 184 if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) { 187 throw new InvalidStateException("Job is still running"); 185 throw new InvalidStateException("Job is still running"); 188 186 } else { 189 187 JobData jdata = new JobData(); … … 192 190 return jdata; 193 191 } 194 } 195 192 } 193 196 194 public Executor() { 197 // CurrentMessage = MessageContainer.MessageType.NoMessage; [chn] check usage of 'CurrentMessage'198 195 } 199 196 … … 201 198 if (Job != null) 202 199 DeregisterJobEvents(); 203 //Queue = null;204 200 Job = null; 205 201 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeartbeatManager.cs
r5105 r5137 21 21 22 22 using System; 23 using System.Collections.Generic; 23 24 using System.Diagnostics; 24 25 using System.Threading; 25 26 using HeuristicLab.Common; 26 using HeuristicLab. Clients.Hive.Salve;27 using HeuristicLab.Services.Hive.Common; 27 28 using HeuristicLab.Services.Hive.Common.DataTransfer; 28 using HeuristicLab.Services.Hive.Common;29 using System.Collections.Generic;30 29 31 30 namespace HeuristicLab.Clients.Hive.Salve { … … 49 48 private WcfService wcfService; 50 49 51 private bool abortThreadPending;50 private bool threadStopped; 52 51 53 52 ReaderWriterLockSlim heartBeatThreadIsSleepingLock = new ReaderWriterLockSlim(); 54 53 55 54 /// <summary> 56 55 /// Starts the Heartbeat signal. … … 58 57 public void StartHeartbeat() { 59 58 this.waitHandle = new AutoResetEvent(true); 60 wcfService = WcfService.Instance; 61 abortThreadPending= false;59 wcfService = WcfService.Instance; 60 threadStopped = false; 62 61 heartBeatThread = new Thread(RunHeartBeatThread); 63 62 heartBeatThread.Start(); … … 68 67 /// </summary> 69 68 public void StopHeartBeat() { 70 abortThreadPending= true;69 threadStopped = true; 71 70 waitHandle.Set(); 72 71 heartBeatThread.Join(); 73 72 } 74 73 75 74 /// <summary> 76 75 /// use this method to singalize there is work to do (to avoid the waiting period if its clear that actions are required) 77 76 /// </summary> 78 77 public void AwakeHeartBeatThread() { 79 waitHandle.Set(); 78 if (!threadStopped) 79 waitHandle.Set(); 80 80 } 81 81 82 82 private void RunHeartBeatThread() { 83 while (! abortThreadPending) {83 while (!threadStopped) { 84 84 try { 85 85 lock (locker) { … … 87 87 wcfService.Connect(ConfigManager.Instance.GetClientInfo()); // Login happens automatically upon successfull connection 88 88 } 89 if (wcfService.ConnState == NetworkEnum.WcfConnState.Connected) {89 if (wcfService.ConnState == NetworkEnum.WcfConnState.Connected) { 90 90 HeuristicLab.Services.Hive.Common.DataTransfer.Slave info = ConfigManager.Instance.GetClientInfo(); 91 91 92 Heartbeat heartBeatData = new Heartbeat 93 { 92 Heartbeat heartBeatData = new Heartbeat { 94 93 SlaveId = info.Id, 95 94 FreeCores = info.Cores.HasValue ? info.Cores.Value - ConfigManager.Instance.GetUsedCores() : 0, 96 95 FreeMemory = GetFreeMemory(), 97 JobProgress = ConfigManager.Instance.GetExecutionTimeOfAllJobs() 96 JobProgress = ConfigManager.Instance.GetExecutionTimeOfAllJobs() 98 97 }; 99 98 100 99 Logger.Debug("Sending Heartbeat: " + heartBeatData); 101 List<MessageContainer> msgs = 102 100 List<MessageContainer> msgs = wcfService.SendHeartbeat(heartBeatData); 101 103 102 if (msgs == null) { 104 103 Logger.Debug("Error getting response from Heartbeat"); … … 108 107 Logger.Debug("Heartbeat Response received: "); 109 108 msgs.ForEach(mc => Logger.Debug(mc.Message.ToString())); 110 msgs.ForEach(mc => MessageQueue.GetInstance().AddMessage(mc)); 109 msgs.ForEach(mc => MessageQueue.GetInstance().AddMessage(mc)); 111 110 } 112 } // lock111 } 113 112 } 114 113 catch (Exception e) { … … 117 116 } 118 117 waitHandle.WaitOne(this.Interval); 119 } // while118 } 120 119 waitHandle.Close(); 121 abortThreadPending = false;122 120 Logger.Debug("Heartbeat thread stopped"); 123 121 } 124 122 125 123 126 124 #region Eventhandler -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeuristicLab.Clients.Hive.Slave-3.4.csproj
r5105 r5137 72 72 <SubType>Code</SubType> 73 73 </Compile> 74 <Compile Include="ExecutorMessageContainer.cs" /> 74 75 <Compile Include="MessageQueue.cs" /> 75 76 <Compile Include="NetworkEnum.cs" /> -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Logger.cs
r5105 r5137 1 1 using System; 2 using System.Collections.Generic;3 using System.Linq;4 using System.Text;5 using HeuristicLab.Core;6 2 7 3 namespace HeuristicLab.Clients.Hive.Salve { … … 11 7 /// TODO: send messages to gui 12 8 /// </summary> 13 internal static class Logger {14 private static object locker = new object(); 9 public static class Logger { 10 private static object locker = new object(); 15 11 16 12 public static void Debug(object message) { … … 42 38 Console.WriteLine(message.ToString() + " \n Exception is :" + e.ToString()); 43 39 } 44 } 40 } 45 41 } 46 42 } -
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive.Common/3.4/MessageContainer.cs
r5106 r5137 43 43 SayHello, // Slave should say hello, because he is unknown to the server 44 44 45 // *** events from execution engine ***46 JobStopped, // job finished, submit results, unload appdomain47 JobPaused, // job paused, submit results, unload appdomain48 JobFailed, // job failed with an exception. submit the results, unload appdomain49 50 45 // *** commands from execution engine *** 51 46 AddChildJob, // adds a new child job for the provided jobId
Note: See TracChangeset
for help on using the changeset viewer.