Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
02/01/11 15:51:11 (13 years ago)
Author:
cneumuel
Message:

#1233

  • changed the workflow of aquireing a new job from server.
    • if a job is available for calculation, the slave receives the jobId already with the heartbeats. The job is then exclusively assigned to this slave.
  • extended the metainfo for a slave by OperatingSystem and CpuArchitecture
  • enhanced the way plugin-dependencies are discovered by using the types used by XmlGenerator. Now only mimimum amount of plugins are transferred.
  • selection of waiting jobs now consideres assigned slave-group
  • more unit tests for service
  • added unit tests for experiment manager
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/HiveExperimentClient.cs

    r5402 r5404  
    114114    }
    115115
    116     private List<Guid> pluginsNeededIds;
    117     public List<Guid> PluginsNeededIds {
    118       get { return pluginsNeededIds; }
    119       set { pluginsNeededIds = value; }
     116    private IEnumerable<Plugin> onlinePlugins;
     117    public IEnumerable<Plugin> OnlinePlugins {
     118      get { return onlinePlugins; }
     119      set { onlinePlugins = value; }
     120    }
     121
     122    private List<Plugin> alreadyUploadedPlugins;
     123    public List<Plugin> AlreadyUploadedPlugins {
     124      get { return alreadyUploadedPlugins; }
     125      set { alreadyUploadedPlugins = value; }
    120126    }
    121127
     
    247253
    248254          this.progress.Status = "Uploading plugins...";
    249           this.PluginsNeededIds = GetPluginsNeededIds(this.useLocalPlugins);
     255          this.OnlinePlugins = service.Obj.GetPlugins();
     256          this.AlreadyUploadedPlugins = new List<Plugin>();
    250257
    251258          this.progress.Status = "Uploading jobs...";
     
    269276        IsProgressing = false;
    270277      }
    271     }
    272 
    273     /// <summary>
    274     /// Gets the Ids of all plugins needed for executing the job.
    275     /// All loaded plugins are assumed to be necessary.
    276     /// If a plugin with the same name and version is already online, it is used. Otherwise the local plugin is uploaded.
    277     /// If useLocalPlugins is true, all local plugins are uploaded regardless of the existence of the same plugin online.
    278     /// </summary>
    279     public static List<Guid> GetPluginsNeededIds(bool useLocalPlugins) {
    280       IEnumerable<IPluginDescription> localPlugins = ApplicationManager.Manager.Plugins;
    281       List<Guid> pluginsNeededIds = new List<Guid>();
    282 
    283       using (var service = ServiceLocator.Instance.GetService()) {
    284         IEnumerable<Plugin> onlinePlugins = service.Obj.GetPlugins();
    285 
    286         foreach (IPluginDescription localPlugin in localPlugins) {
    287           Plugin found = onlinePlugins.Where(onlinePlugin => onlinePlugin.Name == localPlugin.Name && onlinePlugin.Version == localPlugin.Version).SingleOrDefault();
    288           if (!useLocalPlugins && found != null) {
    289             // plugin is available online; reuse
    290             pluginsNeededIds.Add(found.Id);
    291           } else {
    292             // upload the plugin
    293             Plugin p = new Plugin() { Name = localPlugin.Name, Version = localPlugin.Version, IsLocal = useLocalPlugins };
    294             List<PluginData> pluginDatas = new List<PluginData>();
    295 
    296             foreach (IPluginFile pf in localPlugin.Files) {
    297               PluginData pluginData = new PluginData();
    298 
    299               pluginData.Data = File.ReadAllBytes(pf.Name);
    300               pluginDatas.Add(pluginData);
    301             }
    302             pluginsNeededIds.Add(service.Obj.AddPlugin(p, pluginDatas));
    303           }
    304         }
    305       }
    306       return pluginsNeededIds;
    307278    }
    308279
     
    319290      this.progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
    320291      JobData jobData;
     292      List<IPluginDescription> plugins;
    321293      if (hiveJob.OptimizerJob.ComputeInParallel &&
    322294        (hiveJob.OptimizerJob.Optimizer is Optimization.Experiment || hiveJob.OptimizerJob.Optimizer is Optimization.BatchRun)) {
    323295        hiveJob.Job.JobState = JobState.WaitingForChildJobs;
    324296        hiveJob.OptimizerJob.CollectChildJobs = false; // don't collect child-jobs on slaves
    325         jobData = hiveJob.GetAsJobData(true);
     297        jobData = hiveJob.GetAsJobData(true, out plugins);
    326298      } else {
    327         jobData = hiveJob.GetAsJobData(false);
    328       }
    329 
    330       hiveJob.Job.PluginsNeededIds = this.PluginsNeededIds;
     299        jobData = hiveJob.GetAsJobData(false, out plugins);
     300      }
     301
     302      hiveJob.Job.PluginsNeededIds = GetPluginDependencies(service, onlinePlugins, alreadyUploadedPlugins, plugins, useLocalPlugins);
    331303
    332304      this.progress.Status = string.Format("Uploading job {0} of {1} ({2} kb)", jobCount, totalJobCount, jobData.Data.Count() / 1024);
     
    347319      }
    348320    }
    349 
     321   
    350322    /// <summary>
    351323    /// Converts a string which can contain Ids separated by ';' to a enumerable
     
    369341    #endregion
    370342
    371     public void StartResultPolling() {
    372       if (!jobResultPoller.IsPolling) {
    373         jobResultPoller.Start();
    374       } else {
    375         throw new JobResultPollingException("Result polling already running");
    376       }
    377     }
    378 
    379     public void StopResultPolling() {
    380       if (jobResultPoller.IsPolling) {
    381         jobResultPoller.Stop();
    382       } else {
    383         throw new JobResultPollingException("Result polling not running");
    384       }
    385     }
    386343
    387344    #region HiveJob Events
     
    483440
    484441    #region JobResultPoller Events
     442
     443    public void StartResultPolling() {
     444      if (!jobResultPoller.IsPolling) {
     445        jobResultPoller.Start();
     446      } else {
     447        throw new JobResultPollingException("Result polling already running");
     448      }
     449    }
     450
     451    public void StopResultPolling() {
     452      if (jobResultPoller.IsPolling) {
     453        jobResultPoller.Stop();
     454      } else {
     455        throw new JobResultPollingException("Result polling not running");
     456      }
     457    }
     458
    485459    private void RegisterResultPollingEvents() {
    486460      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
     
    497471      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
    498472    }
    499     void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
     473    private void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
    500474      this.IsPollingResults = jobResultPoller.IsPolling;
    501475    }
    502     void jobResultPoller_PollingFinished(object sender, EventArgs e) {
     476    private void jobResultPoller_PollingFinished(object sender, EventArgs e) {
    503477      LogMessage("Polling results finished");
    504478    }
    505     void jobResultPoller_PollingStarted(object sender, EventArgs e) {
     479    private void jobResultPoller_PollingStarted(object sender, EventArgs e) {
    506480      LogMessage("Polling results started");
    507481    }
    508     void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightJob>> e) {
     482    private void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightJob>> e) {
    509483      foreach (LightweightJob lightweightJob in e.Value) {
    510484        HiveJob hj = hiveJob.GetHiveJobByJobId(lightweightJob.Id);
     
    543517    }
    544518
    545     void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
     519    private void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
    546520      OnExceptionOccured(e.Value);
    547521    }
     
    578552    #endregion
    579553
     554    #region Job Loading
    580555    /// <summary>
    581556    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem
     
    674649      }
    675650    }
    676    
     651    #endregion
     652
     653    #region Plugin Management
     654    /// <summary>
     655    /// Checks if plugins are available on Hive Server. If not they are uploaded. Ids are returned.
     656    /// </summary>
     657    /// <param name="service">An active service-proxy</param>
     658    /// <param name="onlinePlugins">List of plugins which are available online</param>
     659    /// <param name="alreadyUploadedPlugins">List of plugins which have been uploaded from this HiveExperiment</param>
     660    /// <param name="neededPlugins">List of plugins which need to be uploaded</param>
     661    /// <param name="useLocalPlugins">If true, the plugins which are already online are ignored. All local plugins are uploaded, but only once.</param>
     662    /// <returns></returns>
     663    private static List<Guid> GetPluginDependencies(IHiveService service, IEnumerable<Plugin> onlinePlugins, List<Plugin> alreadyUploadedPlugins, IEnumerable<IPluginDescription> neededPlugins, bool useLocalPlugins) {
     664      var pluginIds = new List<Guid>();
     665      foreach (var neededPlugin in neededPlugins) {
     666        Plugin foundPlugin = alreadyUploadedPlugins.SingleOrDefault(p => p.Name == neededPlugin.Name && p.Version == neededPlugin.Version);
     667        if (foundPlugin == null) {
     668          foundPlugin = onlinePlugins.SingleOrDefault(p => p.Name == neededPlugin.Name && p.Version == neededPlugin.Version);
     669          if (useLocalPlugins || foundPlugin == null) {
     670            Plugin p = CreatePlugin(neededPlugin, useLocalPlugins);
     671            List<PluginData> pd = CreatePluginDatas(neededPlugin);
     672            p.Id = service.AddPlugin(p, pd);
     673            alreadyUploadedPlugins.Add(p);
     674          } else {
     675            pluginIds.Add(foundPlugin.Id);
     676          }
     677        } else {
     678          pluginIds.Add(foundPlugin.Id);
     679        }
     680      }
     681      return pluginIds;
     682    }
     683
     684    private static Plugin CreatePlugin(IPluginDescription plugin, bool useLocalPlugins) {
     685      return new Plugin() { Name = plugin.Name, Version = plugin.Version, IsLocal = useLocalPlugins };
     686    }
     687
     688    private static List<PluginData> CreatePluginDatas(IPluginDescription plugin) {
     689      List<PluginData> pluginDatas = new List<PluginData>();
     690
     691      foreach (IPluginFile pf in plugin.Files) {
     692        PluginData pluginData = new PluginData();
     693
     694        pluginData.Data = File.ReadAllBytes(pf.Name);
     695        pluginData.FileName = Path.GetFileName(pf.Name);
     696        pluginDatas.Add(pluginData);
     697      }
     698      return pluginDatas;
     699    }
     700
     701    /// <summary>
     702    /// Gets the Ids of all plugins needed for executing the job.
     703    /// All loaded plugins are assumed to be necessary.
     704    /// If a plugin with the same name and version is already online, it is used. Otherwise the local plugin is uploaded.
     705    /// If useLocalPlugins is true, all local plugins are uploaded regardless of the existence of the same plugin online.
     706    /// </summary>
     707    //public static List<Guid> GetPluginsNeededIds(bool useLocalPlugins) {
     708    //  IEnumerable<IPluginDescription> localPlugins = ApplicationManager.Manager.Plugins;
     709    //  List<Guid> pluginsNeededIds = new List<Guid>();
     710
     711    //  using (var service = ServiceLocator.Instance.GetService()) {
     712    //    IEnumerable<Plugin> onlinePlugins = service.Obj.GetPlugins();
     713
     714    //    foreach (IPluginDescription localPlugin in localPlugins) {
     715    //      Plugin found = onlinePlugins.Where(onlinePlugin => onlinePlugin.Name == localPlugin.Name && onlinePlugin.Version == localPlugin.Version).SingleOrDefault();
     716    //      if (!useLocalPlugins && found != null) {
     717    //        // plugin is available online; reuse
     718    //        pluginsNeededIds.Add(found.Id);
     719    //      } else {
     720    //        // upload the plugin
     721    //        Plugin p = new Plugin() { Name = localPlugin.Name, Version = localPlugin.Version, IsLocal = useLocalPlugins };
     722    //        List<PluginData> pluginDatas = new List<PluginData>();
     723
     724    //        foreach (IPluginFile pf in localPlugin.Files) {
     725    //          PluginData pluginData = new PluginData();
     726
     727    //          pluginData.Data = File.ReadAllBytes(pf.Name);
     728    //          pluginDatas.Add(pluginData);
     729    //        }
     730    //        pluginsNeededIds.Add(service.Obj.AddPlugin(p, pluginDatas));
     731    //      }
     732    //    }
     733    //  }
     734    //  return pluginsNeededIds;
     735    //}
     736    #endregion
    677737  }
    678738}
Note: See TracChangeset for help on using the changeset viewer.