Changeset 33


Ignore:
Timestamp:
03/01/08 15:05:37 (13 years ago)
Author:
gkronber
Message:
  • worked on #2
  • fixed some problems in communication between Grid and DistributedEngine
Location:
trunk/sources
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r32 r33  
    3434  public class DistributedEngine : EngineBase, IEditable {
    3535    private IGridServer server;
    36     private Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>();
    37 
     36    private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
     37    private List<Guid> runningEngines = new List<Guid>();
    3838    private string serverAddress;
    3939    public string ServerAddress {
     
    4343          serverAddress = value;
    4444        }
     45      }
     46    }
     47    public override bool Terminated {
     48      get {
     49        return myExecutionStack.Count == 0 && runningEngines.Count == 0;
    4550      }
    4651    }
     
    7782    public override void Abort() {
    7883      base.Abort();
    79       foreach(Guid engineGuid in runningEngines.Keys) {
     84      foreach(Guid engineGuid in runningEngines) {
    8085        server.AbortEngine(engineGuid);
    8186      }
     
    8388
    8489    protected override void ProcessNextOperation() {
    85       ProcessNextOperation(myExecutionStack, 0);
    86     }
    87     private void ProcessNextOperation(Stack<IOperation> stack, int currentOperatorIndex) {
    88       IOperation operation = stack.Pop();
     90      if(runningEngines.Count != 0) {
     91        Guid engineGuid = runningEngines[0];
     92        byte[] scopeXml = server.TryEndExecuteEngine(engineGuid,100);
     93        if(scopeXml != null) {
     94          GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress);
     95          IScope newScope = (IScope)PersistenceManager.Load(stream);
     96          IScope oldScope = engineOperations[engineGuid].Scope;
     97          oldScope.Clear();
     98          foreach(IVariable variable in newScope.Variables) {
     99            oldScope.AddVariable(variable);
     100          }
     101          foreach(IScope subScope in newScope.SubScopes) {
     102            oldScope.AddSubScope(subScope);
     103          }
     104          runningEngines.Remove(engineGuid);
     105          engineOperations.Remove(engineGuid);
     106        }
     107
     108        if(Canceled) {
     109          // write back not finished tasks
     110          //CompositeOperation remaining = new CompositeOperation();
     111          //remaining.ExecuteInParallel = true;
     112          //for(int i = 0; i < list.tasks.Length; i++) {
     113          //  if(list.tasks[i].Count > 0) {
     114          //    CompositeOperation task = new CompositeOperation();
     115          //    while(list.tasks[i].Count > 0)
     116          //      task.AddOperation(list.tasks[i].Pop());
     117          //    remaining.AddOperation(task);
     118          //  }
     119          //}
     120          //if(remaining.Operations.Count > 0)
     121          //  stack.Push(remaining);
     122        }
     123        return;
     124      }
     125      IOperation operation = myExecutionStack.Pop();
    89126      if(operation is AtomicOperation) {
    90127        AtomicOperation atomicOperation = (AtomicOperation)operation;
     
    94131        } catch(Exception ex) {
    95132          // push operation on stack again
    96           stack.Push(atomicOperation);
     133          myExecutionStack.Push(atomicOperation);
    97134          Abort();
    98135          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    99136        }
    100137        if(next != null)
    101           stack.Push(next);
     138          myExecutionStack.Push(next);
    102139        OnOperationExecuted(atomicOperation);
    103140        if(atomicOperation.Operator.Breakpoint) Abort();
     
    112149            stream.Close();
    113150            Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    114             runningEngines[currentEngineGuid] = parOperation;
     151            runningEngines.Add(currentEngineGuid);
     152            engineOperations[currentEngineGuid] = parOperation;
    115153          }
    116           foreach(Guid engineGuid in runningEngines.Keys) {
    117             byte[] scopeXml = server.EndExecuteEngine(engineGuid);
    118             GZipStream stream = new GZipStream(new MemoryStream(scopeXml), CompressionMode.Decompress);
    119             IScope newScope = (IScope)PersistenceManager.Load(stream);
    120             IScope oldScope = runningEngines[engineGuid].Scope;
    121             oldScope.Clear();
    122             foreach(IVariable variable in newScope.Variables) {
    123               oldScope.AddVariable(variable);
    124             }
    125             foreach(IScope subScope in newScope.SubScopes) {
    126               oldScope.AddSubScope(subScope);
    127             }
    128           }
    129 
    130           // TASK (gkronber 12.2.08)
    131           //if (Canceled) {
    132           //  // write back not finished tasks
    133           //  CompositeOperation remaining = new CompositeOperation();
    134           //  remaining.ExecuteInParallel = true;
    135           //  for (int i = 0; i < list.tasks.Length; i++) {
    136           //    if (list.tasks[i].Count > 0) {
    137           //      CompositeOperation task = new CompositeOperation();
    138           //      while (list.tasks[i].Count > 0)
    139           //        task.AddOperation(list.tasks[i].Pop());
    140           //      remaining.AddOperation(task);
    141           //    }
    142           //  }
    143           //  if (remaining.Operations.Count > 0)
    144           //    stack.Push(remaining);
    145           //}
    146154        } else {
    147155          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    148             stack.Push(compositeOperation.Operations[i]);
     156            myExecutionStack.Push(compositeOperation.Operations[i]);
    149157        }
    150158      }
  • trunk/sources/HeuristicLab.Grid/EngineStore.cs

    r32 r33  
    3232    private Dictionary<Guid, byte[]> waitingEngines;
    3333    private Dictionary<Guid, byte[]> runningEngines;
     34    private Dictionary<Guid, ManualResetEvent> waitHandles;
    3435    private Dictionary<Guid, byte[]> results;
    3536    private Dictionary<Guid, string> runningClients;
    3637    private object bigLock;
    3738    private ChannelFactory<IClient> clientChannelFactory;
    38 
    39     private event EventHandler ResultRecieved;
    40 
    4139    public int WaitingJobs {
    4240      get {
     
    6260      runningEngines = new Dictionary<Guid, byte[]>();
    6361      runningClients = new Dictionary<Guid, string>();
     62      waitHandles = new Dictionary<Guid, ManualResetEvent>();
    6463      results = new Dictionary<Guid, byte[]>();
    6564      bigLock = new object();
     
    9897        runningClients.Remove(guid);
    9998        results[guid] = result;
    100         OnResultRecieved(guid);
     99        waitHandles[guid].Set();
    101100      }
    102101    }
     
    106105        engineQueue.Enqueue(guid);
    107106        waitingEngines.Add(guid, engine);
     107        waitHandles.Add(guid, new ManualResetEvent(false));
    108108      }
    109109    }
     
    118118
    119119    internal byte[] GetResult(Guid guid) {
    120       ManualResetEvent waitHandle = new ManualResetEvent(false);
     120      return GetResult(guid, System.Threading.Timeout.Infinite);
     121    }
     122    internal byte[] GetResult(Guid guid, int timeout) {
    121123      lock(bigLock) {
    122         if(results.ContainsKey(guid)) {
    123           byte[] result = results[guid];
    124           results.Remove(guid);
    125           return result;
     124        if(waitHandles.ContainsKey(guid)) {
     125          ManualResetEvent waitHandle = waitHandles[guid];
     126          if(waitHandle.WaitOne(timeout, false)) {
     127            waitHandle.Close();
     128            waitHandles.Remove(guid);
     129            byte[] result = results[guid];
     130            results.Remove(guid);
     131            return result;
     132          } else {
     133            return null;
     134          }
    126135        } else {
    127           ResultRecieved += delegate(object source, EventArgs args) {
    128             ResultRecievedEventArgs resultArgs = (ResultRecievedEventArgs)args;
    129             if(resultArgs.resultGuid == guid) {
    130               waitHandle.Set();
    131             }
    132           };
     136          return null;
    133137        }
    134       }
    135 
    136       waitHandle.WaitOne();
    137       waitHandle.Close();
    138 
    139       lock(bigLock) {
    140         byte[] result = results[guid];
    141         results.Remove(guid);
    142         return result;
    143138      }
    144139    }
     
    157152      }
    158153    }
    159 
    160     private void OnResultRecieved(Guid guid) {
    161       ResultRecievedEventArgs args = new ResultRecievedEventArgs();
    162       args.resultGuid = guid;
    163       if(ResultRecieved != null) {
    164         ResultRecieved(this, args);
    165       }
    166     }
    167 
    168     private class ResultRecievedEventArgs : EventArgs {
    169       public Guid resultGuid;
    170     }
    171154  }
    172155}
  • trunk/sources/HeuristicLab.Grid/GridServer.cs

    r32 r33  
    4444      return engineStore.GetResult(guid);
    4545    }
     46    public byte[] TryEndExecuteEngine(Guid guid, int timeout) {
     47      return engineStore.GetResult(guid, timeout);
     48    }
    4649
    4750    public void AbortEngine(Guid engine) {
  • trunk/sources/HeuristicLab.Grid/IGridServer.cs

    r2 r33  
    3030    [OperationContract]
    3131    Guid BeginExecuteEngine(byte[] engine);
    32    
    3332    [OperationContract]
    3433    byte[] EndExecuteEngine(Guid engineGuid);
    35 
     34    [OperationContract]
     35    byte[] TryEndExecuteEngine(Guid engineGuid, int timeout);
    3636    [OperationContract]
    3737    void AbortEngine(Guid engineGuid);
Note: See TracChangeset for help on using the changeset viewer.