Changeset 35


Ignore:
Timestamp:
03/01/08 16:19:11 (13 years ago)
Author:
gkronber
Message:

fixed some synchronization issues in DistributedEngine and grid infrastructure

(ticket ref #2)

Location:
trunk/sources
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r34 r35  
    3737    private List<Guid> runningEngines = new List<Guid>();
    3838    private string serverAddress;
     39    private bool cancelRequested;
    3940    public string ServerAddress {
    4041      get { return serverAddress; }
     
    5051      }
    5152    }
    52 
    5353    public override object Clone(IDictionary<Guid, object> clonedObjects) {
    5454      DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects);
     
    8181
    8282    public override void Abort() {
    83       base.Abort();
    84       foreach(Guid engineGuid in runningEngines) {
    85         server.AbortEngine(engineGuid);
     83      lock(runningEngines) {
     84        cancelRequested = true;
     85        foreach(Guid engineGuid in runningEngines) {
     86          server.AbortEngine(engineGuid);
     87        }
    8688      }
     89    }
     90    public override void Reset() {
     91      base.Reset();
     92      engineOperations.Clear();
     93      runningEngines.Clear();
     94      cancelRequested = false;
    8795    }
    8896
    8997    protected override void ProcessNextOperation() {
    90       if(runningEngines.Count != 0) {
    91         Guid engineGuid = runningEngines[0];
    92         byte[] resultXml = server.TryEndExecuteEngine(engineGuid,100);
    93         if(resultXml != null) {
    94           GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    95           ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    96           IScope oldScope = engineOperations[engineGuid].Scope;
    97           oldScope.Clear();
    98           foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    99             oldScope.AddVariable(variable);
     98      lock(runningEngines) {
     99        if(runningEngines.Count == 0 && cancelRequested) {
     100          base.Abort();
     101          cancelRequested = false;
     102          return;
     103        }
     104        if(runningEngines.Count != 0) {
     105          Guid engineGuid = runningEngines[0];
     106          byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
     107          if(resultXml != null) {
     108            GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
     109            ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
     110            IScope oldScope = engineOperations[engineGuid].Scope;
     111            oldScope.Clear();
     112            foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
     113              oldScope.AddVariable(variable);
     114            }
     115            foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
     116              oldScope.AddSubScope(subScope);
     117            }
     118            OnOperationExecuted(engineOperations[engineGuid]);
     119            if(resultEngine.ExecutionStack.Count != 0) {
     120              foreach(IOperation op in resultEngine.ExecutionStack) {
     121                myExecutionStack.Push(op);
     122              }
     123            }
     124            runningEngines.Remove(engineGuid);
     125            engineOperations.Remove(engineGuid);
    100126          }
    101           foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    102             oldScope.AddSubScope(subScope);
     127          return;
     128        }
     129        IOperation operation = myExecutionStack.Pop();
     130        if(operation is AtomicOperation) {
     131          AtomicOperation atomicOperation = (AtomicOperation)operation;
     132          IOperation next = null;
     133          try {
     134            next = atomicOperation.Operator.Execute(atomicOperation.Scope);
     135          } catch(Exception ex) {
     136            // push operation on stack again
     137            myExecutionStack.Push(atomicOperation);
     138            Abort();
     139            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    103140          }
    104           runningEngines.Remove(engineGuid);
    105           engineOperations.Remove(engineGuid);
    106         }
    107 
    108         if(Canceled) {
    109           // write back not finished tasks
    110           //CompositeOperation remaining = new CompositeOperation();
    111           //remaining.ExecuteInParallel = true;
    112           //for(int i = 0; i < list.tasks.Length; i++) {
    113           //  if(list.tasks[i].Count > 0) {
    114           //    CompositeOperation task = new CompositeOperation();
    115           //    while(list.tasks[i].Count > 0)
    116           //      task.AddOperation(list.tasks[i].Pop());
    117           //    remaining.AddOperation(task);
    118           //  }
    119           //}
    120           //if(remaining.Operations.Count > 0)
    121           //  stack.Push(remaining);
    122         }
    123         return;
    124       }
    125       IOperation operation = myExecutionStack.Pop();
    126       if(operation is AtomicOperation) {
    127         AtomicOperation atomicOperation = (AtomicOperation)operation;
    128         IOperation next = null;
    129         try {
    130           next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    131         } catch(Exception ex) {
    132           // push operation on stack again
    133           myExecutionStack.Push(atomicOperation);
    134           Abort();
    135           ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    136         }
    137         if(next != null)
    138           myExecutionStack.Push(next);
    139         OnOperationExecuted(atomicOperation);
    140         if(atomicOperation.Operator.Breakpoint) Abort();
    141       } else if(operation is CompositeOperation) {
    142         CompositeOperation compositeOperation = (CompositeOperation)operation;
    143         if(compositeOperation.ExecuteInParallel) {
    144           foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    145             ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
    146             MemoryStream memStream = new MemoryStream();
    147             GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    148             PersistenceManager.Save(engine, stream);
    149             stream.Close();
    150             Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    151             runningEngines.Add(currentEngineGuid);
    152             engineOperations[currentEngineGuid] = parOperation;
     141          if(next != null)
     142            myExecutionStack.Push(next);
     143          OnOperationExecuted(atomicOperation);
     144          if(atomicOperation.Operator.Breakpoint) Abort();
     145        } else if(operation is CompositeOperation) {
     146          CompositeOperation compositeOperation = (CompositeOperation)operation;
     147          if(compositeOperation.ExecuteInParallel) {
     148            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     149              ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
     150              MemoryStream memStream = new MemoryStream();
     151              GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
     152              PersistenceManager.Save(engine, stream);
     153              stream.Close();
     154              Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
     155              runningEngines.Add(currentEngineGuid);
     156              engineOperations[currentEngineGuid] = parOperation;
     157            }
     158          } else {
     159            for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
     160              myExecutionStack.Push(compositeOperation.Operations[i]);
    153161          }
    154         } else {
    155           for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    156             myExecutionStack.Push(compositeOperation.Operations[i]);
    157162        }
    158163      }
  • trunk/sources/HeuristicLab.Grid/ClientForm.cs

    r34 r35  
    4646    private ProcessingEngine currentEngine;
    4747    private string clientUrl;
     48    private object locker = new object();
    4849
    4950    public ClientForm() {
     
    9697
    9798    private void fetchOperationTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
    98       byte[] engineXml;
    99       fetchOperationTimer.Stop();
    100       if (engineStore.TryTakeEngine(clientUrl, out currentGuid, out engineXml)) {
    101         currentEngine = RestoreEngine(engineXml);
    102         if (InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine";
    103         currentEngine.Finished += delegate(object src, EventArgs args) {
    104           byte[] resultXml = SaveEngine(currentEngine);
    105           engineStore.StoreResult(currentGuid, resultXml);
    106           currentGuid = Guid.Empty;
    107           currentEngine = null;
    108           fetchOperationTimer.Interval = 100;
     99      lock(locker) {
     100        byte[] engineXml;
     101        fetchOperationTimer.Stop();
     102        if(engineStore.TryTakeEngine(clientUrl, out currentGuid, out engineXml)) {
     103          currentEngine = RestoreEngine(engineXml);
     104          if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine";
     105          currentEngine.Finished += delegate(object src, EventArgs args) {
     106            byte[] resultXml = SaveEngine(currentEngine);
     107            engineStore.StoreResult(currentGuid, resultXml);
     108            currentGuid = Guid.Empty;
     109            currentEngine = null;
     110            fetchOperationTimer.Interval = 100;
     111            fetchOperationTimer.Start();
     112          };
     113          currentEngine.Execute();
     114        } else {
     115          if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Waiting for engine"; }); } else statusTextBox.Text = "Waiting for engine";
     116          fetchOperationTimer.Interval = 5000;
    109117          fetchOperationTimer.Start();
    110         };
    111         currentEngine.Execute();
    112       } else {
    113         if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Waiting for engine"; }); } else statusTextBox.Text = "Waiting for engine";
    114         fetchOperationTimer.Interval = 5000;
    115         fetchOperationTimer.Start();
     118        }
    116119      }
    117120    }
    118121    public void Abort(Guid guid) {
    119       if(!IsRunningEngine(guid)) return;
    120       currentEngine.Abort();
     122      lock(locker) {
     123        if(!IsRunningEngine(guid)) return;
     124        currentEngine.Abort();
     125      }
    121126    }
    122127    public bool IsRunningEngine(Guid guid) {
  • trunk/sources/HeuristicLab.Grid/EngineStore.cs

    r34 r35  
    117117        if(waitHandles.ContainsKey(guid)) {
    118118          ManualResetEvent waitHandle = waitHandles[guid];
    119           if(waitHandle.WaitOne(timeout, false)) {
     119          if(waitHandle.WaitOne(timeout, true)) {
    120120            waitHandle.Close();
    121121            waitHandles.Remove(guid);
     
    143143          waitingEngines.Remove(guid);
    144144          engineList.Remove(guid);
     145          waitHandles[guid].Set();
    145146          results.Add(guid, engine);
    146147        }
Note: See TracChangeset for help on using the changeset viewer.