Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 268

Last change on this file since 268 was 268, checked in by gkronber, 16 years ago

Improved the RAM footprint of the distributed engine by serializing only the part of the scope tree that contains the scope (including sub-scopes) on which a parallel operation operates. (ticket #153)

File size: 8.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
33
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Submits processing engines to a grid server, polls the server for their
  /// results on thread-pool threads and hands the results back to the caller.
  /// </summary>
  /// <remarks>
  /// Usage as visible in this file: <see cref="BeginExecuteOperation"/> submits a job and
  /// returns a wait handle; once the handle is signaled <see cref="EndExecuteOperation"/>
  /// returns the resulting scope (or throws if the job had to be given up).
  /// All access to the connection is serialized with <c>connectionLock</c>, all access to
  /// the bookkeeping dictionaries with <c>dictionaryLock</c>.
  /// </remarks>
  class JobManager {
    private IGridServer server;
    private string address;
    // jobs that are currently running on the server, keyed by the id the server returned
    private readonly Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    private readonly Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    // finished jobs that have not been collected with EndExecuteOperation yet
    private readonly Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    // jobs that were given up; the message is thrown by EndExecuteOperation
    private readonly Dictionary<AtomicOperation, string> failures = new Dictionary<AtomicOperation, string>();
    private readonly object connectionLock = new object();
    private readonly object dictionaryLock = new object();

    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
    private const int RETRY_TIMEOUT_SEC = 10;
    private const int CHECK_RESULTS_TIMEOUT = 10;

    private ChannelFactory<IGridServer> factory;

    /// <summary>
    /// Creates a job manager for the grid server at the given endpoint address.
    /// No connection is opened here.
    /// </summary>
    /// <param name="address">Endpoint address of the grid server.</param>
    public JobManager(string address) {
      this.address = address;
    }

    /// <summary>
    /// Creates a fresh channel to the server and forgets all jobs, results and failures.
    /// Wait handles of jobs that are still registered are closed.
    /// </summary>
    internal void Reset() {
      ResetConnection();
      lock(dictionaryLock) {
        foreach(WaitHandle wh in waithandles.Values) wh.Close();
        waithandles.Clear();
        engines.Clear();
        results.Clear();
        failures.Clear();
      }
    }

    /// <summary>
    /// Replaces the current channel factory and channel with new ones.
    /// </summary>
    private void ResetConnection() {
      lock(connectionLock) {
        // release the resources of the previous factory (and its channels);
        // Abort() does not throw, regardless of the state the factory is in
        if(factory != null) factory.Abort();

        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        binding.Security.Mode = SecurityMode.None;

        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    /// <summary>
    /// Executes one call against the server and retries it on communication problems.
    /// </summary>
    /// <remarks>
    /// The call is repeated at most <c>MAX_CONNECTION_RETRIES</c> times after the first attempt,
    /// waiting <c>RETRY_TIMEOUT_SEC</c> seconds before each retry. The connection lock is only
    /// held while the call itself is running, never while waiting, so that other threads can
    /// use the connection in the meantime. After a <see cref="CommunicationException"/> the
    /// channel is recreated; after a <see cref="TimeoutException"/> it is kept.
    /// </remarks>
    /// <typeparam name="T">Return type of the server call.</typeparam>
    /// <param name="call">The call to execute on the current channel.</param>
    /// <param name="result">Value returned by the call, or <c>default(T)</c> if all attempts failed.</param>
    /// <param name="lastError">The exception of the last failed attempt, or null if none failed.</param>
    /// <returns>True if an attempt succeeded, false if all attempts failed.</returns>
    private bool TryCallServer<T>(Func<IGridServer, T> call, out T result, out Exception lastError) {
      result = default(T);
      lastError = null;
      for(int attempt = 0; attempt <= MAX_CONNECTION_RETRIES; attempt++) {
        if(attempt > 0) Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        try {
          lock(connectionLock) {
            // the channel is created lazily in case Reset() has not been called yet
            if(server == null) ResetConnection();
            result = call(server);
          }
          return true;
        } catch(TimeoutException ex) {
          lastError = ex;
        } catch(CommunicationException ex) {
          lastError = ex;
          ResetConnection();
        }
      }
      return false;
    }

    /// <summary>
    /// Wraps the operation into a processing engine, submits it to the server and starts
    /// polling for the result on a thread-pool thread.
    /// </summary>
    /// <param name="globalScope">Scope passed to the processing engine.</param>
    /// <param name="operation">Operation passed to the processing engine.</param>
    /// <returns>
    /// A wait handle that is signaled when the job has finished or has been given up;
    /// call <see cref="EndExecuteOperation"/> afterwards.
    /// </returns>
    /// <exception cref="ApplicationException">
    /// The job could not be submitted within the maximal number of retries.
    /// </exception>
    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
      byte[] zippedEngine = ZipEngine(engine);

      Guid engineGuid;
      Exception lastError;
      if(!TryCallServer<Guid>(s => s.BeginExecuteEngine(zippedEngine), out engineGuid, out lastError)) {
        // the engine can decide to stop execution
        throw new ApplicationException("Max retries reached.", lastError);
      }

      ManualResetEvent waitHandle = new ManualResetEvent(false);
      lock(dictionaryLock) {
        engines[engineGuid] = engine;
        waithandles[engineGuid] = waitHandle;
      }
      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), engineGuid);
      // return the local reference; the dictionary must not be read outside of the lock
      // and the entry might already have been removed by the worker or by Reset()
      return waitHandle;
    }

    /// <summary>
    /// Serializes the engine with the persistence manager into a gzip-compressed byte array.
    /// </summary>
    private byte[] ZipEngine(ProcessingEngine engine) {
      using(MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the memory stream is read after the gzip stream has been
        // closed (closing flushes the remaining compressed data)
        using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          PersistenceManager.Save(engine, stream);
        }
        return memStream.ToArray();
      }
    }

    /// <summary>
    /// Returns the scope of the finished job that was started for the given operation
    /// and removes the stored result.
    /// </summary>
    /// <param name="operation">The operation that was passed to <see cref="BeginExecuteOperation"/>.</param>
    /// <returns>The scope of the initial operation of the engine that was sent back by the server.</returns>
    /// <exception cref="ApplicationException">The job was given up after too many restarts.</exception>
    /// <exception cref="KeyNotFoundException">There is neither a result nor a failure for the operation.</exception>
    public IScope EndExecuteOperation(AtomicOperation operation) {
      byte[] zippedResult = null;
      string failure = null;
      lock(dictionaryLock) {
        if(failures.TryGetValue(operation, out failure)) {
          failures.Remove(operation);
        } else {
          zippedResult = results[operation];
          results.Remove(operation);
        }
      }
      if(failure != null) throw new ApplicationException(failure);

      // restore the engine
      using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
        ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
        return resultEngine.InitialOperation.Scope;
      }
    }

    /// <summary>
    /// Removes a job from the list of running jobs, stores either its result or a failure
    /// message and signals the wait handle of the job.
    /// </summary>
    /// <param name="engineGuid">Id under which the job is currently registered.</param>
    /// <param name="operation">Key under which the outcome is stored.</param>
    /// <param name="zippedResult">The result, or null if the job failed.</param>
    /// <param name="failureMessage">Message stored when <paramref name="zippedResult"/> is null.</param>
    private void FinishJob(Guid engineGuid, AtomicOperation operation, byte[] zippedResult, string failureMessage) {
      lock(dictionaryLock) {
        ManualResetEvent waitHandle;
        // Reset() has discarded the job (and closed its wait handle) in the meantime
        if(!waithandles.TryGetValue(engineGuid, out waitHandle)) return;

        if(zippedResult != null) results[operation] = zippedResult;
        else failures[operation] = failureMessage;

        // clean up and signal the wait handle
        engines.Remove(engineGuid);
        waithandles.Remove(engineGuid);
        waitHandle.Set();
      }
    }

    /// <summary>
    /// Thread-pool work item: polls the server every <c>CHECK_RESULTS_TIMEOUT</c> seconds until
    /// the job has a result, restarts the job when the server reports an unknown job state
    /// and gives up after more than <c>MAX_RESTARTS</c> restarts.
    /// </summary>
    /// <remarks>
    /// This method never throws on purpose to report a problem, because an unhandled exception
    /// on a thread-pool thread would not reach the waiting caller. A job that is given up is
    /// recorded as failed and its wait handle is signaled; <see cref="EndExecuteOperation"/>
    /// then throws for that job.
    /// </remarks>
    /// <param name="state">The <see cref="Guid"/> of the job as returned by the server.</param>
    private void TryGetResult(object state) {
      Guid engineGuid = (Guid)state;
      int restartCounter = 0;
      Exception ignoredError;
      while(true) {
        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));

        ProcessingEngine engine;
        lock(dictionaryLock) {
          // stop polling when the job has been discarded by Reset()
          if(!engines.TryGetValue(engineGuid, out engine)) return;
        }

        // NOTE(review): the second argument is presumably a timeout - confirm against IGridServer
        byte[] zippedResult;
        TryCallServer<byte[]>(s => s.TryEndExecuteEngine(engineGuid, 100), out zippedResult, out ignoredError);
        if(zippedResult != null) {
          FinishJob(engineGuid, engine.InitialOperation, zippedResult, null);
          return;
        }

        // check if the server is still working on the job;
        // if the state cannot be retrieved the job is treated as unknown
        JobState jobState;
        if(!TryCallServer<JobState>(s => s.JobState(engineGuid), out jobState, out ignoredError)) {
          jobState = JobState.Unkown;
        }
        if(jobState != JobState.Unkown) continue;

        // restart job
        byte[] zippedEngine = ZipEngine(engine);
        Guid restartedGuid;
        if(TryCallServer<Guid>(s => s.BeginExecuteEngine(zippedEngine), out restartedGuid, out ignoredError)) {
          // the server hands out the id of a job when it is submitted, so the restarted job
          // has to be tracked under the id returned by this submission
          lock(dictionaryLock) {
            ManualResetEvent waitHandle;
            if(!waithandles.TryGetValue(engineGuid, out waitHandle)) return; // discarded by Reset()
            engines.Remove(engineGuid);
            waithandles.Remove(engineGuid);
            engines[restartedGuid] = engine;
            waithandles[restartedGuid] = waitHandle;
          }
          engineGuid = restartedGuid;
        }
        restartCounter++;

        // when we reach a maximum amount of restarts => signal the wait-handle and record that there was a problem
        if(restartCounter > MAX_RESTARTS) {
          FinishJob(engineGuid, engine.InitialOperation, null, "Maximum number of job restarts reached.");
          return;
        }
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.