Changeset 32


Ignore:
Timestamp:
02/29/08 19:22:27 (13 years ago)
Author:
gkronber
Message:

worked on #2

Location:
trunk/sources
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r24 r32  
    3333namespace HeuristicLab.DistributedEngine {
    3434  public class DistributedEngine : EngineBase, IEditable {
    35     // currently executed operators
    36     private IOperator[] currentOperators;
    37     private int operatorIndex;
    3835    private IGridServer server;
     36    private Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>();
    3937
    4038    private string serverAddress;
     
    4745      }
    4846    }
    49 
    50     public DistributedEngine() {
    51       currentOperators = new IOperator[1000];
    52     }
    53 
    5447
    5548    public override object Clone(IDictionary<Guid, object> clonedObjects) {
     
    8477    public override void Abort() {
    8578      base.Abort();
    86       for(int i = 0; i < currentOperators.Length; i++) {
    87         if(currentOperators[i] != null)
    88           currentOperators[i].Abort();
     79      foreach(Guid engineGuid in runningEngines.Keys) {
     80        server.AbortEngine(engineGuid);
    8981      }
    9082    }
    9183
    9284    protected override void ProcessNextOperation() {
    93       operatorIndex = 1;
    9485      ProcessNextOperation(myExecutionStack, 0);
    9586    }
     
    10091        IOperation next = null;
    10192        try {
    102           currentOperators[currentOperatorIndex] = atomicOperation.Operator;
    10393          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    10494        } catch(Exception ex) {
     
    115105        CompositeOperation compositeOperation = (CompositeOperation)operation;
    116106        if(compositeOperation.ExecuteInParallel) {
    117           Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>();
    118107          foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    119108            ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
  • trunk/sources/HeuristicLab.Grid/ClientForm.cs

    r24 r32  
    3434using System.IO;
    3535using System.IO.Compression;
     36using System.Net;
    3637
    3738namespace HeuristicLab.Grid {
    38   public partial class ClientForm : Form {
    39 
     39  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext = false)]
     40  public partial class ClientForm : Form, IClient {
    4041    private ChannelFactory<IEngineStore> factory;
     42    private ServiceHost clientHost;
    4143    private System.Timers.Timer fetchOperationTimer;
    4244    private IEngineStore engineStore;
     45    private Guid currentGuid;
     46    private ProcessingEngine currentEngine;
     47    private string clientUrl;
    4348
    4449    public ClientForm() {
     
    4853      fetchOperationTimer.Elapsed += new System.Timers.ElapsedEventHandler(fetchOperationTimer_Elapsed);
    4954      statusTextBox.Text = "Stopped";
     55      currentGuid = Guid.Empty;
    5056    }
    5157
    5258    private void startButton_Click(object sender, EventArgs e) {
     59      clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[0] + ":8002/Grid/Client";
     60      clientHost = new ServiceHost(this, new Uri(clientUrl));
    5361      try {
    5462        NetTcpBinding binding = new NetTcpBinding();
     
    5664        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    5765        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    58         binding.Security.Mode = SecurityMode.None;       
     66        binding.Security.Mode = SecurityMode.None;
     67
     68        clientHost.AddServiceEndpoint(typeof(IClient), binding, clientUrl);
     69        clientHost.Open();
     70
    5971        factory = new ChannelFactory<IEngineStore>(binding);
    6072        engineStore = factory.CreateChannel(new EndpointAddress(addressTextBox.Text));
     
    6577        statusTextBox.Text = "Waiting for engine";
    6678
    67       } catch (Exception ex) {
     79      } catch (CommunicationException ex) {
    6880        MessageBox.Show("Exception while connecting to the server: " + ex.Message);
     81        clientHost.Abort();
    6982        startButton.Enabled = true;
    7083        stopButton.Enabled = false;
     
    7689      fetchOperationTimer.Stop();
    7790      factory.Abort();
     91      clientHost.Close();
    7892      statusTextBox.Text = "Stopped";
    7993      stopButton.Enabled = false;
     
    8397    private void fetchOperationTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
    8498      byte[] engineXml;
    85       Guid guid;
    8699      fetchOperationTimer.Stop();
    87       if (engineStore.TryTakeEngine(out guid, out engineXml)) {
    88         ProcessingEngine engine = RestoreEngine(engineXml);
     100      if (engineStore.TryTakeEngine(clientUrl, out currentGuid, out engineXml)) {
     101        currentEngine = RestoreEngine(engineXml);
    89102        if (InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine";
    90         engine.Finished += delegate(object src, EventArgs args) {
    91           byte[] resultScopeXml = SaveScope(engine.InitialOperation.Scope);
    92           engineStore.StoreResult(guid, resultScopeXml);
     103        currentEngine.Finished += delegate(object src, EventArgs args) {
     104          byte[] resultScopeXml = SaveScope(currentEngine.InitialOperation.Scope);
     105          engineStore.StoreResult(currentGuid, resultScopeXml);
     106          currentGuid = Guid.Empty;
     107          currentEngine = null;
    93108          fetchOperationTimer.Interval = 100;
    94109          fetchOperationTimer.Start();
    95110        };
    96         engine.Execute();
     111        currentEngine.Execute();
    97112      } else {
    98113        if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Waiting for engine"; }); } else statusTextBox.Text = "Waiting for engine";
     
    100115        fetchOperationTimer.Start();
    101116      }
     117    }
     118    public void Abort(Guid guid) {
     119      if(!IsRunningEngine(guid)) return;
     120      currentEngine.Abort();
     121    }
     122    public bool IsRunningEngine(Guid guid) {
     123      return currentGuid == guid;
    102124    }
    103125    private ProcessingEngine RestoreEngine(byte[] engine) {
  • trunk/sources/HeuristicLab.Grid/EngineStore.cs

    r2 r32  
    2727
    2828namespace HeuristicLab.Grid {
    29   [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext=false)]
     29  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext = false)]
    3030  public class EngineStore : IEngineStore {
    3131    private Queue<Guid> engineQueue;
     
    3333    private Dictionary<Guid, byte[]> runningEngines;
    3434    private Dictionary<Guid, byte[]> results;
     35    private Dictionary<Guid, string> runningClients;
    3536    private object bigLock;
     37    private ChannelFactory<IClient> clientChannelFactory;
    3638
    3739    private event EventHandler ResultRecieved;
     
    5961      waitingEngines = new Dictionary<Guid, byte[]>();
    6062      runningEngines = new Dictionary<Guid, byte[]>();
     63      runningClients = new Dictionary<Guid, string>();
    6164      results = new Dictionary<Guid, byte[]>();
    6265      bigLock = new object();
     66
     67      NetTcpBinding binding = new NetTcpBinding();
     68      binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
     69      binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
     70      binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
     71      binding.Security.Mode = SecurityMode.None;
     72
     73      clientChannelFactory = new ChannelFactory<IClient>(binding);
    6374    }
    6475
    65     public bool TryTakeEngine(out Guid guid, out byte[] engine) {
    66       lock (bigLock) {
    67         if (engineQueue.Count == 0) {
     76    public bool TryTakeEngine(string clientUrl, out Guid guid, out byte[] engine) {
     77      lock(bigLock) {
     78        if(engineQueue.Count == 0) {
    6879          guid = Guid.Empty;
    6980          engine = null;
     
    7485          waitingEngines.Remove(guid);
    7586          runningEngines[guid] = engine;
     87          runningClients[guid] = clientUrl;
    7688          return true;
    7789        }
     
    8092
    8193    public void StoreResult(Guid guid, byte[] result) {
    82       lock (bigLock) {
    83         if (!runningEngines.ContainsKey(guid)) return; // ignore result when the engine is not known to be running
     94      lock(bigLock) {
     95        if(!runningEngines.ContainsKey(guid)) return; // ignore result when the engine is not known to be running
    8496
    8597        runningEngines.Remove(guid);
     98        runningClients.Remove(guid);
    8699        results[guid] = result;
    87100        OnResultRecieved(guid);
     
    90103
    91104    internal void AddEngine(Guid guid, byte[] engine) {
    92       lock (bigLock) {
     105      lock(bigLock) {
    93106        engineQueue.Enqueue(guid);
    94107        waitingEngines.Add(guid, engine);
     
    97110
    98111    internal byte[] RemoveResult(Guid guid) {
    99       lock (bigLock) {
     112      lock(bigLock) {
    100113        byte[] result = results[guid];
    101114        results.Remove(guid);
     
    106119    internal byte[] GetResult(Guid guid) {
    107120      ManualResetEvent waitHandle = new ManualResetEvent(false);
    108       lock (bigLock) {
    109         if (results.ContainsKey(guid)) {
     121      lock(bigLock) {
     122        if(results.ContainsKey(guid)) {
    110123          byte[] result = results[guid];
    111124          results.Remove(guid);
     
    114127          ResultRecieved += delegate(object source, EventArgs args) {
    115128            ResultRecievedEventArgs resultArgs = (ResultRecievedEventArgs)args;
    116             if (resultArgs.resultGuid == guid) {
     129            if(resultArgs.resultGuid == guid) {
    117130              waitHandle.Set();
    118131            }
     
    124137      waitHandle.Close();
    125138
    126       lock (bigLock) {
     139      lock(bigLock) {
    127140        byte[] result = results[guid];
    128141        results.Remove(guid);
     
    131144    }
    132145
     146    internal void AbortEngine(Guid guid) {
     147      string clientUrl = "";
     148      lock(bigLock) {
     149        if(runningClients.ContainsKey(guid)) {
     150          clientUrl = runningClients[guid];
     151        }
     152
     153        if(clientUrl != "") {
     154          IClient client = clientChannelFactory.CreateChannel(new EndpointAddress(clientUrl));
     155          client.Abort(guid);
     156        }
     157      }
     158    }
     159
    133160    private void OnResultRecieved(Guid guid) {
    134161      ResultRecievedEventArgs args = new ResultRecievedEventArgs();
    135162      args.resultGuid = guid;
    136       if (ResultRecieved != null) {
     163      if(ResultRecieved != null) {
    137164        ResultRecieved(this, args);
    138165      }
  • trunk/sources/HeuristicLab.Grid/GridServer.cs

    r2 r32  
    4646
    4747    public void AbortEngine(Guid engine) {
    48       throw new NotImplementedException();
     48      engineStore.AbortEngine(engine);
    4949    }
    5050  }
  • trunk/sources/HeuristicLab.Grid/HeuristicLab.Grid.csproj

    r30 r32  
    6363    <Compile Include="GridServerApplication.cs" />
    6464    <Compile Include="HeuristicLabGridPlugin.cs" />
     65    <Compile Include="IClient.cs" />
    6566    <Compile Include="IEngineStore.cs" />
    6667    <Compile Include="IGridServer.cs" />
  • trunk/sources/HeuristicLab.Grid/IEngineStore.cs

    r2 r32  
    2929  [ServiceContract(Namespace = "http://HeuristicLab.Grid")]
    3030  interface IEngineStore {
    31 
    3231    [OperationContract]
    33     bool TryTakeEngine(out Guid guid, out byte[] engine);
     32    bool TryTakeEngine(string clientUrl, out Guid guid, out byte[] engine);
    3433
    3534    [OperationContract]
Note: See TracChangeset for help on using the changeset viewer.