Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/09/08 16:22:33 (17 years ago)
Author:
gkronber
Message:

bug fixing in DistributedEngine and Grid-Infrastructure

Location:
trunk/sources/HeuristicLab.DistributedEngine
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r222 r228  
    8989        CompositeOperation compositeOperation = (CompositeOperation)operation;
    9090        if(compositeOperation.ExecuteInParallel) {
    91           WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
    92           int i = 0;
    93           foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    94             waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation);
    95           }
    96           // WaitAll works only with maximally 64 waithandles
    97           if(waithandles.Length <= 64) {
    98             WaitHandle.WaitAll(waithandles);
    99           } else {
    100             for(i = 0; i < waithandles.Length; i++) {
    101               waithandles[i].WaitOne();
     91          try {
     92            WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
     93            int i = 0;
     94            foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     95              waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation);
    10296            }
    103           }
    104           if(jobManager.Exception != null) {
     97            // WaitAll works only with maximally 64 waithandles
     98            if(waithandles.Length <= 64) {
     99              WaitHandle.WaitAll(waithandles);
     100            } else {
     101              for(i = 0; i < waithandles.Length; i++) {
     102                waithandles[i].WaitOne();
     103              }
     104            }
     105            if(jobManager.Exception != null) {
     106              myExecutionStack.Push(compositeOperation);
     107              Abort();
     108              ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); });
     109            }
     110          } catch(Exception e) {
    105111            myExecutionStack.Push(compositeOperation);
    106112            Abort();
  • trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs

    r219 r228  
    2121    private const int MAX_RESTARTS = 5;
    2222    private Exception exception;
     23    private ChannelFactory<IGridServer> factory;
    2324    public Exception Exception {
    2425      get { return exception; }
     
    3132    internal void Reset() {
    3233      lock(locker) {
    33         // open a new channel
    34         NetTcpBinding binding = new NetTcpBinding();
    35         binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    36         binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    37         binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    38         binding.Security.Mode = SecurityMode.None;
    39         ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding);
    40         server = factory.CreateChannel(new EndpointAddress(address));
    41 
     34        ResetConnection();
    4235        foreach(WaitHandle wh in waithandles.Values) wh.Close();
    4336        waithandles.Clear();
     
    4841    }
    4942
     43    private void ResetConnection() {
     44      // open a new channel
     45      NetTcpBinding binding = new NetTcpBinding();
     46      binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
     47      binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
     48      binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
     49      binding.Security.Mode = SecurityMode.None;
     50      factory = new ChannelFactory<IGridServer>(binding);
     51      server = factory.CreateChannel(new EndpointAddress(address));
     52    }
     53
    5054    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
    5155      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
    5256      MemoryStream memStream = new MemoryStream();
    5357      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    54       PersistenceManager.Save(engine, stream); // Careful! Make sure that persistence is thread-safe!
     58      PersistenceManager.Save(engine, stream);
    5559      stream.Close();
     60      if(factory.State != CommunicationState.Opened)
     61        ResetConnection();
    5662      Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    5763      lock(locker) {
     
    6975      do {
    7076        try {
     77          if(factory.State != CommunicationState.Opened) ResetConnection();
    7178          byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
    7279          if(resultXml != null) {
Note: See TracChangeset for help on using the changeset viewer.