Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/06/08 12:22:58 (17 years ago)
Author:
gkronber
Message:
  • extracted communication code out of the DistributedEngine into class JobManager
  • implemented a method to retrieve the JobState for a given job (specified by it's guid) in GridServer
  • implemented restarting of jobs in JobManager
  • improved exception handling in JobManager and DistributedEngine

(ticket #136)

Location:
trunk/sources/HeuristicLab.DistributedEngine
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r36 r219  
    3030using System.IO;
    3131using System.IO.Compression;
     32using HeuristicLab.PluginInfrastructure;
     33using System.Windows.Forms;
    3234
    3335namespace HeuristicLab.DistributedEngine {
    3436  public class DistributedEngine : EngineBase, IEditable {
    35     private IGridServer server;
    36     private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    37     private List<Guid> runningEngines = new List<Guid>();
     37    private JobManager jobManager;
     38    private CompositeOperation waitingOperations;
    3839    private string serverAddress;
    39     private bool cancelRequested;
    40     private CompositeOperation waitingOperations;
    4140    public string ServerAddress {
    4241      get { return serverAddress; }
     
    4544          serverAddress = value;
    4645        }
    47       }
    48     }
    49     public override bool Terminated {
    50       get {
    51         return myExecutionStack.Count == 0 && runningEngines.Count == 0 && waitingOperations==null;
    5246      }
    5347    }
     
    6660
    6761    public override void Execute() {
    68       NetTcpBinding binding = new NetTcpBinding();
    69       binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    70       binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    71       binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    72       binding.Security.Mode = SecurityMode.None;
    73       ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding);
    74       server = factory.CreateChannel(new EndpointAddress(serverAddress));
    75 
     62      if(jobManager == null) this.jobManager = new JobManager(serverAddress);
     63      jobManager.Reset();
    7664      base.Execute();
    7765    }
     
    8169    }
    8270
    83     public override void Abort() {
    84       lock(runningEngines) {
    85         cancelRequested = true;
    86         foreach(Guid engineGuid in runningEngines) {
    87           server.AbortEngine(engineGuid);
     71    protected override void ProcessNextOperation() {
     72      IOperation operation = myExecutionStack.Pop();
     73      if(operation is AtomicOperation) {
     74        AtomicOperation atomicOperation = (AtomicOperation)operation;
     75        IOperation next = null;
     76        try {
     77          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
     78        } catch(Exception ex) {
     79          // push operation on stack again
     80          myExecutionStack.Push(atomicOperation);
     81          Abort();
     82          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    8883        }
    89       }
    90     }
    91     public override void Reset() {
    92       base.Reset();
    93       engineOperations.Clear();
    94       runningEngines.Clear();
    95       cancelRequested = false;
    96     }
    97 
    98     protected override void ProcessNextOperation() {
    99       lock(runningEngines) {
    100         if(runningEngines.Count == 0 && cancelRequested) {
    101           base.Abort();
    102           cancelRequested = false;
    103           if(waitingOperations != null && waitingOperations.Operations.Count != 0) {
    104             myExecutionStack.Push(waitingOperations);
    105             waitingOperations = null;
     84        if(next != null)
     85          myExecutionStack.Push(next);
     86        OnOperationExecuted(atomicOperation);
     87        if(atomicOperation.Operator.Breakpoint) Abort();
     88      } else if(operation is CompositeOperation) {
     89        CompositeOperation compositeOperation = (CompositeOperation)operation;
     90        if(compositeOperation.ExecuteInParallel) {
     91          WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
     92          int i = 0;
     93          foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     94            waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation);
    10695          }
    107           return;
    108         }
    109         if(runningEngines.Count != 0) {
    110           Guid engineGuid = runningEngines[0];
    111           byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
    112           if(resultXml != null) {
    113             GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    114             ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    115             IScope oldScope = engineOperations[engineGuid].Scope;
    116             oldScope.Clear();
    117             foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    118               oldScope.AddVariable(variable);
    119             }
    120             foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    121               oldScope.AddSubScope(subScope);
    122             }
    123             OnOperationExecuted(engineOperations[engineGuid]);
    124 
    125             if(cancelRequested & resultEngine.ExecutionStack.Count != 0) {
    126               if(waitingOperations == null) {
    127                 waitingOperations = new CompositeOperation();
    128                 waitingOperations.ExecuteInParallel = false;
    129               }
    130               CompositeOperation task = new CompositeOperation();
    131               while(resultEngine.ExecutionStack.Count > 0) {
    132                 AtomicOperation oldOperation = (AtomicOperation)resultEngine.ExecutionStack.Pop();
    133                 if(oldOperation.Scope == resultEngine.InitialOperation.Scope) {
    134                   oldOperation = new AtomicOperation(oldOperation.Operator, oldScope);
    135                 }
    136                 task.AddOperation(oldOperation);
    137               }
    138               waitingOperations.AddOperation(task);
    139             }
    140             runningEngines.Remove(engineGuid);
    141             engineOperations.Remove(engineGuid);
     96          WaitHandle.WaitAll(waithandles);
     97          if(jobManager.Exception != null) {
     98            myExecutionStack.Push(compositeOperation);
     99            Abort();
     100            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); });
    142101          }
    143           return;
    144         }
    145         IOperation operation = myExecutionStack.Pop();
    146         if(operation is AtomicOperation) {
    147           AtomicOperation atomicOperation = (AtomicOperation)operation;
    148           IOperation next = null;
    149           try {
    150             next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    151           } catch(Exception ex) {
    152             // push operation on stack again
    153             myExecutionStack.Push(atomicOperation);
    154             Abort();
    155             ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    156           }
    157           if(next != null)
    158             myExecutionStack.Push(next);
    159           OnOperationExecuted(atomicOperation);
    160           if(atomicOperation.Operator.Breakpoint) Abort();
    161         } else if(operation is CompositeOperation) {
    162           CompositeOperation compositeOperation = (CompositeOperation)operation;
    163           if(compositeOperation.ExecuteInParallel) {
    164             foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    165               ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
    166               MemoryStream memStream = new MemoryStream();
    167               GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    168               PersistenceManager.Save(engine, stream);
    169               stream.Close();
    170               Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    171               runningEngines.Add(currentEngineGuid);
    172               engineOperations[currentEngineGuid] = parOperation;
    173             }
    174           } else {
    175             for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    176               myExecutionStack.Push(compositeOperation.Operations[i]);
    177           }
     102        } else {
     103          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
     104            myExecutionStack.Push(compositeOperation.Operations[i]);
    178105        }
    179106      }
  • trunk/sources/HeuristicLab.DistributedEngine/HeuristicLab.DistributedEngine.csproj

    r30 r219  
    5050  <ItemGroup>
    5151    <Compile Include="HeuristicLabDistributedEnginePlugin.cs" />
     52    <Compile Include="JobManager.cs" />
    5253    <Compile Include="Properties\AssemblyInfo.cs" />
    5354    <Compile Include="DistributedEngine.cs" />
Note: See TracChangeset for help on using the changeset viewer.