Changeset 32
- Timestamp:
- 02/29/08 19:22:27 (17 years ago)
- Location:
- trunk/sources
- Files:
-
- 1 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs ¶
r24 r32 33 33 namespace HeuristicLab.DistributedEngine { 34 34 public class DistributedEngine : EngineBase, IEditable { 35 // currently executed operators36 private IOperator[] currentOperators;37 private int operatorIndex;38 35 private IGridServer server; 36 private Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>(); 39 37 40 38 private string serverAddress; … … 47 45 } 48 46 } 49 50 public DistributedEngine() {51 currentOperators = new IOperator[1000];52 }53 54 47 55 48 public override object Clone(IDictionary<Guid, object> clonedObjects) { … … 84 77 public override void Abort() { 85 78 base.Abort(); 86 for(int i = 0; i < currentOperators.Length; i++) { 87 if(currentOperators[i] != null) 88 currentOperators[i].Abort(); 79 foreach(Guid engineGuid in runningEngines.Keys) { 80 server.AbortEngine(engineGuid); 89 81 } 90 82 } 91 83 92 84 protected override void ProcessNextOperation() { 93 operatorIndex = 1;94 85 ProcessNextOperation(myExecutionStack, 0); 95 86 } … … 100 91 IOperation next = null; 101 92 try { 102 currentOperators[currentOperatorIndex] = atomicOperation.Operator;103 93 next = atomicOperation.Operator.Execute(atomicOperation.Scope); 104 94 } catch(Exception ex) { … … 115 105 CompositeOperation compositeOperation = (CompositeOperation)operation; 116 106 if(compositeOperation.ExecuteInParallel) { 117 Dictionary<Guid, AtomicOperation> runningEngines = new Dictionary<Guid, AtomicOperation>();118 107 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 119 108 ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? -
TabularUnified trunk/sources/HeuristicLab.Grid/ClientForm.cs ¶
r24 r32 34 34 using System.IO; 35 35 using System.IO.Compression; 36 using System.Net; 36 37 37 38 namespace HeuristicLab.Grid { 38 public partial class ClientForm : Form {39 39 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext = false)] 40 public partial class ClientForm : Form, IClient { 40 41 private ChannelFactory<IEngineStore> factory; 42 private ServiceHost clientHost; 41 43 private System.Timers.Timer fetchOperationTimer; 42 44 private IEngineStore engineStore; 45 private Guid currentGuid; 46 private ProcessingEngine currentEngine; 47 private string clientUrl; 43 48 44 49 public ClientForm() { … … 48 53 fetchOperationTimer.Elapsed += new System.Timers.ElapsedEventHandler(fetchOperationTimer_Elapsed); 49 54 statusTextBox.Text = "Stopped"; 55 currentGuid = Guid.Empty; 50 56 } 51 57 52 58 private void startButton_Click(object sender, EventArgs e) { 59 clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[0] + ":8002/Grid/Client"; 60 clientHost = new ServiceHost(this, new Uri(clientUrl)); 53 61 try { 54 62 NetTcpBinding binding = new NetTcpBinding(); … … 56 64 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 57 65 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 58 binding.Security.Mode = SecurityMode.None; 66 binding.Security.Mode = SecurityMode.None; 67 68 clientHost.AddServiceEndpoint(typeof(IClient), binding, clientUrl); 69 clientHost.Open(); 70 59 71 factory = new ChannelFactory<IEngineStore>(binding); 60 72 engineStore = factory.CreateChannel(new EndpointAddress(addressTextBox.Text)); … … 65 77 statusTextBox.Text = "Waiting for engine"; 66 78 67 } catch ( Exception ex) {79 } catch (CommunicationException ex) { 68 80 MessageBox.Show("Exception while connecting to the server: " + ex.Message); 81 clientHost.Abort(); 69 82 startButton.Enabled = true; 70 83 stopButton.Enabled = false; … … 76 89 fetchOperationTimer.Stop(); 77 90 factory.Abort(); 91 clientHost.Close(); 78 92 statusTextBox.Text = "Stopped"; 79 93 stopButton.Enabled = false; … … 83 97 private void fetchOperationTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { 84 98 byte[] engineXml; 85 Guid guid;86 99 fetchOperationTimer.Stop(); 87 if (engineStore.TryTakeEngine( out guid, out engineXml)) {88 ProcessingEngine engine = RestoreEngine(engineXml);100 if (engineStore.TryTakeEngine(clientUrl, out currentGuid, out engineXml)) { 101 currentEngine = RestoreEngine(engineXml); 89 102 if (InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine"; 90 engine.Finished += delegate(object src, EventArgs args) { 91 byte[] resultScopeXml = SaveScope(engine.InitialOperation.Scope); 92 engineStore.StoreResult(guid, resultScopeXml); 103 currentEngine.Finished += delegate(object src, EventArgs args) { 104 byte[] resultScopeXml = SaveScope(currentEngine.InitialOperation.Scope); 105 engineStore.StoreResult(currentGuid, resultScopeXml); 106 currentGuid = Guid.Empty; 107 currentEngine = null; 93 108 fetchOperationTimer.Interval = 100; 94 109 fetchOperationTimer.Start(); 95 110 }; 96 engine.Execute();111 currentEngine.Execute(); 97 112 } else { 98 113 if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Waiting for engine"; }); } else statusTextBox.Text = "Waiting for engine"; … … 100 115 fetchOperationTimer.Start(); 101 116 } 117 } 118 public void Abort(Guid guid) { 119 if(!IsRunningEngine(guid)) return; 120 currentEngine.Abort(); 121 } 122 public bool IsRunningEngine(Guid guid) { 123 return currentGuid == guid; 102 124 } 103 125 private ProcessingEngine RestoreEngine(byte[] engine) { -
TabularUnified trunk/sources/HeuristicLab.Grid/EngineStore.cs ¶
r2 r32 27 27 28 28 namespace HeuristicLab.Grid { 29 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext =false)]29 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext = false)] 30 30 public class EngineStore : IEngineStore { 31 31 private Queue<Guid> engineQueue; … … 33 33 private Dictionary<Guid, byte[]> runningEngines; 34 34 private Dictionary<Guid, byte[]> results; 35 private Dictionary<Guid, string> runningClients; 35 36 private object bigLock; 37 private ChannelFactory<IClient> clientChannelFactory; 36 38 37 39 private event EventHandler ResultRecieved; … … 59 61 waitingEngines = new Dictionary<Guid, byte[]>(); 60 62 runningEngines = new Dictionary<Guid, byte[]>(); 63 runningClients = new Dictionary<Guid, string>(); 61 64 results = new Dictionary<Guid, byte[]>(); 62 65 bigLock = new object(); 66 67 NetTcpBinding binding = new NetTcpBinding(); 68 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 69 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 70 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 71 binding.Security.Mode = SecurityMode.None; 72 73 clientChannelFactory = new ChannelFactory<IClient>(binding); 63 74 } 64 75 65 public bool TryTakeEngine( out Guid guid, out byte[] engine) {66 lock 67 if 76 public bool TryTakeEngine(string clientUrl, out Guid guid, out byte[] engine) { 77 lock(bigLock) { 78 if(engineQueue.Count == 0) { 68 79 guid = Guid.Empty; 69 80 engine = null; … … 74 85 waitingEngines.Remove(guid); 75 86 runningEngines[guid] = engine; 87 runningClients[guid] = clientUrl; 76 88 return true; 77 89 } … … 80 92 81 93 public void StoreResult(Guid guid, byte[] result) { 82 lock 83 if 94 lock(bigLock) { 95 if(!runningEngines.ContainsKey(guid)) return; // ignore result when the engine is not known to be running 84 96 85 97 runningEngines.Remove(guid); 98 runningClients.Remove(guid); 86 99 results[guid] = result; 87 100 OnResultRecieved(guid); … … 90 103 91 104 internal void AddEngine(Guid guid, byte[] engine) { 92 lock 105 lock(bigLock) { 93 106 engineQueue.Enqueue(guid); 94 107 waitingEngines.Add(guid, engine); … … 97 110 98 111 internal byte[] RemoveResult(Guid guid) { 99 lock 112 lock(bigLock) { 100 113 byte[] result = results[guid]; 101 114 results.Remove(guid); … … 106 119 internal byte[] GetResult(Guid guid) { 107 120 ManualResetEvent waitHandle = new ManualResetEvent(false); 108 lock 109 if 121 lock(bigLock) { 122 if(results.ContainsKey(guid)) { 110 123 byte[] result = results[guid]; 111 124 results.Remove(guid); … … 114 127 ResultRecieved += delegate(object source, EventArgs args) { 115 128 ResultRecievedEventArgs resultArgs = (ResultRecievedEventArgs)args; 116 if 129 if(resultArgs.resultGuid == guid) { 117 130 waitHandle.Set(); 118 131 } … … 124 137 waitHandle.Close(); 125 138 126 lock 139 lock(bigLock) { 127 140 byte[] result = results[guid]; 128 141 results.Remove(guid); … … 131 144 } 132 145 146 internal void AbortEngine(Guid guid) { 147 string clientUrl = ""; 148 lock(bigLock) { 149 if(runningClients.ContainsKey(guid)) { 150 clientUrl = runningClients[guid]; 151 } 152 153 if(clientUrl != "") { 154 IClient client = clientChannelFactory.CreateChannel(new EndpointAddress(clientUrl)); 155 client.Abort(guid); 156 } 157 } 158 } 159 133 160 private void OnResultRecieved(Guid guid) { 134 161 ResultRecievedEventArgs args = new ResultRecievedEventArgs(); 135 162 args.resultGuid = guid; 136 if 163 if(ResultRecieved != null) { 137 164 ResultRecieved(this, args); 138 165 } -
TabularUnified trunk/sources/HeuristicLab.Grid/GridServer.cs ¶
r2 r32 46 46 47 47 public void AbortEngine(Guid engine) { 48 throw new NotImplementedException();48 engineStore.AbortEngine(engine); 49 49 } 50 50 } -
TabularUnified trunk/sources/HeuristicLab.Grid/HeuristicLab.Grid.csproj ¶
r30 r32 63 63 <Compile Include="GridServerApplication.cs" /> 64 64 <Compile Include="HeuristicLabGridPlugin.cs" /> 65 <Compile Include="IClient.cs" /> 65 66 <Compile Include="IEngineStore.cs" /> 66 67 <Compile Include="IGridServer.cs" /> -
TabularUnified trunk/sources/HeuristicLab.Grid/IEngineStore.cs ¶
r2 r32 29 29 [ServiceContract(Namespace = "http://HeuristicLab.Grid")] 30 30 interface IEngineStore { 31 32 31 [OperationContract] 33 bool TryTakeEngine( out Guid guid, out byte[] engine);32 bool TryTakeEngine(string clientUrl, out Guid guid, out byte[] engine); 34 33 35 34 [OperationContract]
Note: See TracChangeset
for help on using the changeset viewer.