Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 256

Last change on this file since 256 was 256, checked in by gkronber, 16 years ago
  • added statement to remove results from the job-manager
  • added a critical section in case EndExecuteOperation is called while any of the TryGetResult-threads is still running.
  • added statements to make sure that streams to (un)zip serialized engines are closed and resources released

(ticket #149)

File size: 8.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.ServiceModel;
6using HeuristicLab.Grid;
7using System.Threading;
8using HeuristicLab.Core;
9using System.IO;
10using System.IO.Compression;
11using System.Windows.Forms;
12
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Sends serialized processing engines to a grid server, polls the server for
  /// their results on thread-pool threads and hands the results back to the caller.
  /// </summary>
  /// <remarks>
  /// All access to the server channel is serialized with <c>connectionLock</c>;
  /// all access to the bookkeeping dictionaries is serialized with <c>dictionaryLock</c>.
  /// </remarks>
  class JobManager {
    private IGridServer server;
    private readonly string address;
    // the three dictionaries below are always added to / removed from together, keyed by the server-side job id
    private readonly Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    private readonly Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
    private readonly Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    // outcome of finished jobs, waiting to be picked up by EndExecuteOperation
    private readonly Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    private readonly Dictionary<AtomicOperation, Exception> failures = new Dictionary<AtomicOperation, Exception>();
    private readonly object connectionLock = new object();
    private readonly object dictionaryLock = new object();

    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
    private const int RETRY_TIMEOUT_SEC = 10;
    private const int CHECK_RESULTS_TIMEOUT = 10;

    private ChannelFactory<IGridServer> factory;

    /// <summary>
    /// Creates a job manager for the grid server at the given endpoint address.
    /// No connection is opened until <see cref="Reset"/> or the first server call.
    /// </summary>
    /// <param name="address">Endpoint address of the grid server.</param>
    public JobManager(string address) {
      this.address = address;
    }

    /// <summary>
    /// Opens a fresh connection and forgets all running jobs and pending results.
    /// The wait handles of all forgotten jobs are closed.
    /// </summary>
    internal void Reset() {
      ResetConnection();
      lock(dictionaryLock) {
        foreach(WaitHandle wh in waithandles.Values) wh.Close();
        waithandles.Clear();
        engineOperations.Clear();
        runningEngines.Clear();
        results.Clear();
        failures.Clear();
      }
    }

    /// <summary>
    /// Replaces the current channel with a newly created one.
    /// </summary>
    private void ResetConnection() {
      lock(connectionLock) {
        // release the previous factory (and the channel created from it) instead of leaking it;
        // Abort is used because the old channel is typically faulted when we get here
        if(factory != null) {
          factory.Abort();
        }

        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        binding.Security.Mode = SecurityMode.None;

        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    /// <summary>
    /// Invokes <paramref name="call"/> on the server channel while holding the connection lock.
    /// On a <see cref="TimeoutException"/> or <see cref="CommunicationException"/> the call is
    /// repeated after RETRY_TIMEOUT_SEC seconds, at most MAX_CONNECTION_RETRIES times; a
    /// communication failure additionally replaces the channel. The lock is not held while waiting.
    /// </summary>
    /// <param name="call">The server call to execute.</param>
    /// <param name="result">Return value of the call; the default value of T when the call did not succeed.</param>
    /// <param name="lastError">The exception of the last failed attempt, or null when the first attempt succeeded.</param>
    /// <returns>true when the call succeeded; false when all attempts failed.</returns>
    private bool TryCallServer<T>(Func<IGridServer, T> call, out T result, out Exception lastError) {
      result = default(T);
      lastError = null;
      for(int attempt = 0; ; attempt++) {
        try {
          lock(connectionLock) {
            // Reset() was never called => open the connection lazily
            if(server == null) ResetConnection();
            result = call(server);
          }
          return true;
        } catch(TimeoutException timeoutException) {
          lastError = timeoutException;
        } catch(CommunicationException communicationException) {
          lastError = communicationException;
          ResetConnection();
        }
        if(attempt >= MAX_CONNECTION_RETRIES) return false;
        // wait some time and try again
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    }

    /// <summary>
    /// Packs the operation into a processing engine, sends it to the server and starts
    /// polling for the result on a thread-pool thread.
    /// </summary>
    /// <param name="operatorGraph">Operator graph passed to the processing engine.</param>
    /// <param name="globalScope">Global scope passed to the processing engine.</param>
    /// <param name="operation">The operation to execute remotely.</param>
    /// <returns>
    /// A wait handle that is signaled as soon as the job has finished (or was given up);
    /// call <see cref="EndExecuteOperation"/> afterwards to retrieve the outcome.
    /// </returns>
    /// <exception cref="ApplicationException">The server could not be reached within the maximal number of retries.</exception>
    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
      byte[] zippedEngine;
      using(MemoryStream memStream = new MemoryStream()) {
        using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          PersistenceManager.Save(engine, stream);
        }
        // the gzip stream has to be closed before the buffer is read, otherwise the compressed data is incomplete
        zippedEngine = memStream.ToArray();
      }

      Guid currentEngineGuid;
      Exception lastError;
      if(!TryCallServer<Guid>(s => s.BeginExecuteEngine(zippedEngine), out currentEngineGuid, out lastError)) {
        // engine can decide to stop execution
        throw new ApplicationException("Max retries reached.", lastError);
      }

      // keep a local reference: the polling thread may remove the dictionary entry before we return
      ManualResetEvent waitHandle = new ManualResetEvent(false);
      lock(dictionaryLock) {
        runningEngines[currentEngineGuid] = zippedEngine;
        engineOperations[currentEngineGuid] = operation;
        waithandles[currentEngineGuid] = waitHandle;
      }
      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
      return waitHandle;
    }

    /// <summary>
    /// Retrieves (and removes) the outcome of a finished job and restores the resulting scope.
    /// </summary>
    /// <param name="operation">The operation that was passed to <see cref="BeginExecuteOperation"/>.</param>
    /// <returns>The scope of the initial operation of the engine returned by the server.</returns>
    /// <exception cref="ApplicationException">The job was given up, e.g. because the maximal number of restarts was reached.</exception>
    /// <exception cref="KeyNotFoundException">No outcome is stored for the operation.</exception>
    public IScope EndExecuteOperation(AtomicOperation operation) {
      byte[] zippedResult = null;
      Exception failure;
      lock(dictionaryLock) {
        if(failures.TryGetValue(operation, out failure)) {
          failures.Remove(operation);
        } else if(results.TryGetValue(operation, out zippedResult)) {
          results.Remove(operation);
        } else {
          throw new KeyNotFoundException("No result is available for the given operation.");
        }
      }
      if(failure != null) {
        // wrap instead of re-throwing the stored instance to keep its original stack trace
        throw new ApplicationException(failure.Message, failure);
      }
      // restore the engine
      using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
        ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
        return resultEngine.InitialOperation.Scope;
      }
    }

    /// <summary>
    /// Thread-pool work item: polls the server until the job identified by <paramref name="state"/>
    /// has a result, restarting the job when the server does not know it.
    /// The wait handle of the job is signaled when a result arrived or when the job was
    /// restarted more than MAX_RESTARTS times; polling stops silently when the job is no
    /// longer tracked (after <see cref="Reset"/>).
    /// </summary>
    /// <param name="state">The <see cref="Guid"/> of the job as returned by the server.</param>
    private void TryGetResult(object state) {
      Guid engineGuid = (Guid)state;
      int restartCounter = 0;
      do {
        // the job-manager was reset in the meantime => nobody is interested in this job anymore
        lock(dictionaryLock) {
          if(!engineOperations.ContainsKey(engineGuid)) return;
        }

        byte[] zippedResult;
        Exception lastError;
        TryCallServer<byte[]>(s => s.TryEndExecuteEngine(engineGuid, 100), out zippedResult, out lastError);
        if(zippedResult != null) {
          // store result, signal the wait handle and clean up then return
          FinishJob(engineGuid, zippedResult, null);
          return;
        }

        // check if the server is still working on the job;
        // an unreachable server is treated like an unknown job (the result of a failed call is not meaningful)
        JobState jobState;
        bool stateRetrieved = TryCallServer<JobState>(s => s.JobState(engineGuid), out jobState, out lastError);
        if(!stateRetrieved || jobState == JobState.Unkown) {
          // restart job
          byte[] packedEngine;
          lock(dictionaryLock) {
            if(!runningEngines.TryGetValue(engineGuid, out packedEngine)) return;
          }
          Guid restartedGuid;
          if(TryCallServer<Guid>(s => s.BeginExecuteEngine(packedEngine), out restartedGuid, out lastError)) {
            // the server hands out an id for the restarted job; continue polling with that id
            if(!RekeyJob(engineGuid, restartedGuid)) return;
            engineGuid = restartedGuid;
          }
          restartCounter++;
        }

        // when we reach a maximum amount of restarts => signal the wait-handle and record that there was a problem
        // (throwing here would leave the exception unhandled on a thread-pool thread and the waiter blocked forever)
        if(restartCounter > MAX_RESTARTS) {
          FinishJob(engineGuid, null, new ApplicationException("Maximum number of job restarts reached."));
          return;
        }

        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
      } while(true);
    }

    /// <summary>
    /// Moves the bookkeeping entries of a job from <paramref name="oldGuid"/> to <paramref name="newGuid"/>.
    /// </summary>
    /// <returns>false when the job is no longer tracked; true otherwise.</returns>
    private bool RekeyJob(Guid oldGuid, Guid newGuid) {
      lock(dictionaryLock) {
        AtomicOperation operation;
        byte[] packedEngine;
        ManualResetEvent waitHandle;
        if(!engineOperations.TryGetValue(oldGuid, out operation) ||
           !runningEngines.TryGetValue(oldGuid, out packedEngine) ||
           !waithandles.TryGetValue(oldGuid, out waitHandle)) {
          return false;
        }
        if(oldGuid == newGuid) return true;

        engineOperations.Remove(oldGuid);
        runningEngines.Remove(oldGuid);
        waithandles.Remove(oldGuid);
        engineOperations[newGuid] = operation;
        runningEngines[newGuid] = packedEngine;
        waithandles[newGuid] = waitHandle;
        return true;
      }
    }

    /// <summary>
    /// Stores the outcome of a job, removes its bookkeeping entries and signals its wait handle.
    /// Does nothing when the job is no longer tracked.
    /// </summary>
    /// <param name="engineGuid">Id of the job.</param>
    /// <param name="zippedResult">The result returned by the server; ignored when <paramref name="failure"/> is set.</param>
    /// <param name="failure">The reason why the job was given up, or null when it finished successfully.</param>
    private void FinishJob(Guid engineGuid, byte[] zippedResult, Exception failure) {
      lock(dictionaryLock) {
        AtomicOperation operation;
        if(!engineOperations.TryGetValue(engineGuid, out operation)) return;

        if(failure == null) {
          results[operation] = zippedResult;
        } else {
          failures[operation] = failure;
        }

        engineOperations.Remove(engineGuid);
        runningEngines.Remove(engineGuid);
        ManualResetEvent waitHandle;
        if(waithandles.TryGetValue(engineGuid, out waitHandle)) {
          waithandles.Remove(engineGuid);
          waitHandle.Set();
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.