Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/19/11 23:21:21 (13 years ago)
Author:
cneumuel
Message:

#1233

  • stability improvements for HiveExperiment and HiveEngine
  • parallelized upload of jobs
  • enabled cancellation of job upload
  • reduced the amount of double-assignment of jobs by an additional check in HeartbeatManager
  • tried to tackle the amount of deadlocks by automatically rerunning transactions
  • some fixes
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/HiveClient.cs

    r6426 r6444  
    2727using System.Security.Cryptography;
    2828using System.Threading;
     29using System.Threading.Tasks;
    2930using HeuristicLab.Common;
    3031using HeuristicLab.Core;
     
    104105
    105106    #region Store
    106     public static void Store(IHiveItem item) {
     107    public static void Store(IHiveItem item, CancellationToken cancellationToken) {
    107108      if (item.Id == Guid.Empty) {
    108109        if (item is RefreshableHiveExperiment) {
    109           HiveClient.Instance.UploadExperiment((RefreshableHiveExperiment)item);
     110          HiveClient.Instance.UploadExperiment((RefreshableHiveExperiment)item, cancellationToken);
    110111        }
    111112      } else {
     
    114115      }
    115116    }
    116     public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item) {
     117    public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
    117118      var call = new Func<Exception>(delegate() {
    118119        try {
    119           Store(item);
     120          Store(item, cancellationToken);
    120121        }
    121122        catch (Exception ex) {
     
    159160    #endregion
    160161
    161     public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableHiveExperiment refreshableHiveExperiment) {
     162    public static void StartExperiment(Action<Exception> exceptionCallback, RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {
    162163      HiveClient.StoreAsync(
    163164        new Action<Exception>((Exception ex) => {
    164165          refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Prepared;
    165166          exceptionCallback(ex);
    166         }), refreshableHiveExperiment);
     167        }), refreshableHiveExperiment, cancellationToken);
    167168      refreshableHiveExperiment.HiveExperiment.ExecutionState = ExecutionState.Started;
    168169    }
     
    189190
    190191    #region Upload Experiment
    191     private void UploadExperiment(RefreshableHiveExperiment refreshableHiveExperiment) {
     192    private Semaphore jobUploadSemaphore = new Semaphore(4, 4); // todo: take magic number into config
     193    private static object jobCountLocker = new object();
     194    private void UploadExperiment(RefreshableHiveExperiment refreshableHiveExperiment, CancellationToken cancellationToken) {
    192195      try {
    193196        refreshableHiveExperiment.HiveExperiment.Progress = new Progress("Connecting to server...");
    194197        refreshableHiveExperiment.HiveExperiment.IsProgressing = true;
    195         ServiceLocator.Instance.CallHiveService(service => {
    196           IEnumerable<string> resourceNames = ToResourceNameList(refreshableHiveExperiment.HiveExperiment.ResourceNames);
    197           var resourceIds = new List<Guid>();
    198           foreach (var resourceName in resourceNames) {
    199             Guid resourceId = service.GetResourceId(resourceName);
    200             if (resourceId == Guid.Empty) {
    201               throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
    202             }
    203             resourceIds.Add(resourceId);
     198
     199        IEnumerable<string> resourceNames = ToResourceNameList(refreshableHiveExperiment.HiveExperiment.ResourceNames);
     200        var resourceIds = new List<Guid>();
     201        foreach (var resourceName in resourceNames) {
     202          Guid resourceId = ServiceLocator.Instance.CallHiveService((s) => s.GetResourceId(resourceName));
     203          if (resourceId == Guid.Empty) {
     204            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
    204205          }
    205 
    206           foreach (OptimizerHiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) {
    207             hiveJob.SetIndexInParentOptimizerList(null);
    208           }
    209 
    210           // upload HiveExperiment
    211           refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading HiveExperiment...";
    212           refreshableHiveExperiment.HiveExperiment.Id = service.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment);
    213 
    214           int totalJobCount = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs().Count();
    215           int jobCount = 0;
    216 
    217           // upload plugins
    218           refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading plugins...";
    219           this.OnlinePlugins = service.GetPlugins();
    220           this.AlreadyUploadedPlugins = new List<Plugin>();
    221           Plugin configFilePlugin = UploadConfigurationFile(service, onlinePlugins);
    222           this.alreadyUploadedPlugins.Add(configFilePlugin);
    223 
    224           // upload jobs
    225           refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading jobs...";
    226 
    227           foreach (HiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs) {
    228             UploadJobWithChildren(refreshableHiveExperiment.HiveExperiment.Progress, service, hiveJob, null, resourceIds, ref jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged);
    229           }
    230 
    231           if (refreshableHiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling();
    232         });
     206          resourceIds.Add(resourceId);
     207        }
     208
     209        foreach (OptimizerHiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs.OfType<OptimizerHiveJob>()) {
     210          hiveJob.SetIndexInParentOptimizerList(null);
     211        }
     212
     213        // upload HiveExperiment
     214        refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading HiveExperiment...";
     215        refreshableHiveExperiment.HiveExperiment.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddHiveExperiment(refreshableHiveExperiment.HiveExperiment));
     216        cancellationToken.ThrowIfCancellationRequested();
     217
     218        int totalJobCount = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs().Count();
     219        int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
     220        cancellationToken.ThrowIfCancellationRequested();
     221
     222        // upload plugins
     223        refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading plugins...";
     224        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService((s) => s.GetPlugins());
     225        this.AlreadyUploadedPlugins = new List<Plugin>();
     226        Plugin configFilePlugin = ServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
     227        this.alreadyUploadedPlugins.Add(configFilePlugin);
     228        cancellationToken.ThrowIfCancellationRequested();
     229
     230        if (refreshableHiveExperiment.RefreshAutomatically) refreshableHiveExperiment.StartResultPolling();
     231
     232        // upload jobs
     233        refreshableHiveExperiment.HiveExperiment.Progress.Status = "Uploading jobs...";
     234
     235        var tasks = new List<Task>();
     236        foreach (HiveJob hiveJob in refreshableHiveExperiment.HiveExperiment.HiveJobs) {
     237          tasks.Add(Task.Factory.StartNew((hj) => {
     238            UploadJobWithChildren(refreshableHiveExperiment.HiveExperiment.Progress, (HiveJob)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableHiveExperiment.HiveExperiment.Id, refreshableHiveExperiment.Log, refreshableHiveExperiment.HiveExperiment.IsPrivileged, cancellationToken);
     239          }, hiveJob)
     240          .ContinueWith((x) => refreshableHiveExperiment.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));
     241        }
     242        Task.WaitAll(tasks.ToArray());
    233243      }
    234244      finally {
     
    267277    /// Uploads the given job and all its child-jobs while setting the proper parentJobId values for the childs
    268278    /// </summary>
    269     /// <param name="service"></param>
    270     /// <param name="hiveJob"></param>
    271279    /// <param name="parentHiveJob">shall be null if its the root job</param>
    272     /// <param name="groups"></param>
    273     private void UploadJobWithChildren(IProgress progress, IHiveService service, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, ref int jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged) {
    274       jobCount++;
    275       progress.Status = string.Format("Serializing job {0} of {1}", jobCount, totalJobCount);
    276       JobData jobData;
    277       List<IPluginDescription> plugins;
    278 
    279       if (hiveJob.ItemJob.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) {
    280         hiveJob.Job.IsParentJob = true;
    281         hiveJob.Job.FinishWhenChildJobsFinished = true;
    282         jobData = hiveJob.GetAsJobData(true, out plugins);
    283       } else {
    284         hiveJob.Job.IsParentJob = false;
    285         hiveJob.Job.FinishWhenChildJobsFinished = false;
    286         jobData = hiveJob.GetAsJobData(false, out plugins);
    287       }
    288 
    289       TryAndRepeat(() => {
    290         hiveJob.Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(service, this.onlinePlugins, this.alreadyUploadedPlugins, plugins);
    291       }, -1, "Failed to upload plugins");
    292       hiveJob.Job.PluginsNeededIds.Add(configPluginId);
    293       hiveJob.Job.HiveExperimentId = hiveExperimentId;
    294       hiveJob.Job.IsPrivileged = isPrivileged;
    295 
    296       progress.Status = string.Format("Uploading job {0} of {1} ({2} kb, {3} objects)", jobCount, totalJobCount, jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count());
    297       progress.ProgressValue = (double)jobCount / totalJobCount;
    298 
    299       log.LogMessage(progress.Status);
    300       TryAndRepeat(() => {
    301         if (parentHiveJob != null) {
    302           hiveJob.Job.Id = service.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData);
    303         } else {
    304           hiveJob.Job.Id = service.AddJob(hiveJob.Job, jobData, groups.ToList());
    305         }
    306       }, -1, "Failed to add job", log);
    307 
    308       foreach (HiveJob child in hiveJob.ChildHiveJobs) {
    309         UploadJobWithChildren(progress, service, child, hiveJob, groups, ref jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged);
     280    private void UploadJobWithChildren(IProgress progress, HiveJob hiveJob, HiveJob parentHiveJob, IEnumerable<Guid> groups, int[] jobCount, int totalJobCount, Guid configPluginId, Guid hiveExperimentId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {
     281      jobUploadSemaphore.WaitOne();
     282      try {
     283        cancellationToken.ThrowIfCancellationRequested();
     284        lock (jobCountLocker) {
     285          jobCount[0]++;
     286        }
     287        JobData jobData = null;
     288        List<IPluginDescription> plugins = null;
     289
     290        TryAndRepeat(() => { // workaround for persistence bug (thread-safe access to bitmaps) - remove later
     291          if (hiveJob.ItemJob.ComputeInParallel && (hiveJob.ItemJob.Item is Optimization.Experiment || hiveJob.ItemJob.Item is Optimization.BatchRun)) {
     292            hiveJob.Job.IsParentJob = true;
     293            hiveJob.Job.FinishWhenChildJobsFinished = true;
     294            jobData = hiveJob.GetAsJobData(true, out plugins);
     295          } else {
     296            hiveJob.Job.IsParentJob = false;
     297            hiveJob.Job.FinishWhenChildJobsFinished = false;
     298            jobData = hiveJob.GetAsJobData(false, out plugins);
     299          }
     300        }, 30, "Could not serialize job");
     301        cancellationToken.ThrowIfCancellationRequested();
     302
     303        TryAndRepeat(() => {
     304          if (!cancellationToken.IsCancellationRequested) {
     305            ServiceLocator.Instance.CallHiveService((s) => hiveJob.Job.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
     306          }
     307        }, -1, "Failed to upload plugins");
     308        cancellationToken.ThrowIfCancellationRequested();
     309        hiveJob.Job.PluginsNeededIds.Add(configPluginId);
     310        hiveJob.Job.HiveExperimentId = hiveExperimentId;
     311        hiveJob.Job.IsPrivileged = isPrivileged;
     312
     313        log.LogMessage(string.Format("Uploading job ({0} kb, {1} objects)", jobData.Data.Count() / 1024, hiveJob.ItemJob.GetObjectGraphObjects().Count()));
     314        TryAndRepeat(() => {
     315          if (!cancellationToken.IsCancellationRequested) {
     316            if (parentHiveJob != null) {
     317              hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddChildJob(parentHiveJob.Job.Id, hiveJob.Job, jobData));
     318            } else {
     319              hiveJob.Job.Id = ServiceLocator.Instance.CallHiveService((s) => s.AddJob(hiveJob.Job, jobData, groups.ToList()));
     320            }
     321          }
     322        }, 50, "Failed to add job", log);
     323        cancellationToken.ThrowIfCancellationRequested();
     324
     325        lock (jobCountLocker) {
     326          progress.ProgressValue = (double)jobCount[0] / totalJobCount;
     327          progress.Status = string.Format("Uploaded job ({0} of {1})", jobCount[0], totalJobCount);
     328        }
     329
     330        var tasks = new List<Task>();
     331        foreach (HiveJob child in hiveJob.ChildHiveJobs) {
     332          tasks.Add(Task.Factory.StartNew((tuple) => {
     333            var arguments = (Tuple<HiveJob, HiveJob>)tuple;
     334            UploadJobWithChildren(progress, arguments.Item1, arguments.Item2, groups, jobCount, totalJobCount, configPluginId, hiveExperimentId, log, isPrivileged, cancellationToken);
     335          }, new Tuple<HiveJob, HiveJob>(child, hiveJob ))
     336          .ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted));
     337        }
     338        Task.WaitAll(tasks.ToArray());
     339      }
     340      finally {
     341        jobUploadSemaphore.Release();
    310342      }
    311343    }
Note: See TracChangeset for help on using the changeset viewer.