Free cookie consent management tool by TermsFeed Policy Generator

Changeset 219


Ignore:
Timestamp:
05/06/08 12:22:58 (16 years ago)
Author:
gkronber
Message:
  • extracted communication code out of the DistributedEngine into class JobManager
  • implemented a method to retrieve the JobState for a given job (specified by its GUID) in GridServer
  • implemented restarting of jobs in JobManager
  • improved exception handling in JobManager and DistributedEngine

(ticket #136)

Location:
trunk/sources
Files:
1 added
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs

    r36 r219  
    3030using System.IO;
    3131using System.IO.Compression;
     32using HeuristicLab.PluginInfrastructure;
     33using System.Windows.Forms;
    3234
    3335namespace HeuristicLab.DistributedEngine {
    3436  public class DistributedEngine : EngineBase, IEditable {
    35     private IGridServer server;
    36     private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    37     private List<Guid> runningEngines = new List<Guid>();
     37    private JobManager jobManager;
     38    private CompositeOperation waitingOperations;
    3839    private string serverAddress;
    39     private bool cancelRequested;
    40     private CompositeOperation waitingOperations;
    4140    public string ServerAddress {
    4241      get { return serverAddress; }
     
    4544          serverAddress = value;
    4645        }
    47       }
    48     }
    49     public override bool Terminated {
    50       get {
    51         return myExecutionStack.Count == 0 && runningEngines.Count == 0 && waitingOperations==null;
    5246      }
    5347    }
     
    6660
    6761    public override void Execute() {
    68       NetTcpBinding binding = new NetTcpBinding();
    69       binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    70       binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    71       binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    72       binding.Security.Mode = SecurityMode.None;
    73       ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding);
    74       server = factory.CreateChannel(new EndpointAddress(serverAddress));
    75 
     62      if(jobManager == null) this.jobManager = new JobManager(serverAddress);
     63      jobManager.Reset();
    7664      base.Execute();
    7765    }
     
    8169    }
    8270
    83     public override void Abort() {
    84       lock(runningEngines) {
    85         cancelRequested = true;
    86         foreach(Guid engineGuid in runningEngines) {
    87           server.AbortEngine(engineGuid);
     71    protected override void ProcessNextOperation() {
     72      IOperation operation = myExecutionStack.Pop();
     73      if(operation is AtomicOperation) {
     74        AtomicOperation atomicOperation = (AtomicOperation)operation;
     75        IOperation next = null;
     76        try {
     77          next = atomicOperation.Operator.Execute(atomicOperation.Scope);
     78        } catch(Exception ex) {
     79          // push operation on stack again
     80          myExecutionStack.Push(atomicOperation);
     81          Abort();
     82          ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    8883        }
    89       }
    90     }
    91     public override void Reset() {
    92       base.Reset();
    93       engineOperations.Clear();
    94       runningEngines.Clear();
    95       cancelRequested = false;
    96     }
    97 
    98     protected override void ProcessNextOperation() {
    99       lock(runningEngines) {
    100         if(runningEngines.Count == 0 && cancelRequested) {
    101           base.Abort();
    102           cancelRequested = false;
    103           if(waitingOperations != null && waitingOperations.Operations.Count != 0) {
    104             myExecutionStack.Push(waitingOperations);
    105             waitingOperations = null;
     84        if(next != null)
     85          myExecutionStack.Push(next);
     86        OnOperationExecuted(atomicOperation);
     87        if(atomicOperation.Operator.Breakpoint) Abort();
     88      } else if(operation is CompositeOperation) {
     89        CompositeOperation compositeOperation = (CompositeOperation)operation;
     90        if(compositeOperation.ExecuteInParallel) {
     91          WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count];
     92          int i = 0;
     93          foreach(AtomicOperation parOperation in compositeOperation.Operations) {
     94            waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation);
    10695          }
    107           return;
    108         }
    109         if(runningEngines.Count != 0) {
    110           Guid engineGuid = runningEngines[0];
    111           byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
    112           if(resultXml != null) {
    113             GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
    114             ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
    115             IScope oldScope = engineOperations[engineGuid].Scope;
    116             oldScope.Clear();
    117             foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
    118               oldScope.AddVariable(variable);
    119             }
    120             foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
    121               oldScope.AddSubScope(subScope);
    122             }
    123             OnOperationExecuted(engineOperations[engineGuid]);
    124 
    125             if(cancelRequested & resultEngine.ExecutionStack.Count != 0) {
    126               if(waitingOperations == null) {
    127                 waitingOperations = new CompositeOperation();
    128                 waitingOperations.ExecuteInParallel = false;
    129               }
    130               CompositeOperation task = new CompositeOperation();
    131               while(resultEngine.ExecutionStack.Count > 0) {
    132                 AtomicOperation oldOperation = (AtomicOperation)resultEngine.ExecutionStack.Pop();
    133                 if(oldOperation.Scope == resultEngine.InitialOperation.Scope) {
    134                   oldOperation = new AtomicOperation(oldOperation.Operator, oldScope);
    135                 }
    136                 task.AddOperation(oldOperation);
    137               }
    138               waitingOperations.AddOperation(task);
    139             }
    140             runningEngines.Remove(engineGuid);
    141             engineOperations.Remove(engineGuid);
     96          WaitHandle.WaitAll(waithandles);
     97          if(jobManager.Exception != null) {
     98            myExecutionStack.Push(compositeOperation);
     99            Abort();
     100            ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); });
    142101          }
    143           return;
    144         }
    145         IOperation operation = myExecutionStack.Pop();
    146         if(operation is AtomicOperation) {
    147           AtomicOperation atomicOperation = (AtomicOperation)operation;
    148           IOperation next = null;
    149           try {
    150             next = atomicOperation.Operator.Execute(atomicOperation.Scope);
    151           } catch(Exception ex) {
    152             // push operation on stack again
    153             myExecutionStack.Push(atomicOperation);
    154             Abort();
    155             ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    156           }
    157           if(next != null)
    158             myExecutionStack.Push(next);
    159           OnOperationExecuted(atomicOperation);
    160           if(atomicOperation.Operator.Breakpoint) Abort();
    161         } else if(operation is CompositeOperation) {
    162           CompositeOperation compositeOperation = (CompositeOperation)operation;
    163           if(compositeOperation.ExecuteInParallel) {
    164             foreach(AtomicOperation parOperation in compositeOperation.Operations) {
    165               ProcessingEngine engine = new ProcessingEngine(OperatorGraph, GlobalScope, parOperation); // OperatorGraph not needed?
    166               MemoryStream memStream = new MemoryStream();
    167               GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    168               PersistenceManager.Save(engine, stream);
    169               stream.Close();
    170               Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
    171               runningEngines.Add(currentEngineGuid);
    172               engineOperations[currentEngineGuid] = parOperation;
    173             }
    174           } else {
    175             for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
    176               myExecutionStack.Push(compositeOperation.Operations[i]);
    177           }
     102        } else {
     103          for(int i = compositeOperation.Operations.Count - 1; i >= 0; i--)
     104            myExecutionStack.Push(compositeOperation.Operations[i]);
    178105        }
    179106      }
  • trunk/sources/HeuristicLab.DistributedEngine/HeuristicLab.DistributedEngine.csproj

    r30 r219  
    5050  <ItemGroup>
    5151    <Compile Include="HeuristicLabDistributedEnginePlugin.cs" />
     52    <Compile Include="JobManager.cs" />
    5253    <Compile Include="Properties\AssemblyInfo.cs" />
    5354    <Compile Include="DistributedEngine.cs" />
  • trunk/sources/HeuristicLab.Grid/ClientForm.cs

    r115 r219  
    4646    private ProcessingEngine currentEngine;
    4747    private string clientUrl;
    48     private object locker = new object();
    4948
    5049    public ClientForm() {
     
    6261
    6362      // windows XP returns the external ip on index 0 while windows vista returns the external ip on index 2
    64       if (System.Environment.OSVersion.Version.Major >= 6) {
    65         clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[2] + ":" + clientPort.Text +"/Grid/Client";
     63      if(System.Environment.OSVersion.Version.Major >= 6) {
     64        clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[2] + ":" + clientPort.Text + "/Grid/Client";
    6665      } else {
    67         clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[0] + ":" + clientPort.Text +"/Grid/Client";
     66        clientUrl = "net.tcp://" + Dns.GetHostAddresses(Dns.GetHostName())[0] + ":" + clientPort.Text + "/Grid/Client";
    6867      }
    6968
     
    8786        statusTextBox.Text = "Waiting for engine";
    8887
    89       } catch (CommunicationException ex) {
     88      } catch(CommunicationException ex) {
    9089        MessageBox.Show("Exception while connecting to the server: " + ex.Message);
    9190        clientHost.Abort();
     
    106105
    107106    private void fetchOperationTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
    108       lock(locker) {
    109         byte[] engineXml;
    110         fetchOperationTimer.Stop();
    111         if(engineStore.TryTakeEngine(clientUrl, out currentGuid, out engineXml)) {
     107      byte[] engineXml;
     108      fetchOperationTimer.Stop();
     109      try {
     110        if(engineStore.TryTakeEngine(out currentGuid, out engineXml)) {
    112111          currentEngine = RestoreEngine(engineXml);
    113112          if(InvokeRequired) { Invoke((MethodInvoker)delegate() { statusTextBox.Text = "Executing engine"; }); } else statusTextBox.Text = "Executing engine";
     
    126125          fetchOperationTimer.Start();
    127126        }
     127      } catch(Exception ex) {
     128        currentEngine = null;
     129        currentGuid = Guid.Empty;
     130        fetchOperationTimer.Interval = 5000;
     131        fetchOperationTimer.Start();
    128132      }
    129133    }
    130134    public void Abort(Guid guid) {
    131       lock(locker) {
    132         if(!IsRunningEngine(guid)) return;
    133         currentEngine.Abort();
    134       }
     135      throw new NotSupportedException();
    135136    }
    136137    public bool IsRunningEngine(Guid guid) {
    137       return currentGuid == guid;
     138      throw new NotSupportedException();
    138139    }
    139140    private ProcessingEngine RestoreEngine(byte[] engine) {
  • trunk/sources/HeuristicLab.Grid/EngineStore.cs

    r35 r219  
    3131    private List<Guid> engineList;
    3232    private Dictionary<Guid, byte[]> waitingEngines;
    33     private Dictionary<Guid, byte[]> runningEngines;
    3433    private Dictionary<Guid, ManualResetEvent> waitHandles;
    3534    private Dictionary<Guid, byte[]> results;
    36     private Dictionary<Guid, string> runningClients;
     35    private Dictionary<Guid, DateTime> resultDate;
    3736    private object bigLock;
    3837    private ChannelFactory<IClient> clientChannelFactory;
     
    4544    public int RunningJobs {
    4645      get {
    47         return runningEngines.Count;
     46        return waitHandles.Count;
    4847      }
    4948    }
     
    5857      engineList = new List<Guid>();
    5958      waitingEngines = new Dictionary<Guid, byte[]>();
    60       runningEngines = new Dictionary<Guid, byte[]>();
    61       runningClients = new Dictionary<Guid, string>();
    6259      waitHandles = new Dictionary<Guid, ManualResetEvent>();
    6360      results = new Dictionary<Guid, byte[]>();
     61      resultDate = new Dictionary<Guid, DateTime>();
    6462      bigLock = new object();
    6563
     
    7371    }
    7472
    75     public bool TryTakeEngine(string clientUrl, out Guid guid, out byte[] engine) {
     73    public bool TryTakeEngine(out Guid guid, out byte[] engine) {
    7674      lock(bigLock) {
    7775        if(engineList.Count == 0) {
     
    8482          engine = waitingEngines[guid];
    8583          waitingEngines.Remove(guid);
    86           runningEngines[guid] = engine;
    87           runningClients[guid] = clientUrl;
    8884          return true;
    8985        }
     
    9389    public void StoreResult(Guid guid, byte[] result) {
    9490      lock(bigLock) {
    95         if(!runningEngines.ContainsKey(guid)) return; // ignore result when the engine is not known to be running
    96 
    97         runningEngines.Remove(guid);
    98         runningClients.Remove(guid);
     91        // clear old results
     92        List<Guid> expiredResults = FindExpiredResults(DateTime.Now.AddHours(-1.0));
     93        foreach(Guid expiredGuid in expiredResults) {
     94          results.Remove(expiredGuid);
     95          waitHandles.Remove(expiredGuid);
     96          resultDate.Remove(expiredGuid);
     97        }
     98        // add the new result
    9999        results[guid] = result;
     100        resultDate[guid] = DateTime.Now;
    100101        waitHandles[guid].Set();
    101102      }
     103    }
     104
     105    private List<Guid> FindExpiredResults(DateTime expirationDate) {
     106      List<Guid> expiredResults = new List<Guid>();
     107      foreach(Guid guid in results.Keys) {
     108        if(resultDate[guid] < expirationDate) {
     109          expiredResults.Add(guid);
     110        }
     111      }
     112      return expiredResults;
    102113    }
    103114
     
    113124      return GetResult(guid, System.Threading.Timeout.Infinite);
    114125    }
     126
    115127    internal byte[] GetResult(Guid guid, int timeout) {
    116128      lock(bigLock) {
    117         if(waitHandles.ContainsKey(guid)) {
     129        // result already available
     130        if(results.ContainsKey(guid)) {
     131          // if the wait-handle for this result is still alive then close and remove it
     132          if(waitHandles.ContainsKey(guid)) {
     133            ManualResetEvent waitHandle = waitHandles[guid];
     134            waitHandle.Close();
     135            waitHandles.Remove(guid);
     136          }
     137          return results[guid];
     138        } else {
     139          // result not yet available, if there is also no wait-handle for that result then we will never have a result and can return null
     140          if(!waitHandles.ContainsKey(guid)) return null;
     141
     142          // otherwise we have a wait-handle and can wait for the result
    118143          ManualResetEvent waitHandle = waitHandles[guid];
     144          // wait
    119145          if(waitHandle.WaitOne(timeout, true)) {
     146            // ok got the result in within the wait time => close and remove the wait-hande and return the result
    120147            waitHandle.Close();
    121148            waitHandles.Remove(guid);
    122149            byte[] result = results[guid];
    123             results.Remove(guid);
    124150            return result;
    125151          } else {
     152            // no result yet return without result
    126153            return null;
    127154          }
    128         } else {
    129           return null;
    130155        }
    131156      }
     
    133158
    134159    internal void AbortEngine(Guid guid) {
    135       string clientUrl = "";
    136160      lock(bigLock) {
    137         if(runningClients.ContainsKey(guid)) {
    138           clientUrl = runningClients[guid];
    139           IClient client = clientChannelFactory.CreateChannel(new EndpointAddress(clientUrl));
    140           client.Abort(guid);
    141         } else if(waitingEngines.ContainsKey(guid)) {
     161        if(waitingEngines.ContainsKey(guid)) {
    142162          byte[] engine = waitingEngines[guid];
    143163          waitingEngines.Remove(guid);
     
    148168      }
    149169    }
     170
     171    internal JobState JobState(Guid guid) {
     172      lock(bigLock) {
     173        if(waitingEngines.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Waiting;
     174        else if(waitHandles.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Busy;
     175        else if(results.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Finished;
     176        else return HeuristicLab.Grid.JobState.Unkown;
     177      }
     178    }
    150179  }
    151180}
  • trunk/sources/HeuristicLab.Grid/GridServer.cs

    r33 r219  
    3535    }
    3636
     37    public JobState JobState(Guid guid) {
     38      return engineStore.JobState(guid);
     39    }
     40
    3741    public Guid BeginExecuteEngine(byte[] engine) {
    3842      Guid guid = Guid.NewGuid();
  • trunk/sources/HeuristicLab.Grid/IEngineStore.cs

    r32 r219  
    3030  interface IEngineStore {
    3131    [OperationContract]
    32     bool TryTakeEngine(string clientUrl, out Guid guid, out byte[] engine);
     32    bool TryTakeEngine(out Guid guid, out byte[] engine);
    3333
    3434    [OperationContract]
  • trunk/sources/HeuristicLab.Grid/IGridServer.cs

    r33 r219  
    2626
    2727namespace HeuristicLab.Grid {
     28  public enum JobState {
     29    Unkown,
     30    Waiting,
     31    Busy,
     32    Finished
     33  }
     34
     35
    2836  [ServiceContract(Namespace = "http://HeuristicLab.Grid")]
    2937  public interface IGridServer {
     38    [OperationContract]
     39    JobState JobState(Guid guid);
    3040    [OperationContract]
    3141    Guid BeginExecuteEngine(byte[] engine);
  • trunk/sources/HeuristicLab.Grid/ProcessingEngine.cs

    r27 r219  
    6868          myExecutionStack.Push(atomicOperation);
    6969          Abort();
    70           ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    7170        }
    7271        if(next != null)
    7372          myExecutionStack.Push(next);
    74         OnOperationExecuted(atomicOperation);
    7573        if(atomicOperation.Operator.Breakpoint) Abort();
    7674      } else if(operation is CompositeOperation) {
Note: See TracChangeset for help on using the changeset viewer.