Changeset 228 for trunk/sources/HeuristicLab.DistributedEngine
- Timestamp:
- 05/09/08 16:22:33 (17 years ago)
- Location:
- trunk/sources/HeuristicLab.DistributedEngine
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.DistributedEngine/DistributedEngine.cs
r222 r228 89 89 CompositeOperation compositeOperation = (CompositeOperation)operation; 90 90 if(compositeOperation.ExecuteInParallel) { 91 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 92 int i = 0; 93 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 94 waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation); 95 } 96 // WaitAll works only with maximally 64 waithandles 97 if(waithandles.Length <= 64) { 98 WaitHandle.WaitAll(waithandles); 99 } else { 100 for(i = 0; i < waithandles.Length; i++) { 101 waithandles[i].WaitOne(); 91 try { 92 WaitHandle[] waithandles = new WaitHandle[compositeOperation.Operations.Count]; 93 int i = 0; 94 foreach(AtomicOperation parOperation in compositeOperation.Operations) { 95 waithandles[i++] = jobManager.BeginExecuteOperation(OperatorGraph, GlobalScope, parOperation); 102 96 } 103 } 104 if(jobManager.Exception != null) { 97 // WaitAll works only with maximally 64 waithandles 98 if(waithandles.Length <= 64) { 99 WaitHandle.WaitAll(waithandles); 100 } else { 101 for(i = 0; i < waithandles.Length; i++) { 102 waithandles[i].WaitOne(); 103 } 104 } 105 if(jobManager.Exception != null) { 106 myExecutionStack.Push(compositeOperation); 107 Abort(); 108 ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(jobManager.Exception); }); 109 } 110 } catch(Exception e) { 105 111 myExecutionStack.Push(compositeOperation); 106 112 Abort(); -
trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs
r219 r228 21 21 private const int MAX_RESTARTS = 5; 22 22 private Exception exception; 23 private ChannelFactory<IGridServer> factory; 23 24 public Exception Exception { 24 25 get { return exception; } … … 31 32 internal void Reset() { 32 33 lock(locker) { 33 // open a new channel 34 NetTcpBinding binding = new NetTcpBinding(); 35 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 36 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 37 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 38 binding.Security.Mode = SecurityMode.None; 39 ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding); 40 server = factory.CreateChannel(new EndpointAddress(address)); 41 34 ResetConnection(); 42 35 foreach(WaitHandle wh in waithandles.Values) wh.Close(); 43 36 waithandles.Clear(); … … 48 41 } 49 42 43 private void ResetConnection() { 44 // open a new channel 45 NetTcpBinding binding = new NetTcpBinding(); 46 binding.MaxReceivedMessageSize = 100000000; // 100Mbytes 47 binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars 48 binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; 49 binding.Security.Mode = SecurityMode.None; 50 factory = new ChannelFactory<IGridServer>(binding); 51 server = factory.CreateChannel(new EndpointAddress(address)); 52 } 53 50 54 public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) { 51 55 ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed? 52 56 MemoryStream memStream = new MemoryStream(); 53 57 GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true); 54 PersistenceManager.Save(engine, stream); // Careful! Make sure that persistence is thread-safe!58 PersistenceManager.Save(engine, stream); 55 59 stream.Close(); 60 if(factory.State != CommunicationState.Opened) 61 ResetConnection(); 56 62 Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray()); 57 63 lock(locker) { … … 69 75 do { 70 76 try { 77 if(factory.State != CommunicationState.Opened) ResetConnection(); 71 78 byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100); 72 79 if(resultXml != null) {
Note: See TracChangeset
for help on using the changeset viewer.