Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
03/01/08 16:19:11 (17 years ago)
Author:
gkronber
Message:

fixed some synchronization issues in DistributedEngine and grid infrastructure

(ticket ref #2)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r34 r35  
    3737    private List<Guid> runningEngines = new List<Guid>();
    3838    private string serverAddress;
     39    private bool cancelRequested;
    3940    public string ServerAddress {
    4041      get { return serverAddress; }
     
    5051      }
    5152    }
    52 
    5353    public override object Clone(IDictionary<Guid, object> clonedObjects) {
    5454      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
     
    8181
    8282    public override void Abort() {
    83       base.Abort();
    84       foreach(Guid engineGuid in runningEngines) {
    85         server.AbortEngine(engineGuid);
     83      lock(runningEngines) {
     84        cancelRequested = true;
     85        foreach(Guid engineGuid in runningEngines) {
     86          server.AbortEngine(engineGuid);
     87        }
    8688      }
     89    }
     90    public override void Reset() {
     91      base.Reset();
     92      engineOperations.Clear();
     93      runningEngines.Clear();
     94      cancelRequested = false;
    8795    }
    8896
    8997    protected override void ProcessNextOperation() {
    90       if(runningEngines.Count != 0) {
    91         Guid engineGuid = runningEngines[0];
    92         byte[] resultXml = server.TryEndExecuteEngine(engineGuid,100);
    93         if(resultXml != null) {
    94           GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    95           ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    96           IScope oldScope = engineOperations[engineGuid].Scope;
    97           oldScope.Clear();
    98           foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    99             oldScope.AddVariable(variable);
     98      lock(runningEngines) {
     99        if(runningEngines.Count == 0 && cancelRequested) {
     100          base.Abort();
     101          cancelRequested = false;
     102          return;
     103        }
     104        if(runningEngines.Count != 0) {
     105          Guid engineGuid = runningEngines[0];
     106          byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
     107          if(resultXml != null) {
     108            GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
     109            ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
     110            IScope oldScope = engineOperations[engineGuid].Scope;
     111            oldScope.Clear();
     112            foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
     113              oldScope.AddVariable(variable);
     114            }
     115            foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
     116              oldScope.AddSubScope(subScope);
     117            }
     118            OnOperationExecuted(engineOperations[engineGuid]);
     119            if(resultEngine.ExecutionStack.Count != 0) {
     120              foreach(IOperation op in resultEngine.ExecutionStack) {
     121                myExecutionStack.Push(op);
     122              }
     123            }
     124            runningEngines.Remove(engineGuid);
     125            engineOperations.Remove(engineGuid);
    100126          }
    101           foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    102             oldScope.AddSubScope(subScope);
     127          return;
     128        }
     129        IOperation operation = myExecutionStack.Pop();
     130        if(operation is AtomicOperation) {
     131          AtomicOperation atomicOperation = (AtomicOperation)operation;
     132          IOperation next = null;
     133          try {
     134            next = atomicOperation.Operator.Execute(atomicOperation.Scope);
     135          } catch(Exception ex) {
     136            // push operation on stack again
     137            myExecutionStack.Push(atomicOperation);
     138            Abort();
     139            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    103140          }
    104           runningEngines.Remove(engineGuid);
    105           engineOperations.Remove(engineGuid);
    106         }
    107 
    108         if(Canceled) {
    109           // write back not finished tasks
    110           //CompositeOperation remaining = new CompositeOperation();
    111           //remaining.ExecuteInParallel = true;
    112           //for(int i = 0; i < list.tasks.Length; i++) {
    113           //  if(list.tasks[i].Count > 0) {
    114           //    CompositeOperation task = new CompositeOperation();
    115           //    while(list.tasks[i].Count > 0)
    116           //      task.AddOperation(list.tasks[i].Pop());
    117           //    remaining.AddOperation(task);
    118           //  }
    119           //}
    120           //if(remaining.Operations.Count > 0)
    121           //  stack.Push(remaining);
    122         }
    123         return;
    124       }
    125       IOperation operation = myExecutionStack.Pop();
    126       if(operation is AtomicOperation) {
    127         AtomicOperation atomicOperation = (AtomicOperation)operation;
    128         IOperation next = null;
    129         try {
    130           next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    131         } catch(Exception ex) {
    132           // push operation on stack again
    133           myExecutionStack.Push(atomicOperation);
    134           Abort();
    135           ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    136         }
    137         if(next != null)
    138           myExecutionStack.Push(next);
    139         OnOperationExecuted(atomicOperation);
    140         if(atomicOperation.Operator.Breakpoint) Abort();
    141       } else if(operation is CompositeOperation) {
    142         CompositeOperation compositeOperation = (CompositeOperation)operation;
    143         if(compositeOperation.ExecuteInParallel) {
    144           foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    145             ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
    146             MemoryStream memStream = new MemoryStream();
    147             GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    148             PersistenceManager.Save(engine, stream);
    149             stream.Close();
    150             Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    151             runningEngines.Add(currentEngineGuid);
    152             engineOperations[currentEngineGuid] = parOperation;
     141          if(next != null)
     142            myExecutionStack.Push(next);
     143          OnOperationExecuted(atomicOperation);
     144          if(atomicOperation.Operator.Breakpoint) Abort();
     145        } else if(operation is CompositeOperation) {
     146          CompositeOperation compositeOperation = (CompositeOperation)operation;
     147          if(compositeOperation.ExecuteInParallel) {
     148            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     149              ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
     150              MemoryStream memStream = new MemoryStream();
     151              GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
     152              PersistenceManager.Save(engine, stream);
     153              stream.Close();
     154              Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
     155              runningEngines.Add(currentEngineGuid);
     156              engineOperations[currentEngineGuid] = parOperation;
     157            }
     158          } else {
     159            for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
     160              myExecutionStack.Push(compositeOperation.Operations[i]);
    153161          }
    154         } else {
    155           for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    156             myExecutionStack.Push(compositeOperation.Operations[i]);
    157162        }
    158163      }
Note: See TracChangeset for help on using the changeset viewer.