Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/05/11 22:35:40 (13 years ago)
Author:
cneumuel
Message:

#1233

  • refactoring of slave core
  • created JobManager, which is responsible for managing jobs without knowing anything about the service. this class is easier testable than slave core
  • lots of cleanup
  • created console test project for slave
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs

    r6219 r6357  
    3737  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
    3838  public class HiveEngine : Engine {
     39    private static object locker = new object();
    3940    private static object logLocker = new object();
    4041    private CancellationToken cancellationToken;
    41    
     42    private bool firstRun = true;
     43
    4244    [Storable]
    4345    private IOperator currentOperator;
     
    136138      OperationCollection coll;
    137139      IAtomicOperation operation;
    138       TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
    139 
    140       this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
    141       this.AlreadyUploadedPlugins = new List<Plugin>();
    142 
    143       while (ExecutionStack.Count > 0) {
     140
     141      if (firstRun) {
     142        TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
     143        this.OnlinePlugins = ServiceLocator.Instance.CallHiveService(s => s.GetPlugins()).Where(x => x.IsLocal == false).ToList();
     144        this.AlreadyUploadedPlugins = new List<Plugin>();
     145        firstRun = false;
     146      }
     147
     148      while (executionStack.Count > 0) {
    144149        cancellationToken.ThrowIfCancellationRequested();
    145150
    146         next = ExecutionStack.Pop();
     151        next = executionStack.Pop();
     152        bool isOpCollection = next is OperationCollection;
     153        int collCount = isOpCollection ? ((OperationCollection)next).Count : 0;
     154        string opName = !isOpCollection ? ((IAtomicOperation)next).Operator.Name : "OpCollection";
     155
    147156        if (next is OperationCollection) {
    148157          coll = (OperationCollection)next;
    149           if (coll.Parallel) {
     158
     159          bool isPMOEvaluator = coll.Count > 0 && coll.First() is HeuristicLab.Core.ExecutionContext && ((HeuristicLab.Core.ExecutionContext)coll.First()).Operator.GetType().Name == "PMOEvaluator";
     160          bool isAlgorithmEvaluator = coll.Count > 0 && coll.First() is HeuristicLab.Core.ExecutionContext && ((HeuristicLab.Core.ExecutionContext)coll.First()).Operator.GetType().Name == "AlgorithmEvaluator";
     161
     162          if (coll.Parallel && isPMOEvaluator) {
     163            Task[] tasks = new Task[coll.Count];
     164            Stack<IOperation>[] stacks = new Stack<IOperation>[coll.Count];
     165            for (int i = 0; i < coll.Count; i++) {
     166              stacks[i] = new Stack<IOperation>();
     167              stacks[i].Push(coll[i]);
     168              tasks[i] = Task.Factory.StartNew(Run, stacks[i], cancellationToken);
     169            }
     170            try {
     171              Task.WaitAll(tasks);
     172            }
     173            catch (AggregateException ex) {
     174              OperationCollection remaining = new OperationCollection() { Parallel = true };
     175              for (int i = 0; i < stacks.Length; i++) {
     176                if (stacks[i].Count == 1)
     177                  remaining.Add(stacks[i].Pop());
     178                if (stacks[i].Count > 1) {
     179                  OperationCollection ops = new OperationCollection();
     180                  while (stacks[i].Count > 0)
     181                    ops.Add(stacks[i].Pop());
     182                  remaining.Add(ops);
     183                }
     184              }
     185              if (remaining.Count > 0) executionStack.Push(remaining);
     186              throw ex;
     187            }
     188          } else if (coll.Parallel) {
    150189            // clone the parent scope here and reuse it for each operation. otherwise for each job the whole scope-tree first needs to be copied and then cleaned, which causes a lot of work for the Garbage Collector
    151190            IScope parentScopeClone = (IScope)((IAtomicOperation)coll.First()).Scope.Parent.Clone();
     
    158197            }
    159198
    160             IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
    161             //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken);
     199            var experiment = CreateHiveExperiment();
     200            IScope[] scopes = ExecuteOnHive(experiment, jobs, parentScopeClone, cancellationToken);
     201            DisposeHiveExperiment(experiment);
    162202
    163203            for (int i = 0; i < coll.Count; i++) {
     
    178218          }
    179219          catch (Exception ex) {
    180             ExecutionStack.Push(operation);
     220            executionStack.Push(operation);
    181221            if (ex is OperationCanceledException) throw ex;
    182222            else throw new OperatorExecutionException(operation.Operator, ex);
    183223          }
    184           if (next != null) ExecutionStack.Push(next);
     224          if (next != null) executionStack.Push(next);
    185225
    186226          if (operation.Operator.Breakpoint) {
     
    227267    /// </summary>
    228268    /// <param name="jobs"></param>
    229     private IScope[] ExecuteOnHive(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
     269    private IScope[] ExecuteOnHive(RefreshableHiveExperiment refreshableHiveExperiment, EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
    230270      LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
    231271      IScope[] scopes = new Scope[jobs.Length];
    232272      object locker = new object();
    233273      IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();
    234       var hiveExperiment = new HiveExperiment();
     274      var hiveExperiment = refreshableHiveExperiment.HiveExperiment;
    235275
    236276      try {
    237277        List<Guid> remainingJobIds = new List<Guid>();
    238278
    239         // create hive experiment
     279        // create upload-tasks
     280        var uploadTasks = new List<Task<Job>>();
     281        for (int i = 0; i < jobs.Length; i++) {
     282          hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone));
     283
     284          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
     285          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
     286          if (random != null)
     287            random.Reset(random.Next());
     288        }
     289        ExperimentManagerClient.StartExperiment((e) => {
     290          LogException(e);
     291        }, refreshableHiveExperiment);
     292
     293        // do polling until experiment is finished and all jobs are downloaded
     294        while (!refreshableHiveExperiment.AllJobsFinished()) {
     295          Thread.Sleep(500);
     296          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
     297          cancellationToken.ThrowIfCancellationRequested();
     298        }
     299        LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));
     300
     301        // get scopes
     302        int j = 0;
     303        foreach (var hiveJob in hiveExperiment.HiveJobs) {
     304          if (hiveJob.Job.State != JobState.Finished)
     305            throw new HiveEngineException("Job failed: " + hiveJob.Job.StateLog.Last().Exception);
     306
     307          var scope = ((IAtomicOperation)((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
     308          scopes[j++] = scope;
     309        }
     310        return scopes;
     311      }
     312      catch (OperationCanceledException e) {
     313        lock (locker) {
     314          if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id);
     315        }
     316        throw e;
     317      }
     318      catch (Exception e) {
     319        lock (locker) {
     320          if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id);
     321        }
     322        LogException(e);
     323        throw e;
     324      }
     325    }
     326
     327    private RefreshableHiveExperiment CreateHiveExperiment() {
     328      lock (locker) {
     329        var hiveExperiment = new HiveExperiment();
    240330        hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
    241331        hiveExperiment.DateCreated = DateTime.Now;
     
    245335        refreshableHiveExperiment.IsControllable = false;
    246336        hiveExperiments.Add(refreshableHiveExperiment);
    247 
    248         // create upload-tasks
    249         var uploadTasks = new List<Task<Job>>();
    250         for (int i = 0; i < jobs.Length; i++) {
    251           hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone));
    252 
    253           // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
    254           IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
    255           if (random != null)
    256             random.Reset(random.Next());
    257         }
    258         ExperimentManagerClient.StartExperiment((e) => {
    259           LogException(e);
    260         }, refreshableHiveExperiment);
    261 
    262         // do polling until experiment is finished and all jobs are downloaded
    263         while (!refreshableHiveExperiment.AllJobsFinished()) {
    264           Thread.Sleep(500);
    265           this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
    266           cancellationToken.ThrowIfCancellationRequested();
    267         }
    268         LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));
    269 
    270         // get scopes
    271         int j = 0;
    272         foreach (var hiveJob in hiveExperiment.HiveJobs) {
    273           var scope = ((IAtomicOperation) ((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
    274           scopes[j++] = scope;
    275         }
    276         refreshableHiveExperiment.RefreshAutomatically = false;
    277         DeleteHiveExperiment(hiveExperiment.Id);
    278         ClearData(refreshableHiveExperiment);
    279         return scopes;
    280       }
    281       catch (OperationCanceledException e) {
    282         lock (locker) {
    283           if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id);
    284         }
    285         throw e;
    286       }
    287       catch (Exception e) {
    288         lock (locker) {
    289           if (jobIndices != null) DeleteHiveExperiment(hiveExperiment.Id);
    290         }
    291         LogException(e);
    292         throw e;
    293       }
     337        return refreshableHiveExperiment;
     338      }
     339    }
     340
     341    private void DisposeHiveExperiment(RefreshableHiveExperiment refreshableHiveExperiment) {
     342      refreshableHiveExperiment.RefreshAutomatically = false;
     343      DeleteHiveExperiment(refreshableHiveExperiment.HiveExperiment.Id);
     344      ClearData(refreshableHiveExperiment);
    294345    }
    295346
     
    306357      }, 5, string.Format("Could not delete jobs"));
    307358    }
    308    
     359
    309360    private List<Guid> GetResourceIds() {
    310361      return ServiceLocator.Instance.CallHiveService(service => {
Note: See TracChangeset for help on using the changeset viewer.