Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 286

Last change on this file since 286 was 281, checked in by gkronber, 16 years ago

ticket #155
Operations that failed at a grid client are pushed back onto the stack to force local execution. If they fail locally as well, the user will get the usual error dialog and can fix the bug (and save/load the engine as usual).

File size: 8.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
33
34namespace HeuristicLab.DistributedEngine {
/// <summary>
/// Submits processing engines to a grid server, polls the server for their results on
/// thread-pool threads and hands the results back to the caller.
/// All server calls are serialized with connectionLock; all bookkeeping dictionaries
/// are guarded by dictionaryLock.
/// </summary>
class JobManager {
  private IGridServer server;
  private string address;
  // engines that were submitted and whose job has not finished yet, keyed by the guid returned on first submission
  private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
  // wait handles handed out by BeginExecuteOperation, keyed like 'engines'
  private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
  // zipped results of finished jobs, keyed by the initial operation of the submitted engine
  private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
  // error messages of jobs that had to be given up, keyed like 'results'
  private Dictionary<AtomicOperation, string> failures = new Dictionary<AtomicOperation, string>();
  private object connectionLock = new object();
  private object dictionaryLock = new object();

  // number of job restarts tolerated by TryGetResult before the job is given up
  private const int MAX_RESTARTS = 5;
  // number of retries of a single server call after a timeout or communication error
  private const int MAX_CONNECTION_RETRIES = 10;
  // pause between two attempts of a server call (seconds)
  private const int RETRY_TIMEOUT_SEC = 10;
  // pause between two polls for the result of a job (seconds)
  private const int CHECK_RESULTS_TIMEOUT = 10;

  private ChannelFactory<IGridServer> factory;

  // a single call on the 'server' channel; always invoked while connectionLock is held
  private delegate void ServerCall();

  /// <summary>Creates a job manager for the grid server at the given endpoint address.</summary>
  /// <param name="address">Endpoint address of the grid server; used when the channel is (re)opened.</param>
  public JobManager(string address) {
    this.address = address;
  }

  /// <summary>
  /// Opens a fresh connection to the server and discards all bookkeeping: pending wait
  /// handles are closed and all stored engines, results and failures are dropped.
  /// Polling threads of discarded jobs stop on their next wake-up.
  /// </summary>
  internal void Reset() {
    ResetConnection();
    lock(dictionaryLock) {
      foreach(WaitHandle wh in waithandles.Values) wh.Close();
      waithandles.Clear();
      engines.Clear();
      results.Clear();
      failures.Clear();
    }
  }

  /// <summary>Replaces the channel to the server with a newly created one.</summary>
  private void ResetConnection() {
    lock(connectionLock) {
      // Release the previous factory together with the channels it created; otherwise every
      // reconnect leaks a connection. Abort() is used because the old channel may be faulted.
      // No call can be in flight here because all server calls hold connectionLock.
      if(factory != null) {
        factory.Abort();
      }

      // open a new channel
      NetTcpBinding binding = new NetTcpBinding();
      binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
      binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
      binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
      binding.Security.Mode = SecurityMode.None;

      factory = new ChannelFactory<IGridServer>(binding);
      server = factory.CreateChannel(new EndpointAddress(address));
    }
  }

  /// <summary>
  /// Wraps the operation into a processing engine, submits it to the server and starts
  /// polling for the result on a thread-pool thread.
  /// </summary>
  /// <param name="globalScope">Scope passed to the processing engine.</param>
  /// <param name="operation">Operation passed to the processing engine.</param>
  /// <returns>A wait handle that is signalled as soon as the job has finished or has been given up;
  /// call EndExecuteOperation afterwards.</returns>
  /// <exception cref="ApplicationException">The job could not be submitted within
  /// MAX_CONNECTION_RETRIES retries; the last timeout or communication error is the inner exception.</exception>
  public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
    ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
    byte[] zippedEngine = ZipEngine(engine);
    Guid currentEngineGuid = Guid.Empty;
    bool success = false;
    int retryCount = 0;
    do {
      Exception failure = null;
      try {
        lock(connectionLock) {
          currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
        }
        success = true;
      } catch(TimeoutException timeoutException) {
        failure = timeoutException;
      } catch(CommunicationException communicationException) {
        ResetConnection();
        failure = communicationException;
      }
      if(!success) {
        // limit the number of retries; the caller can decide to stop execution when the exception is thrown
        if(retryCount >= MAX_CONNECTION_RETRIES) {
          throw new ApplicationException("Max retries reached.", failure);
        }
        retryCount++;
        // wait outside of connectionLock so that other jobs can talk to the server meanwhile
        Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
    } while(!success);

    // keep a local reference: the dictionary must not be read outside of dictionaryLock
    ManualResetEvent waitHandle = new ManualResetEvent(false);
    lock(dictionaryLock) {
      engines[currentEngineGuid] = engine;
      waithandles[currentEngineGuid] = waitHandle;
    }
    ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
    return waitHandle;
  }

  /// <summary>Saves the engine with the PersistenceManager into a gzip-compressed byte array.</summary>
  private byte[] ZipEngine(ProcessingEngine engine) {
    using(MemoryStream memStream = new MemoryStream()) {
      // the gzip stream has to be closed before the buffer is read, otherwise the data is incomplete
      using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
        PersistenceManager.Save(engine, stream);
      }
      return memStream.ToArray();
    }
  }

  /// <summary>
  /// Removes the stored result of the given operation and restores the processing engine from it.
  /// Call this after the wait handle returned by BeginExecuteOperation has been signalled.
  /// </summary>
  /// <param name="operation">The operation that was passed to BeginExecuteOperation.</param>
  /// <returns>The engine loaded from the result bytes delivered by the server.</returns>
  /// <exception cref="ApplicationException">The job was given up after too many restarts.</exception>
  /// <exception cref="KeyNotFoundException">Neither a result nor a failure is stored for the operation.</exception>
  public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
    byte[] zippedResult = null;
    lock(dictionaryLock) {
      string errorMessage;
      if(failures.TryGetValue(operation, out errorMessage)) {
        // report the problem on the caller's thread instead of the polling thread
        failures.Remove(operation);
        throw new ApplicationException(errorMessage);
      }
      zippedResult = results[operation];
      results.Remove(operation);
    }
    // restore the engine
    using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
      return (ProcessingEngine)PersistenceManager.Load(stream);
    }
  }

  /// <summary>
  /// Thread-pool callback that polls the server until the job has a result. If the server does
  /// not know the job (or cannot be asked), the engine is submitted again; after more than
  /// MAX_RESTARTS restarts the job is given up. In both cases the wait handle is signalled.
  /// </summary>
  /// <param name="state">The Guid under which the job was registered by BeginExecuteOperation.</param>
  private void TryGetResult(object state) {
    Guid engineGuid = (Guid)state; // key of the job in the dictionaries, never changes
    Guid serverGuid = engineGuid;  // id that is used to ask the server about the job
    int restartCounter = 0;
    while(true) {
      Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));

      ProcessingEngine engine;
      lock(dictionaryLock) {
        // Reset() discarded the job in the meantime => nobody is interested in the result anymore
        if(!engines.TryGetValue(engineGuid, out engine)) return;
      }

      Guid polledGuid = serverGuid;
      byte[] zippedResult = null;
      TryServerCall(delegate() { zippedResult = server.TryEndExecuteEngine(polledGuid, 100); });
      if(zippedResult != null) {
        // store result, signal the wait handle and clean up then return
        FinishJob(engineGuid, engine, zippedResult, null);
        return;
      }

      // check if the server is still working on the job
      // (the state stays 'Unkown' when the server could not be reached at all)
      JobState jobState = JobState.Unkown;
      TryServerCall(delegate() { jobState = server.JobState(polledGuid); });
      if(jobState != JobState.Unkown) continue;

      // restart job
      byte[] zippedEngine = ZipEngine(engine);
      Guid restartedGuid = Guid.Empty;
      if(TryServerCall(delegate() { restartedGuid = server.BeginExecuteEngine(zippedEngine); })) {
        // BeginExecuteEngine returns the id of the submitted job (see BeginExecuteOperation);
        // poll with that id from now on instead of the id the server reported as unknown.
        // NOTE(review): assumes the server assigns a fresh id on every submission - confirm against IGridServer.
        serverGuid = restartedGuid;
      }
      restartCounter++;

      // when we reach a maximum amount of restarts => signal the wait-handle and record that there was a problem.
      // Throwing here would be an unhandled exception on a thread-pool thread and would leave the waiter blocked.
      if(restartCounter > MAX_RESTARTS) {
        FinishJob(engineGuid, engine, null, "Maximum number of job restarts reached.");
        return;
      }
    }
  }

  /// <summary>
  /// Executes a server call while holding connectionLock and repeats it after a timeout or a
  /// communication error (the connection is reset after the latter). At most
  /// MAX_CONNECTION_RETRIES attempts are made, each failed attempt is followed by a pause.
  /// </summary>
  /// <returns>true if one attempt completed without exception, false if all attempts failed.</returns>
  private bool TryServerCall(ServerCall call) {
    for(int retries = 0; retries < MAX_CONNECTION_RETRIES; retries++) {
      try {
        lock(connectionLock) {
          call();
        }
        return true;
      } catch(TimeoutException) {
        // just try again
      } catch(CommunicationException) {
        ResetConnection();
      }
      // wait outside of connectionLock so that other jobs can talk to the server meanwhile
      Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
    }
    return false;
  }

  /// <summary>
  /// Stores either the result or the error message of a job under the initial operation of its
  /// engine, removes the job from the bookkeeping and signals its wait handle.
  /// Does nothing if the job was discarded by Reset() in the meantime.
  /// </summary>
  private void FinishJob(Guid engineGuid, ProcessingEngine engine, byte[] zippedResult, string errorMessage) {
    lock(dictionaryLock) {
      ManualResetEvent waitHandle;
      // a missing entry means Reset() already closed the handle; it must not be touched anymore
      if(!waithandles.TryGetValue(engineGuid, out waitHandle)) return;

      if(zippedResult != null) {
        failures.Remove(engine.InitialOperation);
        results[engine.InitialOperation] = zippedResult;
      } else {
        results.Remove(engine.InitialOperation);
        failures[engine.InitialOperation] = errorMessage;
      }

      engines.Remove(engineGuid);
      waithandles.Remove(engineGuid);
      waitHandle.Set();
    }
  }
}
235}
Note: See TracBrowser for help on using the repository browser.