Changeset 5137


Ignore:
Timestamp:
12/20/10 19:58:01 (10 years ago)
Author:
ascheibe
Message:

#1233

  • more tests for the hive slave
  • implemented a better way to write tests for the slave
  • get rid of MessageTypes for core and executor
  • various improvements in core and executor (better communication, bugfixes,...)
Location:
branches/HeuristicLab.Hive-3.4/sources
Files:
1 added
10 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources

    • Property svn:ignore
      •  

        old new  
         1*.suo
        12HeuristicLab.Hive-3.4.suo
        23TestResults
        3 HeuristicLab.Hive 3.4.suo
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Tests/Mocks/MockHiveService.cs

    r5106 r5137  
    11using System;
    22using System.Collections.Generic;
     3using System.IO;
    34using System.Linq;
    4 using System.Text;
     5using HeuristicLab.PluginInfrastructure;
     6using HeuristicLab.PluginInfrastructure.Manager;
     7using HeuristicLab.Services.Hive.Common;
     8using HeuristicLab.Services.Hive.Common.DataTransfer;
    59using HeuristicLab.Services.Hive.Common.ServiceContracts;
    6 using HeuristicLab.Services.Hive.Common.DataTransfer;
    7 using HeuristicLab.Services.Hive.Common;
    8 using HeuristicLab.PluginInfrastructure.Manager;
    9 using System.IO;
    10 using System.Reflection;
    11 using HeuristicLab.PluginInfrastructure;
    1210
    1311namespace HeuristicLab.Clients.Hive.Slave.Tests {
     
    1513    private static List<Job> jobs;
    1614    private static List<JobData> jobDatas;
    17     private static List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment> hiveExperiments;   
     15    private static List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment> hiveExperiments;
    1816    private static List<Plugin> plugins = null;
     17    public List<Job> ResultJobs { get; set; }
     18    public List<JobData> ResultJobDatas { get; set; }
    1919    private static IEnumerable<PluginDescription> pDescs = null;
    2020    private static int cnt = 0;
     
    2828      ServerConfigFilePath = Path.Combine(ServerPluginCachePath, "HeuristicLab 3.3.exe.config");
    2929
    30       byte[] data = PersistenceUtil.Serialize(new MockJob(4000, false));
    3130      if (plugins == null)
    3231        plugins = ReadPluginsFromServerCache();
    3332
    34       jobs = new List<Job>();
     33
     34      //TODO: reuse as fallback?
     35      /*jobs = new List<Job>();
    3536      jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 });
    3637      jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Offline, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 100, ParentJobId = jobs.First().Id });
    3738      jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Offline, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 1024, ParentJobId = jobs.First().Id });
    3839      jobs.Add(new Job { Id = Guid.NewGuid(), JobState = JobState.Offline, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 4096, ParentJobId = jobs.First().Id });
    39 
     40      byte[] data = PersistenceUtil.Serialize(new MockJob(400, false));
    4041      jobDatas = new List<JobData>();
    4142      foreach (var job in jobs) {
    4243        job.PluginsNeededIds = new List<Guid>();
    4344        job.PluginsNeededIds.AddRange(plugins.Select(a => a.Id));
    44         jobDatas.Add(new JobData() { JobId = job.Id, Data = data });       
    45       }
    46      
     45        jobDatas.Add(new JobData() { JobId = job.Id, Data = data });
     46      }
     47
     48      hiveExperiments = new List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment>();
     49      hiveExperiments.Add(new HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment() { Id = Guid.NewGuid(), Name = "Hive Exp 1", Description = "", RootJobId = jobs[0].Id });*/
     50    }
     51
     52    public void updateJobs(List<Job> js, MockJob m) {
     53      jobs = js;
     54      byte[] data = PersistenceUtil.Serialize(m);
     55
     56      jobDatas = new List<JobData>();
     57      foreach (var job in jobs) {
     58        job.PluginsNeededIds = new List<Guid>();
     59        job.PluginsNeededIds.AddRange(plugins.Select(a => a.Id));
     60        jobDatas.Add(new JobData() { JobId = job.Id, Data = data });
     61      }
     62
    4763      hiveExperiments = new List<HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment>();
    4864      hiveExperiments.Add(new HeuristicLab.Services.Hive.Common.DataTransfer.HiveExperiment() { Id = Guid.NewGuid(), Name = "Hive Exp 1", Description = "", RootJobId = jobs[0].Id });
     65
     66      ResultJobDatas = new List<JobData>();
     67      ResultJobs = new List<Job>();
    4968    }
    5069
     
    6079        p.Id = Guid.NewGuid();
    6180        plugins.Add(p);
     81
    6282      }
    6383      return plugins;
     
    87107      pd.Data = cf;
    88108      return pd;
     109
    89110    }
    90111
    91112    public Job GetJob(Guid jobId) {
     113      //MockHBM.SendHeartbeat();
    92114      return jobs.Where(j => j.Id == jobId).SingleOrDefault();
    93115    }
     
    106128
    107129    public JobData GetJobData(Guid jobId) {
    108       return jobDatas.Where(jd => jd.JobId == jobId).SingleOrDefault();
     130      JobData ret = jobDatas.Where(jd => jd.JobId == jobId).SingleOrDefault();
     131      //MockHBM.SendHeartbeat();
     132      return ret;
    109133    }
    110134
    111135    public void UpdateJob(Job jobDto, JobData jobDataDto) {
    112136      Console.WriteLine("Update Job called!");
     137      ResultJobDatas.Add(jobDataDto);
     138      ResultJobs.Add(jobDto);
    113139    }
    114140
     
    123149
    124150    #region JobControl Methods
     151    int curJobIdx = 0;
    125152    public Job AquireJob(Guid slaveId) {
    126       var job = jobs.First();
    127       job.SlaveId = slaveId;
    128       return job;
     153      if (curJobIdx < jobs.Count) {
     154        var job = jobs[curJobIdx];
     155        job.SlaveId = slaveId;
     156        curJobIdx++;
     157        return job;
     158      }
     159      throw new Exception("No Jobs left on MockHiveService!");
    129160    }
    130161
     
    169200    public void GoodBye() {
    170201      // do nothing
    171     }   
    172    
     202    }
     203
    173204    public List<MessageContainer> Heartbeat(Heartbeat heartbeat) {
    174205      if (Messages != null && cnt < Messages.Count) {
     
    178209      }
    179210    }
    180      
     211
    181212    #endregion
    182213
    183214    #region Plugin Methods
    184     public Guid AddPlugin(Plugin plugin, List<PluginData> pluginData) {
    185       // todo
    186       return Guid.NewGuid();
     215    public Guid AddPlugin(Plugin p, List<PluginData> pds) {
     216      /*p.Id = Guid.NewGuid();
     217      plugins.Add(p);
     218      foreach (var pluginData in pds) {
     219        pluginData.PluginId = p.Id;
     220        pluginDatas.Add(pluginData);
     221
     222      }
     223      return p.Id;*/
     224      throw new NotImplementedException();
     225
    187226    }
    188227
     
    190229      if (plugins == null)
    191230        plugins = ReadPluginsFromServerCache();
    192       return plugins;             
     231      return plugins;
    193232    }
    194233
     
    207246              pluginDatas.Add(data);
    208247            }
    209           }         
    210         }     
     248          }
     249        }
    211250      }
    212251      return pluginDatas;
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Tests/Mocks/MockJob.cs

    r5106 r5137  
    114114        OnJobStopped();
    115115      }
     116      catch (ThreadAbortException) {
     117        //this happens when the appdomain is killed (e.g. abort from server)
     118        Stop();
     119      }
    116120      catch (Exception e) {
    117121        this.ExecutionState = Core.ExecutionState.Stopped;
    118         OnJobFailed();
     122        OnJobFailed(e);
    119123      }
    120124    }
     
    170174                                                     
    171175    public event EventHandler JobFailed;
    172     protected virtual void OnJobFailed() {
     176    protected virtual void OnJobFailed(Exception e) {
    173177      EventHandler handler = JobFailed;
    174       if (handler != null) handler(this, EventArgs.Empty);
     178      EventArgs<Exception> ev = new EventArgs<Exception>(e);         
     179      if (handler != null) handler(this, ev);
    175180    }
    176181
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave.Tests/SlaveTest.cs

    r5104 r5137  
    11using System;
    2 using System.Text;
    32using System.Collections.Generic;
    43using System.Linq;
     4using HeuristicLab.Clients.Common;
     5using HeuristicLab.Services.Hive.Common;
     6using HeuristicLab.Services.Hive.Common.DataTransfer;
     7using HeuristicLab.Services.Hive.Common.ServiceContracts;
    58using Microsoft.VisualStudio.TestTools.UnitTesting;
    6 using HeuristicLab.Clients.Hive.Salve;
    7 using HeuristicLab.Services.Hive.Common;
    8 using HeuristicLab.Services.Hive.Common.ServiceContracts;
    9 using HeuristicLab.Clients.Common;
    109
    1110namespace HeuristicLab.Clients.Hive.Slave.Tests {
     
    1615    [ClassInitialize]
    1716    public static void MyClassInitialize(TestContext testContext) {
    18       PluginLoader.pluginAssemblies.Any();     
     17      PluginLoader.pluginAssemblies.Any();
    1918      ServiceLocator.Instance = new MockServiceLocator();
    2019    }
    2120
    22     public List<List<MessageContainer>> CreateMsgs() {
     21    private List<List<MessageContainer>> CreateMsgsForSingleJob() {
    2322      List<List<MessageContainer>> allMsgs = new List<List<MessageContainer>>();
    2423      //get slave to fetch job
     
    2827
    2928      //do nothing
     29      msg = new List<MessageContainer>();
     30      allMsgs.Add(msg);
     31      msg = new List<MessageContainer>();
     32      allMsgs.Add(msg);
     33      msg = new List<MessageContainer>();
     34      allMsgs.Add(msg);
    3035      msg = new List<MessageContainer>();
    3136      allMsgs.Add(msg);
     
    3843
    3944    [TestMethod]
    40     public void TestMethod1() {
    41       using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) {
    42         ((MockHiveService)service.Obj).Messages = CreateMsgs();
     45    public void TestSingleJob() {
     46      Job testJob = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 };
     47      List<Job> jobList = new List<Job>();
     48      jobList.Add(testJob);
     49      MockJob job = new MockJob(400, false);
     50
     51      using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) {
     52        MockHiveService ms = (MockHiveService)service.Obj;
     53        ((MockHiveService)service.Obj).Messages = CreateMsgsForSingleJob();
     54        ((MockHiveService)service.Obj).updateJobs(jobList, job);
     55
     56
     57        HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core();
     58        core.Start();
     59
     60        Assert.AreEqual<int>(1, ms.ResultJobs.Count);
     61        Assert.AreEqual<Guid>(testJob.Id, ms.ResultJobs[0].Id);
    4362      }
    44      
    45       HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core();
    46       core.Start();
    4763    }
     64
     65    private List<List<MessageContainer>> CreateMsgsForShutdownSlaveWhileJobRunning() {
     66      List<List<MessageContainer>> allMsgs = new List<List<MessageContainer>>();
     67      //get slave to fetch job
     68      List<MessageContainer> msg = new List<MessageContainer>();
     69      msg.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
     70      allMsgs.Add(msg);
     71
     72      msg = new List<MessageContainer>();
     73      allMsgs.Add(msg);
     74
     75      msg = new List<MessageContainer>();
     76      msg.Add(new MessageContainer(MessageContainer.MessageType.ShutdownSlave));
     77      allMsgs.Add(msg);
     78      return allMsgs;
     79    }
     80
     81    [TestMethod]
     82    public void TestShutdownSlaveWhileJobRunning() {
     83      Job testJob = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 };
     84      List<Job> jobList = new List<Job>();
     85      jobList.Add(testJob);
     86      MockJob job = new MockJob(10000, false);
     87
     88      using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) {
     89        MockHiveService ms = (MockHiveService)service.Obj;
     90        ((MockHiveService)service.Obj).Messages = CreateMsgsForShutdownSlaveWhileJobRunning();
     91        ((MockHiveService)service.Obj).updateJobs(jobList, job);
     92
     93
     94        HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core();
     95        core.Start();
     96
     97      }
     98    }
     99
     100
     101    private List<List<MessageContainer>> CreateMsgsForTwoJobs() {
     102      List<List<MessageContainer>> allMsgs = new List<List<MessageContainer>>();
     103
     104      //get slave to fetch jobs
     105      List<MessageContainer> msg = new List<MessageContainer>();
     106      msg.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
     107      allMsgs.Add(msg);
     108
     109      msg = new List<MessageContainer>();
     110      allMsgs.Add(msg);
     111
     112      msg = new List<MessageContainer>();
     113      msg.Add(new MessageContainer(MessageContainer.MessageType.AquireJob));
     114      allMsgs.Add(msg);
     115
     116
     117      msg = new List<MessageContainer>();
     118      allMsgs.Add(msg);
     119      msg = new List<MessageContainer>();
     120      allMsgs.Add(msg);
     121      msg = new List<MessageContainer>();
     122      allMsgs.Add(msg);
     123      msg = new List<MessageContainer>();
     124      allMsgs.Add(msg);
     125      msg = new List<MessageContainer>();
     126      allMsgs.Add(msg);
     127      msg = new List<MessageContainer>();
     128      allMsgs.Add(msg);
     129      msg = new List<MessageContainer>();
     130      allMsgs.Add(msg);
     131      msg = new List<MessageContainer>();
     132      allMsgs.Add(msg);
     133      msg = new List<MessageContainer>();
     134      allMsgs.Add(msg);
     135
     136      msg = new List<MessageContainer>();
     137      msg.Add(new MessageContainer(MessageContainer.MessageType.ShutdownSlave));
     138      allMsgs.Add(msg);
     139      return allMsgs;
     140    }
     141
     142    [TestMethod]
     143    public void TestTwoJobs() {
     144      Job testJob1 = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 };
     145      Job testJob2 = new Job { Id = Guid.NewGuid(), JobState = JobState.Waiting, DateCreated = DateTime.Now, CoresNeeded = 1, MemoryNeeded = 0 };
     146      List<Job> jobList = new List<Job>();
     147      jobList.Add(testJob1);
     148      jobList.Add(testJob2);
     149      MockJob job = new MockJob(2000, false);
     150
     151      using (Disposable<IHiveService> service = ServiceLocator.Instance.GetService()) {
     152        MockHiveService ms = (MockHiveService)service.Obj;
     153        ((MockHiveService)service.Obj).Messages = CreateMsgsForTwoJobs();
     154        ((MockHiveService)service.Obj).updateJobs(jobList, job);
     155
     156
     157        HeuristicLab.Clients.Hive.Salve.Core core = new Salve.Core();
     158        core.Start();
     159        Assert.AreEqual<int>(2, ms.ResultJobs.Count);
     160
     161      }
     162    }
     163
     164
     165
     166
     167
     168
     169
     170
     171
    48172  }
    49173}
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Core.cs

    r5105 r5137  
    2323using System.Collections.Generic;
    2424using System.IO;
     25using System.Runtime.CompilerServices;
    2526using System.Threading;
     27using HeuristicLab.Clients.Hive.Slave;
    2628using HeuristicLab.Common;
    2729using HeuristicLab.Core;
     30using HeuristicLab.Services.Hive.Common;
    2831using HeuristicLab.Services.Hive.Common.DataTransfer;
    29 using HeuristicLab.Services.Hive.Common;
    30 using HeuristicLab.Clients.Hive.Slave;
    3132
    3233
     
    3738  public class Core : MarshalByRefObject {
    3839    public static bool abortRequested { get; set; }
    39 
    4040    public static ILog Log { get; set; }
    4141
     
    4545
    4646    private WcfService wcfService;
    47     private HeartbeatManager heartbeatManager;
    48 
    49     private bool currentlyFetching;
    50     private bool CurrentlyFetching {
    51       get {
    52         return currentlyFetching;
    53       }
    54       set {
    55         currentlyFetching = value;
    56         Logger.Debug("Set CurrentlyFetching to " + currentlyFetching);
    57       }
    58     }
     47    public HeartbeatManager heartbeatManager;
     48
     49    private int coreThreadId;
    5950
    6051    public Dictionary<Guid, Executor> ExecutionEngines {
     
    6253    }
    6354
    64     internal Dictionary<Guid, Job> Jobs {   
     55    internal Dictionary<Guid, Job> Jobs {
    6556      get { return jobs; }
     57    }
     58
     59    public Core() {
     60      coreThreadId = Thread.CurrentThread.ManagedThreadId;
    6661    }
    6762
     
    8681
    8782      DeRegisterServiceEvents();
    88      
     83
    8984      //server.Close();
    9085      Logger.Info("Program shutdown");
     
    9287
    9388    private void StartHeartbeats() {
    94       //Initialize the heartbeat
     89      //Initialize the heartbeat     
    9590      heartbeatManager = new HeartbeatManager { Interval = new TimeSpan(0, 0, 10) };
    9691      heartbeatManager.StartHeartbeat();
     
    119114    private void DetermineAction(MessageContainer container) {
    120115      Logger.Info("Message: " + container.Message.ToString() + " for job: " + container.JobId);
    121       switch (container.Message) {
    122         //Server requests to abort a job
    123         case MessageContainer.MessageType.AbortJob:
    124           if (engines.ContainsKey(container.JobId))
    125             try {
    126               engines[container.JobId].Abort();
    127             }
    128             catch (AppDomainUnloadedException) {
    129               // appdomain already unloaded. Finishing job probably ongoing
    130             }
    131           else
    132             Logger.Error("AbortJob: Engine doesn't exist");
    133           break;
    134                        
    135         //Pull a Job from the Server
    136         case MessageContainer.MessageType.AquireJob:         
    137           if (!CurrentlyFetching) {
     116      //TODO: find a better solution
     117      if (container is ExecutorMessageContainer<Guid>) {
     118        ExecutorMessageContainer<Guid> c = (ExecutorMessageContainer<Guid>)container;
     119        c.execute();
     120      } else if (container is MessageContainer) {
     121        switch (container.Message) {
     122          //Server requests to abort a job
     123          case MessageContainer.MessageType.AbortJob:
     124            if (engines.ContainsKey(container.JobId))
     125              try {
     126                engines[container.JobId].Abort();
     127              }
     128              catch (AppDomainUnloadedException) {
     129                // appdomain already unloaded. Finishing job probably ongoing
     130              } else
     131              Logger.Error("AbortJob: Engine doesn't exist");
     132            break;
     133
     134          //Pull a Job from the Server
     135          case MessageContainer.MessageType.AquireJob:
     136            Job myJob = wcfService.AquireJob();
    138137            //TODO: handle in own thread!!
    139             Job myJob = wcfService.AquireJob();
    140138            JobData jobData = wcfService.GetJobData(myJob.Id);
    141             wcfService_GetJobCompleted(myJob, jobData);           
    142             CurrentlyFetching = true;
    143           } else
    144             Logger.Info("Currently fetching, won't fetch this time!");
    145           break;
    146 
    147         //A Job has finished and can be sent back to the server
    148         case MessageContainer.MessageType.JobStopped:
    149           SendFinishedJob(container.JobId);
    150           break;
    151 
    152         case MessageContainer.MessageType.JobFailed:
    153           SendFinishedJob(container.JobId);
    154           break;       
    155              
    156         //Hard shutdown of the client
    157         case MessageContainer.MessageType.ShutdownSlave:
    158           ShutdownCore();
    159           break;
     139            StartJobInAppDomain(myJob, jobData);
     140            break;
     141
     142          //Hard shutdown of the client
     143          case MessageContainer.MessageType.ShutdownSlave:
     144            ShutdownCore();
     145            break;
     146        }
     147      } else {
     148        Logger.Warn("Unknown MessageContainer: " + container);
    160149      }
    161150    }
     
    163152    public void ShutdownCore() {
    164153      Logger.Info("Shutdown Signal received");
    165      
    166154      Logger.Debug("Stopping heartbeat");
    167155      heartbeatManager.StopHeartBeat();
    168       abortRequested = true;     
     156      abortRequested = true;
    169157      Logger.Debug("Logging out");
    170158
     
    177165        }
    178166      }
    179      
    180       WcfService.Instance.Disconnect();   
    181     }
    182 
    183     //TODO: make synchronized
    184     //TODO: start own thread?
    185     //only called when waiting for child jobs?
    186     public void PauseJob(JobData data ) {
    187       if(!Jobs.ContainsKey(data.JobId)) {
    188         //TODO: error handling
    189       }
    190       Job job = Jobs[data.JobId];
    191       job.JobState = JobState.WaitingForChildJobs;
    192 
    193       wcfService.UpdateJob(job, data);     
    194       KillAppDomain(data.JobId);     
    195     }
    196            
    197    
     167      WcfService.Instance.Disconnect();
     168    }
     169
     170    /// <summary>
     171    /// Pauses a job, which means sending it to the server and killing it locally;
     172    /// atm only used when executor is waiting for child jobs
     173    /// </summary>
     174    /// <param name="data"></param>
     175    [MethodImpl(MethodImplOptions.Synchronized)]
     176    public void PauseJob(JobData data) {
     177      if (!Jobs.ContainsKey(data.JobId)) {
     178        Logger.Error("Can't find job with id " + data.JobId);
     179      } else {
     180        Job job = Jobs[data.JobId];
     181        job.JobState = JobState.WaitingForChildJobs;
     182        wcfService.UpdateJob(job, data);
     183      }
     184      KillAppDomain(data.JobId);
     185    }
     186
    198187    /// <summary>
    199188    /// serializes the finished job and submits it to the server. If, at the time, a network connection is unavailable, the Job gets stored on the disk.
     
    201190    /// </summary>
    202191    /// <param name="jobId"></param>
     192    [MethodImpl(MethodImplOptions.Synchronized)]
    203193    public void SendFinishedJob(object jobId) {
    204194      try {
     
    208198          Logger.Info("Engine doesn't exist");
    209199          return;
    210         } 
     200        }
    211201        if (!jobs.ContainsKey(jId)) {
    212202          Logger.Info("Job doesn't exist");
     
    218208        cJob.Exception = engines[jId].CurrentException;
    219209        cJob.ExecutionTime = engines[jId].ExecutionTime;
    220        
     210
    221211        try {
    222212          Logger.Info("Sending the finished job with id: " + jId);
    223213          wcfService.UpdateJob(cJob, sJob);
    224           SlaveStatusInfo.JobsProcessed++;         
     214          SlaveStatusInfo.JobsProcessed++;
    225215        }
    226216        catch (Exception e) {
    227           Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");         
     217          Logger.Info("Transmitting to server failed. Storing the finished job with id: " + jId + " to hdd (" + e.ToString() + ")");
    228218        }
    229219        finally {
     
    236226      }
    237227    }
    238  
     228
    239229    /// <summary>
    240230    /// A new Job from the wcfService has been received and will be started within a AppDomain.
     
    242232    /// <param name="sender"></param>
    243233    /// <param name="e"></param>
    244     void wcfService_GetJobCompleted(Job myJob, JobData jobData) {     
    245         Logger.Info("Received new job with id " + myJob.Id);
    246         String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
    247         bool pluginsPrepared = false;
    248 
    249         try {       
    250           PluginCache.Instance.PreparePlugins(myJob, jobData);
    251           Logger.Debug("Plugins fetched for job " + myJob.Id);
    252           pluginsPrepared = true;
     234    private void StartJobInAppDomain(Job myJob, JobData jobData) {
     235      Logger.Info("Received new job with id " + myJob.Id);
     236      String pluginDir = Path.Combine(PluginCache.Instance.PluginTempBaseDir, myJob.Id.ToString());
     237      bool pluginsPrepared = false;
     238
     239      try {
     240        PluginCache.Instance.PreparePlugins(myJob, jobData);
     241        Logger.Debug("Plugins fetched for job " + myJob.Id);
     242        pluginsPrepared = true;
     243      }
     244      catch (Exception exception) {
     245        Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
     246      }
     247
     248      if (pluginsPrepared) {
     249        try {
     250          AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
     251          appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
     252          lock (engines) {
     253            if (!jobs.ContainsKey(myJob.Id)) {
     254              jobs.Add(myJob.Id, myJob);
     255              appDomains.Add(myJob.Id, appDomain);
     256              Logger.Debug("Creating AppDomain");
     257              Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
     258              Logger.Debug("Created AppDomain");
     259              engine.JobId = myJob.Id;
     260              engine.core = this;
     261              Logger.Debug("Starting Engine for job " + myJob.Id);
     262              engines.Add(myJob.Id, engine);
     263              engine.Start(jobData.Data);
     264              SlaveStatusInfo.JobsFetched++;
     265              Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
     266            }
     267          }
     268          heartbeatManager.AwakeHeartBeatThread();
    253269        }
    254270        catch (Exception exception) {
    255           Logger.Error(string.Format("Copying plugins for job {0} failed: {1}", myJob.Id, exception));
    256         }
    257 
    258         if (pluginsPrepared) {
    259           try {
    260             AppDomain appDomain = HeuristicLab.PluginInfrastructure.Sandboxing.SandboxManager.CreateAndInitSandbox(myJob.Id.ToString(), pluginDir, Path.Combine(pluginDir, PluginCache.ConfigFileName));
    261             appDomain.UnhandledException += new UnhandledExceptionEventHandler(appDomain_UnhandledException);
    262             lock (engines) {
    263               if (!jobs.ContainsKey(myJob.Id)) {
    264                 jobs.Add(myJob.Id, myJob);
    265                 appDomains.Add(myJob.Id, appDomain);
    266                 Logger.Debug("Creating AppDomain");
    267                 Executor engine = (Executor)appDomain.CreateInstanceAndUnwrap(typeof(Executor).Assembly.GetName().Name, typeof(Executor).FullName);
    268                 Logger.Debug("Created AppDomain");
    269                 engine.JobId = myJob.Id;
    270                 engine.core = this;
    271                 Logger.Debug("Starting Engine for job " + myJob.Id);
    272                 engines.Add(myJob.Id, engine);
    273                 engine.Start(jobData.Data);
    274                 SlaveStatusInfo.JobsFetched++;
    275                 Logger.Info("Increment FetchedJobs to:" + SlaveStatusInfo.JobsFetched);
    276               }
    277             }
    278             heartbeatManager.AwakeHeartBeatThread();
    279           }
    280           catch (Exception exception) {
    281             Logger.Error("Creating the Appdomain and loading the job failed for job " + myJob.Id);
    282             Logger.Error("Error thrown is: ", exception);
    283             CurrentlyFetching = false;
    284             KillAppDomain(myJob.Id);
    285           }
    286         }     
    287       CurrentlyFetching = false;
    288     }
    289  
     271          Logger.Error("Creating the Appdomain and loading the job failed for job " + myJob.Id);
     272          Logger.Error("Error thrown is: ", exception);
     273          KillAppDomain(myJob.Id);
     274        }
     275      }
     276    }
     277
    290278    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
    291279    private void OnExceptionOccured(Exception e) {
     
    295283    }
    296284
    297     void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
     285    private void appDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    298286      Logger.Error("Exception in AppDomain: " + e.ExceptionObject.ToString());
    299287      KillAppDomain(new Guid(e.ExceptionObject.ToString()));
     
    301289
    302290    /// <summary>
     291    /// Enqueues messages from the executor to the message queue.
     292    /// This is necessary if the core thread has to execute certain actions, e.g.
     293    /// killing of an app domain.
     294    /// </summary>
     295    /// <typeparam name="T"></typeparam>
     296    /// <param name="action"></param>
     297    /// <param name="parameter"></param>
     298    /// <returns>true if the calling method can continue execution, else false</returns>
     299    private bool EnqueueExecutorMessage<T>(Action<T> action, T parameter) {
     300      if (Thread.CurrentThread.ManagedThreadId != this.coreThreadId) {
     301        ExecutorMessageContainer<T> container = new ExecutorMessageContainer<T>();
     302        container.Callback = action;
     303        container.CallbackParameter = parameter;
     304        MessageQueue.GetInstance().AddMessage(container);
     305        return false;
     306      } else {
     307        return true;
     308      }
     309    }
     310
     311    /// <summary>
    303312    /// Kill a appdomain with a specific id.
    304313    /// </summary>
    305314    /// <param name="id">the GUID of the job</param>
     315    [MethodImpl(MethodImplOptions.Synchronized)]
    306316    public void KillAppDomain(Guid id) {
    307       Logger.Debug("Shutting down Appdomain for Job " + id);
    308       lock (engines) {
    309         try {
    310           if (engines.ContainsKey(id))
    311             engines[id].Dispose();
    312           if (appDomains.ContainsKey(id)) {
    313             appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
    314 
    315             int repeat = 5;
    316             while (repeat > 0) {
    317               try {
    318                 AppDomain.Unload(appDomains[id]);
    319                 repeat = 0;
    320               }
    321               catch (CannotUnloadAppDomainException) {
    322                 Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
    323                 Thread.Sleep(1000);
    324                 repeat--;
    325                 if (repeat == 0) {
    326                   throw; // rethrow and let app crash
     317      if (EnqueueExecutorMessage<Guid>(KillAppDomain, id)) {
     318        Logger.Debug("Shutting down Appdomain for Job " + id);
     319        lock (engines) {
     320          try {
     321            if (engines.ContainsKey(id)) {
     322              engines[id].Dispose();
     323              engines.Remove(id);
     324            }
     325
     326            if (appDomains.ContainsKey(id)) {
     327              appDomains[id].UnhandledException -= new UnhandledExceptionEventHandler(appDomain_UnhandledException);
     328
     329              int repeat = 5;
     330              while (repeat > 0) {
     331                try {
     332                  AppDomain.Unload(appDomains[id]);
     333                  repeat = 0;
     334                }
     335                catch (CannotUnloadAppDomainException) {
     336                  Logger.Error("Could not unload AppDomain, will try again in 1 sec.");
     337                  Thread.Sleep(1000);
     338                  repeat--;
     339                  if (repeat == 0) {
     340                    throw; // rethrow and let app crash
     341                  }
    327342                }
    328343              }
     344              appDomains.Remove(id);
    329345            }
    330             appDomains.Remove(id);
     346
     347            jobs.Remove(id);
     348            PluginCache.Instance.DeletePluginsForJob(id);
     349            GC.Collect();
    331350          }
    332 
    333           engines.Remove(id);
    334           jobs.Remove(id);
    335           PluginCache.Instance.DeletePluginsForJob(id);
    336           GC.Collect();
    337         }
    338         catch (Exception ex) {
    339           Logger.Error("Exception when unloading the appdomain: ", ex);
    340         }
    341       }
    342       GC.Collect();
    343     }
    344   }   
    345 
    346 
     351          catch (Exception ex) {
     352            Logger.Error("Exception when unloading the appdomain: ", ex);
     353          }
     354        }
     355        GC.Collect();
     356      }
     357    }
     358  }
    347359}
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Executor.cs

    r5106 r5137  
    2121
    2222using System;
     23using System.Collections.Generic;
    2324using System.Linq;
     25using HeuristicLab.Clients.Hive.Slave;
    2426using HeuristicLab.Common;
    2527using HeuristicLab.Core;
    26 using HeuristicLab.Services.Hive.Common;
    2728using HeuristicLab.Hive;
     29using HeuristicLab.PluginInfrastructure;
    2830using HeuristicLab.Services.Hive.Common.DataTransfer;
    29 using System.Collections.Generic;
    30 using HeuristicLab.Clients.Hive.Slave;
    31 using HeuristicLab.PluginInfrastructure;
    32 
    33                                              
     31
     32
    3433namespace HeuristicLab.Clients.Hive.Salve {
    3534  public class Executor : MarshalByRefObject, IDisposable {
    3635    public Guid JobId { get; set; }
    3736    public IJob Job { get; set; }
    38     public MessageContainer.MessageType CurrentMessage { get; set; }
    39     public Core core { get; set; }   
    40        
     37    private bool wasJobAborted = false;
     38    public Core core { get; set; }
     39
    4140    private Exception currentException;
    4241    public String CurrentException {
     
    4948      }
    5049    }
    51    
     50
    5251    public ExecutionState ExecutionState {
    5352      get {
     
    7372        RegisterJobEvents();
    7473
    75         if (Job.CollectChildJobs) {         
     74        if (Job.CollectChildJobs) {
    7675          IEnumerable<JobData> childjobs = WcfService.Instance.GetChildJobs(JobId);
    7776          Job.Resume(childjobs.Select(j => PersistenceUtil.Deserialize<IJob>(j.Data)));
     
    8281      }
    8382      catch (Exception e) {
    84         this.currentException = e;       
     83        this.currentException = e;
    8584      }
    8685    }
     
    9190      }
    9291      catch (Exception e) {
    93         this.currentException = e;       
     92        this.currentException = e;
    9493      }
    9594    }
    9695
    9796    public void Abort() {
    98       CurrentMessage = MessageContainer.MessageType.AbortJob;
     97      wasJobAborted = true;
    9998      if ((ExecutionState == ExecutionState.Started) || (ExecutionState == ExecutionState.Paused)) {
    10099        Job.Stop();
     
    105104
    106105    private void RegisterJobEvents() {
    107       //TODO: warum gibt es jobStopped und jobfailed nicht mehr?
    108106      Job.JobStopped += new EventHandler(Job_JobStopped);
    109107      Job.JobFailed += new EventHandler(Job_JobFailed);
     
    123121    private List<Guid> FindPluginsNeeded(IJob obj) {
    124122      List<Guid> guids = new List<Guid>();
    125       foreach(IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) {       
     123      foreach (IPluginDescription desc in PluginUtil.GetDeclaringPlugins(obj)) {
    126124      }
    127125      throw new NotImplementedException("FindPluginsNeeded for Job_NewChildJob");
     
    147145      // Pause the job and send it back to the hive. The server will awake it when all child-jobs are finished
    148146      this.Job.CollectChildJobs = true;
    149      
     147
    150148      JobData jdata = new JobData();
    151149      jdata.Data = PersistenceUtil.Serialize(Job);
    152150      jdata.JobId = this.JobId;
    153      
     151
    154152      core.PauseJob(jdata);
    155153    }
     
    167165
    168166    private void Job_JobStopped(object sender, EventArgs e) {
    169      if (CurrentMessage == MessageContainer.MessageType.AbortJob) {
    170        core.KillAppDomain(JobId);
     167      if (wasJobAborted) {
     168        core.KillAppDomain(JobId);
    171169      } else {
    172170        core.SendFinishedJob(JobId);
     
    176174    public JobData GetFinishedJob() {
    177175      if (Job == null) {
    178         throw new InvalidStateException("Job is null"); 
    179       }             
     176        throw new InvalidStateException("Job is null");
     177      }
    180178
    181179      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
     
    185183
    186184      if (Job.ExecutionState == HeuristicLab.Core.ExecutionState.Started) {
    187         throw new InvalidStateException("Job is still running"); 
     185        throw new InvalidStateException("Job is still running");
    188186      } else {
    189187        JobData jdata = new JobData();
     
    192190        return jdata;
    193191      }
    194     }   
    195    
     192    }
     193
    196194    public Executor() {
    197       // CurrentMessage = MessageContainer.MessageType.NoMessage; [chn] check usage of 'CurrentMessage'
    198195    }
    199196
     
    201198      if (Job != null)
    202199        DeregisterJobEvents();
    203       //Queue = null;
    204200      Job = null;
    205201    }
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeartbeatManager.cs

    r5105 r5137  
    2121
    2222using System;
     23using System.Collections.Generic;
    2324using System.Diagnostics;
    2425using System.Threading;
    2526using HeuristicLab.Common;
    26 using HeuristicLab.Clients.Hive.Salve;
     27using HeuristicLab.Services.Hive.Common;
    2728using HeuristicLab.Services.Hive.Common.DataTransfer;
    28 using HeuristicLab.Services.Hive.Common;
    29 using System.Collections.Generic;
    3029
    3130namespace HeuristicLab.Clients.Hive.Salve {
     
    4948    private WcfService wcfService;
    5049
    51     private bool abortThreadPending;
     50    private bool threadStopped;
    5251
    5352    ReaderWriterLockSlim heartBeatThreadIsSleepingLock = new ReaderWriterLockSlim();
    54    
     53
    5554    /// <summary>
    5655    /// Starts the Heartbeat signal.
     
    5857    public void StartHeartbeat() {
    5958      this.waitHandle = new AutoResetEvent(true);
    60       wcfService = WcfService.Instance;     
    61       abortThreadPending = false;
     59      wcfService = WcfService.Instance;
     60      threadStopped = false;
    6261      heartBeatThread = new Thread(RunHeartBeatThread);
    6362      heartBeatThread.Start();
     
    6867    /// </summary>
    6968    public void StopHeartBeat() {
    70       abortThreadPending = true;
     69      threadStopped = true;
    7170      waitHandle.Set();
    7271      heartBeatThread.Join();
    7372    }
    74    
     73
    7574    /// <summary>
    7675    /// use this method to singalize there is work to do (to avoid the waiting period if its clear that actions are required)
    7776    /// </summary>
    7877    public void AwakeHeartBeatThread() {
    79       waitHandle.Set();
     78      if (!threadStopped)
     79        waitHandle.Set();
    8080    }
    8181
    8282    private void RunHeartBeatThread() {
    83       while (!abortThreadPending) {
     83      while (!threadStopped) {
    8484        try {
    8585          lock (locker) {
     
    8787              wcfService.Connect(ConfigManager.Instance.GetClientInfo()); // Login happens automatically upon successfull connection
    8888            }
    89             if(wcfService.ConnState == NetworkEnum.WcfConnState.Connected) {
     89            if (wcfService.ConnState == NetworkEnum.WcfConnState.Connected) {
    9090              HeuristicLab.Services.Hive.Common.DataTransfer.Slave info = ConfigManager.Instance.GetClientInfo();
    9191
    92               Heartbeat heartBeatData = new Heartbeat
    93               {
     92              Heartbeat heartBeatData = new Heartbeat {
    9493                SlaveId = info.Id,
    9594                FreeCores = info.Cores.HasValue ? info.Cores.Value - ConfigManager.Instance.GetUsedCores() : 0,
    9695                FreeMemory = GetFreeMemory(),
    97                 JobProgress = ConfigManager.Instance.GetExecutionTimeOfAllJobs()               
     96                JobProgress = ConfigManager.Instance.GetExecutionTimeOfAllJobs()
    9897              };
    99                              
     98
    10099              Logger.Debug("Sending Heartbeat: " + heartBeatData);
    101               List<MessageContainer> msgs =  wcfService.SendHeartbeat(heartBeatData);
    102              
     100              List<MessageContainer> msgs = wcfService.SendHeartbeat(heartBeatData);
     101
    103102              if (msgs == null) {
    104103                Logger.Debug("Error getting response from Heartbeat");
     
    108107              Logger.Debug("Heartbeat Response received: ");
    109108              msgs.ForEach(mc => Logger.Debug(mc.Message.ToString()));
    110               msgs.ForEach(mc => MessageQueue.GetInstance().AddMessage(mc));     
     109              msgs.ForEach(mc => MessageQueue.GetInstance().AddMessage(mc));
    111110            }
    112           } // lock
     111          }
    113112        }
    114113        catch (Exception e) {
     
    117116        }
    118117        waitHandle.WaitOne(this.Interval);
    119       } // while
     118      }
    120119      waitHandle.Close();
    121       abortThreadPending = false;
    122120      Logger.Debug("Heartbeat thread stopped");
    123121    }
    124          
     122
    125123
    126124    #region Eventhandler
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/HeuristicLab.Clients.Hive.Slave-3.4.csproj

    r5105 r5137  
    7272      <SubType>Code</SubType>
    7373    </Compile>
     74    <Compile Include="ExecutorMessageContainer.cs" />
    7475    <Compile Include="MessageQueue.cs" />
    7576    <Compile Include="NetworkEnum.cs" />
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive.Slave/3.4/Logger.cs

    r5105 r5137  
    11using System;
    2 using System.Collections.Generic;
    3 using System.Linq;
    4 using System.Text;
    5 using HeuristicLab.Core;
    62
    73namespace HeuristicLab.Clients.Hive.Salve {
     
    117  /// TODO: send messages to gui
    128  /// </summary>
    13   internal static class Logger {   
    14     private static object locker = new object(); 
     9  public static class Logger {
     10    private static object locker = new object();
    1511
    1612    public static void Debug(object message) {
     
    4238        Console.WriteLine(message.ToString() + " \n Exception is :" + e.ToString());
    4339      }
    44     }   
     40    }
    4541  }
    4642}
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Services.Hive.Common/3.4/MessageContainer.cs

    r5106 r5137  
    4343      SayHello,  // Slave should say hello, because he is unknown to the server
    4444
    45       // *** events from execution engine ***
    46       JobStopped,      // job finished, submit results, unload appdomain
    47       JobPaused,       // job paused, submit results, unload appdomain
    48       JobFailed,       // job failed with an exception. submit the results, unload appdomain
    49 
    5045      // *** commands from execution engine ***
    5146      AddChildJob,     // adds a new child job for the provided jobId
Note: See TracChangeset for help on using the changeset viewer.