Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
01/05/11 02:05:05 (13 years ago)
Author:
cneumuel
Message:

#1260

  • changed HiveEngine to be compatible with recent changes of engine
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs

    r5153 r5213  
    1414using System.Threading;
    1515using HeuristicLab.Random;
     16using System.Threading.Tasks;
    1617
    1718namespace HeuristicLab.HiveEngine {
     
    2223  [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")]
    2324  public class HiveEngine : Engine {
     25    private CancellationToken cancellationToken;
     26
    2427    [Storable]
    2528    private IOperator currentOperator;
     
    4447    #endregion
    4548
    46     protected override void ProcessNextOperation() {
    47       currentOperator = null;
    48       IOperation next = ExecutionStack.Pop();
    49       OperationCollection coll = next as OperationCollection;
     49    protected override void Run(CancellationToken cancellationToken) {
     50      this.cancellationToken = cancellationToken;
     51      Run(ExecutionStack);
     52    }
    5053
    51       while (coll != null) {
    52         if (coll.Parallel) {
    53           IDictionary<IOperation, OperationJob> jobs = new Dictionary<IOperation, OperationJob>();
    54           foreach (IOperation op in coll) {
    55             jobs.Add(op, new OperationJob(op));
     54    private void Run(object state) {
     55      Stack<IOperation> executionStack = (Stack<IOperation>)state;
     56      IOperation next;
     57      OperationCollection coll;
     58      IAtomicOperation operation;
     59
     60      while (ExecutionStack.Count > 0) {
     61        cancellationToken.ThrowIfCancellationRequested();
     62
     63        next = ExecutionStack.Pop();
     64        if (next is OperationCollection) {
     65          coll = (OperationCollection)next;
     66          if (coll.Parallel) {
     67            IDictionary<IOperation, OperationJob> jobs = new Dictionary<IOperation, OperationJob>();
     68            foreach (IOperation op in coll) {
     69              jobs.Add(op, new OperationJob(op));
     70            }
     71
     72            ExecuteOnHive(jobs);
     73
     74            foreach (var kvp in jobs) {
     75              if (kvp.Key is IAtomicOperation) {
     76                ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation);
     77              } else if (kvp.Key is OperationCollection) {
     78                OperationCollection ocoll = (OperationCollection)kvp.Key;
     79                for (int i = ocoll.Count - 1; i >= 0; i--)
     80                  if (ocoll[i] != null) executionStack.Push(ocoll[i]);
     81              }
     82            }
     83          } else {
     84            for (int i = coll.Count - 1; i >= 0; i--)
     85              if (coll[i] != null) executionStack.Push(coll[i]);
    5686          }
     87        } else if (next is IAtomicOperation) {
     88          operation = (IAtomicOperation)next;
     89          try {
     90            next = operation.Operator.Execute((IExecutionContext)operation, cancellationToken);
     91          }
     92          catch (Exception ex) {
     93            ExecutionStack.Push(operation);
     94            if (ex is OperationCanceledException) throw ex;
     95            else throw new OperatorExecutionException(operation.Operator, ex);
     96          }
     97          if (next != null) ExecutionStack.Push(next);
    5798
    58           ExecuteOnHive(jobs);
    59 
    60           foreach (var kvp in jobs) {
    61             if (kvp.Key is IAtomicOperation) {
    62               ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation);
    63             } else if (kvp.Key is OperationCollection) {
    64               // todo
    65             }
     99          if (operation.Operator.Breakpoint) {
     100            Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
     101            Pause();
    66102          }
    67         } else {
    68           for (int i = coll.Count - 1; i >= 0; i--)
    69             ExecutionStack.Push(coll[i]);
    70         }
    71         next = ExecutionStack.Count > 0 ? ExecutionStack.Pop() : null;
    72         coll = next as OperationCollection;
    73       }
    74 
    75       IAtomicOperation operation = next as IAtomicOperation;
    76       if (operation != null) {
    77         try {
    78           currentOperator = operation.Operator;
    79           ExecutionStack.Push(operation.Operator.Execute((IExecutionContext)operation));
    80         }
    81         catch (Exception ex) {
    82           ExecutionStack.Push(operation);
    83           OnExceptionOccurred(new OperatorExecutionException(operation.Operator, ex));
    84           Pause();
    85         }
    86         if (operation.Operator.Breakpoint) {
    87           Log.LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));
    88           Pause();
    89103        }
    90104      }
     
    123137    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) {
    124138      Log.LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
    125       IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>();
    126       JobResultList results;
     139      try {
     140        IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>();
     141        JobResultList results;
    127142
    128       using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    129         List<JobDto> jobs = new List<JobDto>();
    130         foreach (var kvp in jobDict) {
    131           // shuffle random variable to avoid the same random sequence in each operation
    132           IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
    133           if (random != null)
    134             random.Reset(random.Next());
     143        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     144          List<JobDto> jobs = new List<JobDto>();
     145          foreach (var kvp in jobDict) {
     146            // shuffle random variable to avoid the same random sequence in each operation
     147            IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
     148            if (random != null)
     149              random.Reset(random.Next());
    135150
    136           var groups = ResourceIds.Split(';');
    137           SerializedJob serializedJob = new SerializedJob();
    138           serializedJob.SerializedJobData = SerializedJob.Serialize(kvp.Value);
    139           serializedJob.JobInfo = new JobDto();
    140           serializedJob.JobInfo.State = JobState.Offline;
    141           serializedJob.JobInfo.CoresNeeded = 1;
    142           serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
    143           ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
    144           jobs.Add(response.Obj);
    145           jobIds.Add(response.Obj.Id, kvp.Key);
    146         }
    147         results = service.Obj.GetJobResults(jobIds.Keys).Obj;
    148       }
    149 
    150       while (!results.All(
    151           x => x.State == JobState.Finished ||
    152           x.State == JobState.Failed ||
    153           x.State == JobState.Aborted)) {
    154         Thread.Sleep(5000);
    155         using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     151            var groups = ResourceIds.Split(';');
     152            SerializedJob serializedJob = new SerializedJob();
     153            serializedJob.SerializedJobData = SerializedJob.Serialize(kvp.Value);
     154            serializedJob.JobInfo = new JobDto();
     155            serializedJob.JobInfo.State = JobState.Offline;
     156            serializedJob.JobInfo.CoresNeeded = 1;
     157            serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
     158            ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     159            jobs.Add(response.Obj);
     160            jobIds.Add(response.Obj.Id, kvp.Key);
     161          }
    156162          results = service.Obj.GetJobResults(jobIds.Keys).Obj;
    157163        }
     164
     165        while (!results.All(
     166            x => x.State == JobState.Finished ||
     167            x.State == JobState.Failed ||
     168            x.State == JobState.Aborted)) {
     169          Thread.Sleep(5000);
     170          using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     171            results = service.Obj.GetJobResults(jobIds.Keys).Obj;
     172          }
     173        }
     174
     175        // all finished
     176        using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     177          foreach (Guid jobId in jobIds.Keys) {
     178            SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
     179            OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
     180            jobDict[jobIds[jobId]] = operationJob;
     181          }
     182        }
     183
     184        // delete jobs
     185        using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     186          foreach (Guid jobId in jobIds.Keys) {
     187            service.Obj.DeleteJob(jobId);
     188          }
     189        }
     190
     191        Log.LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));
    158192      }
    159 
    160       // all finished
    161       using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    162         foreach (Guid jobId in jobIds.Keys) {
    163           SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
    164           OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData);
    165           jobDict[jobIds[jobId]] = operationJob;
    166         }
     193      catch (Exception e) {
     194        Log.LogException(e);
     195        throw e;
    167196      }
    168 
    169       // delete jobs
    170       using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
    171         foreach (Guid jobId in jobIds.Keys) {
    172           service.Obj.DeleteJob(jobId);
    173         }
    174       }
    175 
    176       Log.LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));
    177197    }
    178 
    179     public override void Pause() {
    180       base.Pause();
    181       if (currentOperator != null) currentOperator.Abort();
    182     }
    183     public override void Stop() {
    184       base.Stop();
    185       if (currentOperator != null) currentOperator.Abort();
    186     }
    187 
    188198  }
    189199}
Note: See TracChangeset for help on using the changeset viewer.