Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
03/01/08 15:05:37 (17 years ago)
Author:
gkronber
Message:
  • worked on #2
  • fixed some problems in communication between Grid and DistributedEngine
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r32 r33  
    3434  public class DistributedEngine : EngineBase, IEditable {
    3535    private IGridServer server;
    36     private Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>();
    37 
     36    private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
     37    private List<Guid> runningEngines = new List<Guid>();
    3838    private string serverAddress;
    3939    public string ServerAddress {
     
    4343          serverAddress = value;
    4444        }
     45      }
     46    }
     47    public override bool Terminated {
     48      get {
     49        return myExecutionStack.Count == 0 && runningEngines.Count == 0;
    4550      }
    4651    }
     
    7782    public override void Abort() {
    7883      base.Abort();
    79       foreach(Guid engineGuid in runningEngines.Keys) {
     84      foreach(Guid engineGuid in runningEngines) {
    8085        server.AbortEngine(engineGuid);
    8186      }
     
    8388
    8489    protected override void ProcessNextOperation() {
    85       ProcessNextOperation(myExecutionStack, 0);
    86     }
    87     private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) {
    88       IOperation operation = stack.Pop();
     90      if(runningEngines.Count != 0) {
     91        Guid engineGuid = runningEngines[0];
     92        byte[] scopeXml = server.TryEndExecuteEngine(engineGuid,100);
     93        if(scopeXml != null) {
     94          GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress);
     95          IScope newScope = (IScope)PersistenceManager.Load(stream);
     96          IScope oldScope = engineOperations[engineGuid].Scope;
     97          oldScope.Clear();
     98          foreach(IVariable variable in newScope.Variables) {
     99            oldScope.AddVariable(variable);
     100          }
     101          foreach(IScope subScope in newScope.SubScopes) {
     102            oldScope.AddSubScope(subScope);
     103          }
     104          runningEngines.Remove(engineGuid);
     105          engineOperations.Remove(engineGuid);
     106        }
     107
     108        if(Canceled) {
     109          // write back not finished tasks
     110          //CompositeOperation remaining = new CompositeOperation();
     111          //remaining.ExecuteInParallel = true;
     112          //for(int i = 0; i < list.tasks.Length; i++) {
     113          //  if(list.tasks[i].Count > 0) {
     114          //    CompositeOperation task = new CompositeOperation();
     115          //    while(list.tasks[i].Count > 0)
     116          //      task.AddOperation(list.tasks[i].Pop());
     117          //    remaining.AddOperation(task);
     118          //  }
     119          //}
     120          //if(remaining.Operations.Count > 0)
     121          //  stack.Push(remaining);
     122        }
     123        return;
     124      }
     125      IOperation operation = myExecutionStack.Pop();
    89126      if(operation is AtomicOperation) {
    90127        AtomicOperation atomicOperation = (AtomicOperation)operation;
     
    94131        } catch(Exception ex) {
    95132          // push operation on stack again
    96           stack.Push(atomicOperation);
     133          myExecutionStack.Push(atomicOperation);
    97134          Abort();
    98135          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    99136        }
    100137        if(next != null)
    101           stack.Push(next);
     138          myExecutionStack.Push(next);
    102139        OnOperationExecuted(atomicOperation);
    103140        if(atomicOperation.Operator.Breakpoint) Abort();
     
    112149            stream.Close();
    113150            Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    114             runningEngines[currentEngineGuid] = parOperation;
     151            runningEngines.Add(currentEngineGuid);
     152            engineOperations[currentEngineGuid] = parOperation;
    115153          }
    116           foreach(Guid engineGuid in runningEngines.Keys) {
    117             byte[] scopeXml = server.EndExecuteEngine(engineGuid);
    118             GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress);
    119             IScope newScope = (IScope)PersistenceManager.Load(stream);
    120             IScope oldScope = runningEngines[engineGuid].Scope;
    121             oldScope.Clear();
    122             foreach(IVariable variable in newScope.Variables) {
    123               oldScope.AddVariable(variable);
    124             }
    125             foreach(IScope subScope in newScope.SubScopes) {
    126               oldScope.AddSubScope(subScope);
    127             }
    128           }
    129 
    130           // TASK (gkronber 12.2.08)
    131           //if (Canceled) {
    132           //  // write back not finished tasks
    133           //  CompositeOperation remaining = new CompositeOperation();
    134           //  remaining.ExecuteInParallel = true;
    135           //  for (int i = 0; i < list.tasks.Length; i++) {
    136           //    if (list.tasks[i].Count > 0) {
    137           //      CompositeOperation task = new CompositeOperation();
    138           //      while (list.tasks[i].Count > 0)
    139           //        task.AddOperation(list.tasks[i].Pop());
    140           //      remaining.AddOperation(task);
    141           //    }
    142           //  }
    143           //  if (remaining.Operations.Count > 0)
    144           //    stack.Push(remaining);
    145           //}
    146154        } else {
    147155          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    148             stack.Push(compositeOperation.Operations[i]);
     156            myExecutionStack.Push(compositeOperation.Operations[i]);
    149157        }
    150158      }
Note: See TracChangeset for help on using the changeset viewer.