#region License Information /* HeuristicLab * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Collections.Generic; using System.Text; using System.Xml; using System.Threading; using HeuristicLab.Core; using HeuristicLab.Grid; using System.ServiceModel; using System.IO; using System.IO.Compression; namespace HeuristicLab.DistributedEngine { public class DistributedEngine : EngineBase, IEditable { private IGridServer server; private Dictionary engineOperations = new Dictionary(); private List runningEngines = new List(); private string serverAddress; private bool cancelRequested; private CompositeOperation waitingOperations; public string ServerAddress { get { return serverAddress; } set { if(value != serverAddress) { serverAddress = value; } } } public override bool Terminated { get { return myExecutionStack.Count == 0 && runningEngines.Count == 0 && waitingOperations==null; } } public override object Clone(IDictionary clonedObjects) { DistributedEngine clone = (DistributedEngine)base.Clone(clonedObjects); clone.ServerAddress = serverAddress; return clone; } public override IView CreateView() { return new DistributedEngineEditor(this); } public virtual IEditor CreateEditor() { return new DistributedEngineEditor(this); } public override void Execute() { NetTcpBinding binding = new NetTcpBinding(); binding.MaxReceivedMessageSize = 100000000; // 100Mbytes binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; binding.Security.Mode = SecurityMode.None; ChannelFactory factory = new ChannelFactory(binding); server = factory.CreateChannel(new EndpointAddress(serverAddress)); base.Execute(); } public override void ExecuteSteps(int steps) { throw new InvalidOperationException("DistributedEngine doesn't support stepwise execution"); } public override void Abort() { lock(runningEngines) { cancelRequested = true; foreach(Guid engineGuid in runningEngines) { server.AbortEngine(engineGuid); } } } public override void Reset() { base.Reset(); engineOperations.Clear(); runningEngines.Clear(); cancelRequested = false; } protected override void ProcessNextOperation() { lock(runningEngines) { if(runningEngines.Count == 0 && cancelRequested) { base.Abort(); cancelRequested = false; if(waitingOperations != null && waitingOperations.Operations.Count != 0) { myExecutionStack.Push(waitingOperations); waitingOperations = null; } return; } if(runningEngines.Count != 0) { Guid engineGuid = runningEngines[0]; byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); if(resultXml != null) { GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress); ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream); IScope oldScope = engineOperations[engineGuid].Scope; oldScope.Clear(); foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) { oldScope.AddVariable(variable); } foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) { oldScope.AddSubScope(subScope); } OnOperationExecuted(engineOperations[engineGuid]); if(cancelRequested & resultEngine.ExecutionStack.Count != 0) { if(waitingOperations == null) { waitingOperations = new CompositeOperation(); waitingOperations.ExecuteInParallel = false; } CompositeOperation task = new CompositeOperation(); while(resultEngine.ExecutionStack.Count > 0) { AtomicOperation oldOperation = (AtomicOperation)resultEngine.ExecutionStack.Pop(); if(oldOperation.Scope == resultEngine.InitialOperation.Scope) { oldOperation = new AtomicOperation(oldOperation.Operator, oldScope); } task.AddOperation(oldOperation); } waitingOperations.AddOperation(task); } runningEngines.Remove(engineGuid); engineOperations.Remove(engineGuid); } return; } IOperation operation = myExecutionStack.Pop(); if(operation is AtomicOperation) { AtomicOperation atomicOperation = (AtomicOperation)operation; IOperation next = null; try { next = atomicOperation.Operator.Execute(atomicOperation.Scope); } catch(Exception ex) { // push operation on stack again myExecutionStack.Push(atomicOperation); Abort(); ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); }); } if(next != null) myExecutionStack.Push(next); OnOperationExecuted(atomicOperation); if(atomicOperation.Operator.Breakpoint) Abort(); } else if(operation is CompositeOperation) { CompositeOperation compositeOperation = (CompositeOperation)operation; if(compositeOperation.ExecuteInParallel) { foreach(AtomicOperation parOperation in compositeOperation.Operations) { ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed? MemoryStream memStream = new MemoryStream(); GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); PersistenceManager.Save(engine, stream); stream.Close(); Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); runningEngines.Add(currentEngineGuid); engineOperations[currentEngineGuid] = parOperation; } } else { for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--) myExecutionStack.Push(compositeOperation.Operations[i]); } } } } #region Persistence Methods public override XmlNode GetXmlNode(string name, XmlDocument document, IDictionary persistedObjects) { XmlNode node = base.GetXmlNode(name, document, persistedObjects); XmlAttribute addressAttribute = document.CreateAttribute("ServerAddress"); addressAttribute.Value = ServerAddress; node.Attributes.Append(addressAttribute); return node; } public override void Populate(XmlNode node, IDictionary restoredObjects) { base.Populate(node, restoredObjects); ServerAddress = node.Attributes["ServerAddress"].Value; } #endregion } }