Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 249

Last change on this file since 249 was 249, checked in by gkronber, 16 years ago

fixed incorrect sleeping-timeouts (#149)

File size: 8.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.ServiceModel;
6using HeuristicLab.Grid;
7using System.Threading;
8using HeuristicLab.Core;
9using System.IO;
10using System.IO.Compression;
11using System.Windows.Forms;
12
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Sends serialized processing engines to a grid server and collects their results.
  /// Every job started with <see cref="BeginExecuteOperation"/> is polled on a thread-pool
  /// thread (<see cref="TryGetResult"/>) until the server delivers a result or the job had
  /// to be restarted too often; the wait handle returned to the caller is signaled in both cases.
  /// </summary>
  class JobManager {
    private IGridServer server;
    private string address;
    // jobs that are currently executed by the server, indexed by the id the server assigned to them
    private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    // outcome of finished jobs: either a zipped result engine or the reason why the job was given up
    // NOTE(review): entries are kept until Reset() is called, so EndExecuteOperation can be repeated
    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    private Dictionary<AtomicOperation, string> failures = new Dictionary<AtomicOperation, string>();
    // connectionLock guards factory and server, dictionaryLock guards all dictionaries above
    private object connectionLock = new object();
    private object dictionaryLock = new object();

    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
    private const int RETRY_TIMEOUT_SEC = 10;
    private const int CHECK_RESULTS_TIMEOUT = 10;

    private ChannelFactory<IGridServer> factory;

    /// <summary>
    /// Creates a job manager for the grid server at the given endpoint address.
    /// No connection is opened here; that happens in <see cref="Reset"/> or on first use.
    /// </summary>
    /// <param name="address">Endpoint address of the grid server.</param>
    public JobManager(string address) {
      this.address = address;
    }

    /// <summary>
    /// Opens a fresh connection and forgets all running jobs and stored results.
    /// All wait handles of jobs that are still running are closed.
    /// </summary>
    internal void Reset() {
      ResetConnection();
      lock(dictionaryLock) {
        foreach(WaitHandle wh in waithandles.Values) wh.Close();
        waithandles.Clear();
        engineOperations.Clear();
        runningEngines.Clear();
        results.Clear();
        failures.Clear();
      }
    }

    /// <summary>
    /// Replaces the channel factory and the server channel by newly created ones.
    /// </summary>
    private void ResetConnection() {
      lock(connectionLock) {
        // release the previous factory (and the channel created by it) instead of just dropping it
        if(factory != null) factory.Abort();

        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        binding.Security.Mode = SecurityMode.None;

        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    /// <summary>
    /// Recreates the connection when there is none yet, when the factory is not opened or when
    /// the channel can not be used anymore. The caller must hold <c>connectionLock</c>.
    /// </summary>
    private void EnsureConnection() {
      ICommunicationObject channel = server as ICommunicationObject;
      // a faulted or closed channel stays unusable even though the factory is still opened
      bool channelUnusable = channel != null &&
        (channel.State == CommunicationState.Faulted || channel.State == CommunicationState.Closed);
      if(factory == null || factory.State != CommunicationState.Opened || channelUnusable)
        ResetConnection();
    }

    /// <summary>
    /// Packs the operation into a processing engine, sends it to the server and starts polling for the result.
    /// </summary>
    /// <param name="operatorGraph">Operator graph handed to the processing engine.</param>
    /// <param name="globalScope">Global scope handed to the processing engine.</param>
    /// <param name="operation">Operation to execute on the server.</param>
    /// <returns>Wait handle that is signaled as soon as the job has finished or has been given up.</returns>
    /// <exception cref="ApplicationException">The server could not be reached within the maximal number of retries.</exception>
    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
      byte[] zippedEngine;
      using(MemoryStream memStream = new MemoryStream()) {
        // the zip stream has to be closed before the buffer is read, the memory stream is left open for that
        using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          PersistenceManager.Save(engine, stream);
        }
        zippedEngine = memStream.ToArray();
      }

      Guid currentEngineGuid = Guid.Empty;
      bool success = false;
      int retryCount = 0;
      do {
        try {
          lock(connectionLock) {
            EnsureConnection();
            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
          }
          success = true;
        } catch(TimeoutException timeoutException) {
          // retry a limited number of times, then throw -> engine can decide to stop execution
          if(retryCount >= MAX_CONNECTION_RETRIES)
            throw new ApplicationException("Max retries reached.", timeoutException);
        } catch(CommunicationException communicationException) {
          if(retryCount >= MAX_CONNECTION_RETRIES)
            throw new ApplicationException("Max retries reached.", communicationException);
        }
        if(!success) {
          retryCount++;
          // wait outside of the connection lock so that other jobs can use the connection in the meantime
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        }
      } while(!success);

      // keep a local reference: the polling thread may remove the dictionary entry before we return
      ManualResetEvent waitHandle = new ManualResetEvent(false);
      lock(dictionaryLock) {
        runningEngines[currentEngineGuid] = zippedEngine;
        engineOperations[currentEngineGuid] = operation;
        waithandles[currentEngineGuid] = waitHandle;
      }
      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
      return waitHandle;
    }

    /// <summary>
    /// Unpacks the result of a finished operation.
    /// </summary>
    /// <param name="operation">Operation that was passed to <see cref="BeginExecuteOperation"/>.</param>
    /// <returns>Scope of the initial operation of the engine returned by the server.</returns>
    /// <exception cref="ApplicationException">The job was given up because it had to be restarted too often.</exception>
    /// <exception cref="KeyNotFoundException">There is no stored result for the operation.</exception>
    public IScope EndExecuteOperation(AtomicOperation operation) {
      byte[] zippedResult;
      lock(dictionaryLock) {
        string failure;
        if(failures.TryGetValue(operation, out failure)) throw new ApplicationException(failure);
        zippedResult = results[operation];
      }
      // restore the engine
      using(MemoryStream memStream = new MemoryStream(zippedResult))
      using(GZipStream stream = new GZipStream(memStream, CompressionMode.Decompress)) {
        ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
        return resultEngine.InitialOperation.Scope;
      }
    }

    /// <summary>
    /// Thread-pool callback that polls the server until the job is finished.
    /// Jobs that the server does not know are sent again. After more than MAX_RESTARTS restarts
    /// the job is given up: the failure is stored and the wait handle is signaled, so that
    /// <see cref="EndExecuteOperation"/> reports the problem instead of this thread throwing.
    /// </summary>
    /// <param name="state">Boxed <see cref="Guid"/> of the job.</param>
    private void TryGetResult(object state) {
      Guid engineGuid = (Guid)state;
      int restartCounter = 0;
      do {
        // stop polling when the job has been dropped by Reset() in the meantime
        lock(dictionaryLock) {
          if(!engineOperations.ContainsKey(engineGuid)) return;
        }

        byte[] zippedResult = FetchResult(engineGuid);
        if(zippedResult != null) {
          FinishJob(engineGuid, zippedResult, null);
          return;
        }

        // check if the server is still working on the job
        if(QueryJobState(engineGuid) == JobState.Unkown) {
          engineGuid = RestartJob(engineGuid);
          restartCounter++;
        }

        // when we reach a maximum amount of restarts => signal the wait-handle and store that there was a problem
        if(restartCounter > MAX_RESTARTS) {
          FinishJob(engineGuid, null, "Maximum number of job restarts reached.");
          return;
        }

        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
      } while(true);
    }

    /// <summary>
    /// Asks the server for the result of a job; connection problems are retried a limited number of times.
    /// </summary>
    /// <returns>The value delivered by the server, or null when the server could not be reached.</returns>
    private byte[] FetchResult(Guid engineGuid) {
      int retries = 0;
      do {
        try {
          lock(connectionLock) {
            EnsureConnection();
            // NOTE(review): 100 is presumably a timeout for the server side wait - confirm against IGridServer
            return server.TryEndExecuteEngine(engineGuid, 100);
          }
        } catch(TimeoutException) {
          retries++;
        } catch(CommunicationException) {
          retries++;
        }
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } while(retries < MAX_CONNECTION_RETRIES);
      return null;
    }

    /// <summary>
    /// Asks the server for the state of a job; connection problems are retried a limited number of times.
    /// </summary>
    /// <returns>The state reported by the server, or JobState.Unkown when the server could not be reached.</returns>
    private JobState QueryJobState(Guid engineGuid) {
      int retries = 0;
      do {
        try {
          lock(connectionLock) {
            EnsureConnection();
            return server.JobState(engineGuid);
          }
        } catch(TimeoutException) {
          retries++;
        } catch(CommunicationException) {
          retries++;
        }
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } while(retries < MAX_CONNECTION_RETRIES);
      return JobState.Unkown;
    }

    /// <summary>
    /// Sends the stored engine of a job to the server again and registers the job under the id
    /// returned by the server, because that is the id the following polls have to use.
    /// </summary>
    /// <returns>The new id of the job, or the old one when the job could not be restarted.</returns>
    private Guid RestartJob(Guid engineGuid) {
      byte[] packedEngine;
      lock(dictionaryLock) {
        // nothing to restart when the job has been dropped by Reset() in the meantime
        if(!runningEngines.TryGetValue(engineGuid, out packedEngine)) return engineGuid;
      }
      Guid restartedGuid = engineGuid;
      bool success = false;
      int retries = 0;
      do {
        try {
          lock(connectionLock) {
            EnsureConnection();
            restartedGuid = server.BeginExecuteEngine(packedEngine);
          }
          success = true;
        } catch(TimeoutException) {
          retries++;
        } catch(CommunicationException) {
          retries++;
        }
        if(!success) Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      } while(!success && retries < MAX_CONNECTION_RETRIES);
      if(!success || restartedGuid == engineGuid) return engineGuid;

      lock(dictionaryLock) {
        AtomicOperation operation;
        // only move the bookkeeping when the job is still registered
        if(engineOperations.TryGetValue(engineGuid, out operation)) {
          engineOperations.Remove(engineGuid);
          engineOperations[restartedGuid] = operation;
          runningEngines.Remove(engineGuid);
          runningEngines[restartedGuid] = packedEngine;
          ManualResetEvent waitHandle = waithandles[engineGuid];
          waithandles.Remove(engineGuid);
          waithandles[restartedGuid] = waitHandle;
        }
      }
      return restartedGuid;
    }

    /// <summary>
    /// Stores the outcome of a job, removes the job from the bookkeeping and signals its wait handle.
    /// Does nothing when the job is not registered anymore.
    /// </summary>
    /// <param name="engineGuid">Id of the job.</param>
    /// <param name="zippedResult">Zipped result engine; only used when <paramref name="failure"/> is null.</param>
    /// <param name="failure">Reason why the job was given up, or null when it delivered a result.</param>
    private void FinishJob(Guid engineGuid, byte[] zippedResult, string failure) {
      lock(dictionaryLock) {
        AtomicOperation operation;
        if(!engineOperations.TryGetValue(engineGuid, out operation)) return;

        // store the outcome and drop a stale outcome of an earlier run of the same operation
        if(failure == null) {
          results[operation] = zippedResult;
          failures.Remove(operation);
        } else {
          failures[operation] = failure;
          results.Remove(operation);
        }

        // signal the wait handle and clean up
        engineOperations.Remove(engineGuid);
        runningEngines.Remove(engineGuid);
        ManualResetEvent waitHandle;
        if(waithandles.TryGetValue(engineGuid, out waitHandle)) {
          waithandles.Remove(engineGuid);
          waitHandle.Set();
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.