Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 260

Last change on this file since 260 was 258, checked in by gkronber, 17 years ago

Moved the sleep in the result-fetching loop to the top: it's better to wait a little before the first attempt. (ticket #149)

File size: 8.1 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.ServiceModel;
6using HeuristicLab.Grid;
7using System.Threading;
8using HeuristicLab.Core;
9using System.IO;
10using System.IO.Compression;
11using System.Windows.Forms;
12
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Submits atomic operations as serialized <see cref="ProcessingEngine"/> jobs to a remote
  /// <see cref="IGridServer"/> and polls the server on a thread-pool thread until each job's
  /// result is available. Jobs the server reports as unknown are resubmitted (at most MAX_RESTARTS times).
  /// </summary>
  class JobManager {
    private IGridServer server;
    private string address;
    // engines and waithandles are keyed by the id returned for a job's FIRST submission; that key
    // never changes, even when the job is resubmitted (the polling thread tracks the current id).
    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    // failures of jobs that could not be completed; reported to the caller by EndExecuteOperation
    private Dictionary<AtomicOperation, Exception> errors = new Dictionary<AtomicOperation, Exception>();
    // guards server and factory; held only for the duration of a single server call
    private object connectionLock = new object();
    // guards engines, waithandles, results and errors
    private object dictionaryLock = new object();

    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
    private const int RETRY_TIMEOUT_SEC = 10;
    private const int CHECK_RESULTS_TIMEOUT = 10;

    private ChannelFactory<IGridServer> factory;

    public JobManager(string address) {
      this.address = address;
    }

    /// <summary>
    /// Opens a fresh connection to the server and forgets all pending jobs and stored results.
    /// Wait handles of pending jobs are closed; their polling threads find the job gone and stop.
    /// </summary>
    internal void Reset() {
      ResetConnection();
      lock(dictionaryLock) {
        foreach(WaitHandle wh in waithandles.Values) wh.Close();
        waithandles.Clear();
        engines.Clear();
        results.Clear();
        errors.Clear();
      }
    }

    /// <summary>
    /// Replaces the channel to the grid server with a newly created one over an unsecured
    /// NetTcpBinding with ~100M message/string/array quotas. The previous channel factory, if any,
    /// is aborted so its resources are released.
    /// </summary>
    private void ResetConnection() {
      lock(connectionLock) {
        if(factory != null) {
          // the old channel may be faulted; Abort (unlike Close) is safe on a faulted communication object
          factory.Abort();
        }
        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        binding.Security.Mode = SecurityMode.None;

        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    /// <summary>
    /// Invokes <paramref name="call"/> on the current server proxy. On <see cref="TimeoutException"/>
    /// or <see cref="CommunicationException"/> (the latter also resets the connection) the call is
    /// retried up to MAX_CONNECTION_RETRIES times, waiting RETRY_TIMEOUT_SEC seconds in between.
    /// The connection lock is held only while the call runs, never while waiting, so a failing call
    /// does not stall the server traffic of other jobs. Other exception types propagate.
    /// </summary>
    /// <param name="call">The server operation to perform; receives the current proxy.</param>
    /// <param name="result">The call's result on success; default(T) otherwise.</param>
    /// <param name="lastError">null on success; the last caught exception otherwise.</param>
    /// <returns>true if the call succeeded, false if all retries failed.</returns>
    private bool TryCallServer<T>(Func<IGridServer, T> call, out T result, out Exception lastError) {
      result = default(T);
      lastError = null;
      int retries = 0;
      while(true) {
        try {
          lock(connectionLock) {
            result = call(server);
          }
          lastError = null;
          return true;
        } catch(TimeoutException timeoutException) {
          lastError = timeoutException;
        } catch(CommunicationException communicationException) {
          lastError = communicationException;
          ResetConnection();
        }
        if(retries >= MAX_CONNECTION_RETRIES) {
          return false;
        }
        retries++;
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    }

    /// <summary>
    /// Submits <paramref name="operation"/> as a job to the grid server and starts polling for its result.
    /// </summary>
    /// <returns>A wait handle that is signalled once the job has finished or has definitively failed;
    /// call <see cref="EndExecuteOperation"/> afterwards to obtain the result.</returns>
    /// <exception cref="ApplicationException">The job could not be submitted within the retry limit.</exception>
    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
      byte[] zippedEngine = ZipEngine(engine);
      Guid engineGuid;
      Exception lastError;
      if(!TryCallServer(s => s.BeginExecuteEngine(zippedEngine), out engineGuid, out lastError)) {
        throw new ApplicationException("Max retries reached.", lastError);
      }
      ManualResetEvent waithandle = new ManualResetEvent(false);
      lock(dictionaryLock) {
        engines[engineGuid] = engine;
        waithandles[engineGuid] = waithandle;
      }
      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), engineGuid);
      // return the local reference: the polling thread may remove the dictionary entry at any time
      return waithandle;
    }

    /// <summary>Serializes <paramref name="engine"/> with PersistenceManager and GZip-compresses the bytes.</summary>
    private byte[] ZipEngine(ProcessingEngine engine) {
      using(MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: memStream is read after the GZip stream has been closed (which flushes it)
        using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          PersistenceManager.Save(engine, stream);
        }
        return memStream.ToArray();
      }
    }

    /// <summary>
    /// Returns the resulting scope of the job submitted for <paramref name="operation"/>. Call only
    /// after the wait handle returned by <see cref="BeginExecuteOperation"/> has been signalled.
    /// </summary>
    /// <exception cref="ApplicationException">The job failed (maximum number of restarts reached).</exception>
    /// <exception cref="KeyNotFoundException">No result is stored for <paramref name="operation"/>.</exception>
    public IScope EndExecuteOperation(AtomicOperation operation) {
      byte[] zippedResult = null;
      lock(dictionaryLock) {
        Exception error;
        if(errors.TryGetValue(operation, out error)) {
          errors.Remove(operation);
          // the stored exception was created but never thrown, so no stack trace is lost here
          throw error;
        }
        zippedResult = results[operation];
        results.Remove(operation);
      }
      // restore the engine
      using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
        ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
        return resultEngine.InitialOperation.Scope;
      }
    }

    /// <summary>
    /// Thread-pool worker polling the server for the result of one job (<paramref name="state"/> is the
    /// Guid of the job's first submission). On success the zipped result is stored and the job's wait
    /// handle is signalled. If the server reports the job as unknown it is resubmitted, at most
    /// MAX_RESTARTS times; after that an error is recorded for EndExecuteOperation and the wait handle is
    /// signalled. The worker stops silently when the job has been discarded by Reset().
    /// </summary>
    private void TryGetResult(object state) {
      Guid engineGuid = (Guid)state; // key into engines/waithandles; never changes
      Guid jobGuid = engineGuid;     // id of the current submission on the server; updated on restart
      int restartCounter = 0;
      while(true) {
        // sleep first: it's better to wait a little before the first try
        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));

        ProcessingEngine engine;
        lock(dictionaryLock) {
          if(!engines.TryGetValue(engineGuid, out engine)) return; // discarded by Reset()
        }

        Exception lastError;
        byte[] zippedResult;
        // on failure TryCallServer leaves zippedResult null, which is handled like "no result yet"
        TryCallServer(s => s.TryEndExecuteEngine(jobGuid, 100), out zippedResult, out lastError);
        if(zippedResult != null) {
          lock(dictionaryLock) {
            ManualResetEvent finishedHandle;
            if(!waithandles.TryGetValue(engineGuid, out finishedHandle)) return; // discarded by Reset()
            // store result, signal the wait handle and clean up
            results[engine.InitialOperation] = zippedResult;
            engines.Remove(engineGuid);
            waithandles.Remove(engineGuid);
            finishedHandle.Set();
          }
          return;
        }

        // check if the server is still working on the job; if the state cannot be retrieved the job
        // is treated as unknown (and thus restarted)
        JobState jobState;
        if(!TryCallServer(s => s.JobState(jobGuid), out jobState, out lastError)) {
          jobState = JobState.Unkown;
        }
        if(jobState != JobState.Unkown) continue;

        if(restartCounter >= MAX_RESTARTS) {
          // give up: record the failure and wake the caller instead of throwing on a thread-pool thread
          lock(dictionaryLock) {
            ManualResetEvent failedHandle;
            if(!waithandles.TryGetValue(engineGuid, out failedHandle)) return; // discarded by Reset()
            errors[engine.InitialOperation] = new ApplicationException("Maximum number of job restarts reached.");
            engines.Remove(engineGuid);
            waithandles.Remove(engineGuid);
            failedHandle.Set();
          }
          return;
        }

        // restart job; BeginExecuteEngine returns the id of the new submission, which is polled from now on
        byte[] zippedEngine = ZipEngine(engine);
        Guid restartedGuid;
        if(TryCallServer(s => s.BeginExecuteEngine(zippedEngine), out restartedGuid, out lastError)) {
          jobGuid = restartedGuid;
        }
        restartCounter++;
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.