Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/16/11 00:18:48 (13 years ago)
Author:
cneumuel
Message:

#1233

  • stability improvements for HiveEngine
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs

    r6198 r6200  
    77using HeuristicLab.Common;
    88using HeuristicLab.Core;
    9 using HeuristicLab.Hive;
    109using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
    11 using HeuristicLab.PluginInfrastructure;
    1210
    1311namespace HeuristicLab.HiveEngine {
     
    1816  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
    1917  public class HiveEngine : Engine {
    20     private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
    21     private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
     18    private static object logLocker = new object();
    2219    private CancellationToken cancellationToken;
    23 
     20   
    2421    [Storable]
    2522    private IOperator currentOperator;
     
    5451    }
    5552
    56     [Storable]
    57     private ItemCollection<RefreshableHiveExperiment> hiveExperiments;
     53    // [Storable] -> HiveExperiment can't be storable, so RefreshableHiveExperiment can't be stored
     54    private ItemCollection<RefreshableHiveExperiment> hiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
    5855    public ItemCollection<RefreshableHiveExperiment> HiveExperiments {
    5956      get { return hiveExperiments; }
     
    7673    public HiveEngine() {
    7774      ResourceNames = "HEAL";
    78       HiveExperiments = new ItemCollection<RefreshableHiveExperiment>();
    7975      Priority = 0;
    8076    }
     
    8985      this.executionTimeOnHive = original.executionTimeOnHive;
    9086      this.useLocalPlugins = original.useLocalPlugins;
     87      this.hiveExperiments = cloner.Clone(original.hiveExperiments);
    9188    }
    9289    public override IDeepCloneable Clone(Cloner cloner) {
     
    202199      target.SubScopes.AddRange(source.SubScopes);
    203200      // TODO: validate if parent scopes match - otherwise source is invalid
    204     }
    205 
    206     // testfunction:
    207     private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
    208       IScope[] scopes = new Scope[jobs.Length];
    209       for (int i = 0; i < jobs.Length; i++) {
    210         var serialized = PersistenceUtil.Serialize(jobs[i]);
    211         var deserialized = PersistenceUtil.Deserialize<IJob>(serialized);
    212         deserialized.Start();
    213         while (deserialized.ExecutionState != ExecutionState.Stopped) {
    214           Thread.Sleep(100);
    215         }
    216         var serialized2 = PersistenceUtil.Serialize(deserialized);
    217         var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2);
    218         var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope;
    219         scopes[i] = newScope;
    220       }
    221       return scopes;
    222201    }
    223202
     
    256235            random.Reset(random.Next());
    257236        }
    258         ExperimentManagerClient.StartExperiment((e) => { throw e; }, refreshableHiveExperiment);
     237        ExperimentManagerClient.StartExperiment((e) => {
     238          LogException(e);
     239        }, refreshableHiveExperiment);
     240
    259241        // do polling until experiment is finished and all jobs are downloaded
    260242        while (!refreshableHiveExperiment.AllJobsFinished()) {
     
    273255        refreshableHiveExperiment.RefreshAutomatically = false;
    274256        DeleteHiveExperiment(hiveExperiment.Id);
     257        ClearData(refreshableHiveExperiment);
    275258        return scopes;
    276259      }
     
    290273    }
    291274
     275    private void ClearData(RefreshableHiveExperiment refreshableHiveExperiment) {
     276      var jobs = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs();
     277      foreach (var job in jobs) {
     278        job.ClearData();
     279      }
     280    }
     281
    292282    private void DeleteHiveExperiment(Guid hiveExperimentId) {
    293       TryAndRepeat(() => {
     283      ExperimentManagerClient.TryAndRepeat(() => {
    294284        ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId));
    295285      }, 5, string.Format("Could not delete jobs"));
    296286    }
    297 
    298     private static object locker = new object();
    299     private Job UploadJob(object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken, List<Guid> resourceIds, Guid hiveExperimentId) {
    300       var keyValuePair = (KeyValuePair<int, EngineJob>)keyValuePairObj;
    301       Job job = new Job();
    302 
    303       try {
    304         maxSerializedJobsInMemory.WaitOne();
    305         JobData jobData = new JobData();
    306         IEnumerable<Type> usedTypes;
    307 
    308         // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
    309         lock (locker) {
    310           ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.Parent = parentScopeClone;
    311           keyValuePair.Value.InitialOperation = (IOperation)keyValuePair.Value.InitialOperation.Clone();
    312           if (keyValuePair.Value.InitialOperation is IAtomicOperation)
    313             ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.ClearParentScopes();
    314           jobData.Data = PersistenceUtil.Serialize(keyValuePair.Value, out usedTypes);
    315         }
    316         var neededPlugins = new List<IPluginDescription>();
    317 
    318         bool useAllLocalPlugins = true;
    319         if (useAllLocalPlugins) {
    320           // use all plugins
    321           neededPlugins.AddRange(ApplicationManager.Manager.Plugins);
    322         } else {
    323           // use only
    324           PluginUtil.CollectDeclaringPlugins(neededPlugins, usedTypes);
    325         }
    326 
    327         job.CoresNeeded = 1;
    328         job.PluginsNeededIds = ServiceLocator.Instance.CallHiveService(s => PluginUtil.GetPluginDependencies(s, this.OnlinePlugins, this.AlreadyUploadedPlugins, neededPlugins, useLocalPlugins));
    329         job.Priority = priority;
    330         job.HiveExperimentId = hiveExperimentId;
    331 
    332         try {
    333           maxConcurrentConnections.WaitOne();
    334           while (job.Id == Guid.Empty) { // repeat until success
    335             cancellationToken.ThrowIfCancellationRequested();
    336             try {
    337               job.Id = ServiceLocator.Instance.CallHiveService(s => s.AddJob(job, jobData, resourceIds));
    338             }
    339             catch (Exception e) {
    340               LogException(e);
    341               LogMessage("Repeating upload");
    342             }
    343           }
    344         }
    345         finally {
    346           maxConcurrentConnections.Release();
    347         }
    348       }
    349       finally {
    350         maxSerializedJobsInMemory.Release();
    351       }
    352       return job;
    353     }
    354 
     287   
    355288    private List<Guid> GetResourceIds() {
    356289      return ServiceLocator.Instance.CallHiveService(service => {
     
    368301    }
    369302
    370     private EngineJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
    371       Guid jobId = (Guid)jobIdObj;
    372       JobData jobData = null;
    373       EngineJob engineJob = null;
    374       try {
    375         maxSerializedJobsInMemory.WaitOne();
    376         maxConcurrentConnections.WaitOne();
    377         while (jobData == null) { // repeat until success
    378           cancellationToken.ThrowIfCancellationRequested();
    379           try {
    380             jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));
    381           }
    382           catch (Exception e) {
    383             LogException(e);
    384             LogMessage("Repeating download");
    385           }
    386         }
    387         engineJob = PersistenceUtil.Deserialize<EngineJob>(jobData.Data);
    388         jobData = null;
    389         LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
    390       }
    391       finally {
    392         maxConcurrentConnections.Release();
    393         maxSerializedJobsInMemory.Release();
    394       }
    395       return engineJob;
    396     }
    397 
    398303    /// <summary>
    399304    /// Threadsafe message logging
    400305    /// </summary>
    401306    private void LogMessage(string message) {
    402       lock (Log) {
     307      lock (logLocker) {
    403308        Log.LogMessage(message);
    404309      }
     
    409314    /// </summary>
    410315    private void LogException(Exception exception) {
    411       lock (Log) {
     316      lock (logLocker) {
    412317        Log.LogException(exception);
    413318      }
    414319    }
    415320
    416     /// <summary>
    417     /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
    418     /// If repetitions is -1, it is repeated infinitely.
    419     /// </summary>
    420     private static void TryAndRepeat(Action action, int repetitions, string errorMessage) {
    421       try { action(); }
    422       catch (Exception e) {
    423         repetitions--;
    424         if (repetitions <= 0)
    425           throw new HiveEngineException(errorMessage, e);
    426         TryAndRepeat(action, repetitions, errorMessage);
    427       }
    428     }
    429   }
    430 
    431   public static class EnumerableExtensions {
    432     public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
    433       return TimeSpan.FromMilliseconds(times.Select(e => e.TotalMilliseconds).Sum());
    434     }
     321    // testfunction:
     322    //private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
     323    //  IScope[] scopes = new Scope[jobs.Length];
     324    //  for (int i = 0; i < jobs.Length; i++) {
     325    //    var serialized = PersistenceUtil.Serialize(jobs[i]);
     326    //    var deserialized = PersistenceUtil.Deserialize<IJob>(serialized);
     327    //    deserialized.Start();
     328    //    while (deserialized.ExecutionState != ExecutionState.Stopped) {
     329    //      Thread.Sleep(100);
     330    //    }
     331    //    var serialized2 = PersistenceUtil.Serialize(deserialized);
     332    //    var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2);
     333    //    var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope;
     334    //    scopes[i] = newScope;
     335    //  }
     336    //  return scopes;
     337    //}
    435338  }
    436339}
Note: See TracChangeset for help on using the changeset viewer.