Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 336

Last change on this file since 336 was 315, checked in by gkronber, 17 years ago

improved code for distributed-engine job-manager. Also removed the throwing of an uncaught exception in a thread-pool thread.

File size: 9.2 KB
RevLine 
[265]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[219]23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
33
34namespace HeuristicLab.DistributedEngine {
35  class JobManager {
36    private IGridServer server;
37    private string address;
[257]38    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
[219]39    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
[248]40    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
[315]41    private List<IOperation> erroredOperations = new List<IOperation>();
[248]42    private object connectionLock = new object();
43    private object dictionaryLock = new object();
44
[219]45    private const int MAX_RESTARTS = 5;
[248]46    private const int MAX_CONNECTION_RETRIES = 10;
[315]47    private const int RETRY_TIMEOUT_SEC = 60;
[248]48    private const int CHECK_RESULTS_TIMEOUT = 10;
49
[228]50    private ChannelFactory<IGridServer> factory;
[219]51
52    public JobManager(string address) {
53      this.address = address;
54    }
55
56    internal void Reset() {
[248]57      ResetConnection();
58      lock(dictionaryLock) {
[219]59        foreach(WaitHandle wh in waithandles.Values) wh.Close();
60        waithandles.Clear();
[257]61        engines.Clear();
[248]62        results.Clear();
[315]63        erroredOperations.Clear();
[219]64      }
65    }
66
[228]67    private void ResetConnection() {
[248]68      lock(connectionLock) {
69        // open a new channel
70        NetTcpBinding binding = new NetTcpBinding();
71        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
72        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
73        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
74        binding.Security.Mode = SecurityMode.None;
75
76        factory = new ChannelFactory<IGridServer>(binding);
77        server = factory.CreateChannel(new EndpointAddress(address));
78      }
[228]79    }
80
[268]81    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
82      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
[257]83      byte[] zippedEngine = ZipEngine(engine);
[248]84      Guid currentEngineGuid = Guid.Empty;
85      bool success = false;
86      int retryCount = 0;
87      do {
[315]88        try {
89          lock(connectionLock) {
[248]90            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
91          }
[315]92          success = true;
93        } catch(TimeoutException timeoutException) {
94          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
95            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
96          }
97          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
98        } catch(CommunicationException communicationException) {
99          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
100            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
101          }
102          ResetConnection();
103          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
[248]104        }
105      } while(!success);
106      lock(dictionaryLock) {
[257]107        engines[currentEngineGuid] = engine;
[219]108        waithandles[currentEngineGuid] = new ManualResetEvent(false);
109      }
[248]110      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
[219]111      return waithandles[currentEngineGuid];
112    }
113
[257]114    private byte[] ZipEngine(ProcessingEngine engine) {
115      MemoryStream memStream = new MemoryStream();
116      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
117      PersistenceManager.Save(engine, stream);
118      stream.Close();
119      byte[] zippedEngine = memStream.ToArray();
120      memStream.Close();
121      return zippedEngine;
122    }
123
[281]124    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
[315]125      if(erroredOperations.Contains(operation)) {
126        erroredOperations.Remove(operation);
127        throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
128      } else {
129        byte[] zippedResult = null;
130        lock(dictionaryLock) {
131          zippedResult = results[operation];
132          results.Remove(operation);
133        }
134        // restore the engine
135        using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
136          return (ProcessingEngine)PersistenceManager.Load(stream);
137        }
[256]138      }
[248]139    }
140
[219]141    private void TryGetResult(object state) {
142      Guid engineGuid = (Guid)state;
143      int restartCounter = 0;
144      do {
[258]145        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
[315]146        byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
147        if(zippedResult != null) { // successful
[248]148          lock(dictionaryLock) {
149            // store result
[257]150            results[engines[engineGuid].InitialOperation] = zippedResult;
[315]151            // clean up and signal the wait handle then return
[257]152            engines.Remove(engineGuid);
[248]153            waithandles[engineGuid].Set();
154            waithandles.Remove(engineGuid);
155          }
156          return;
157        } else {
[315]158          // there was a problem -> check the state of the job and restart if necessary
159          JobState jobState = TryGetJobState(server, engineGuid);
[248]160          if(jobState == JobState.Unkown) {
[315]161            TryRestartJob(engineGuid);
[248]162            restartCounter++;
[219]163          }
164        }
[315]165      } while(restartCounter < MAX_RESTARTS);
166      lock(dictionaryLock) {
167        // the job was never finished and restarting didn't help -> stop trying to execute the job and
168        // save the faulted operation in a list to throw an exception when EndExecuteEngine is called.
169        erroredOperations.Add(engines[engineGuid].InitialOperation);
170        // clean up and signal the wait handle
171        engines.Remove(engineGuid);
172        waithandles[engineGuid].Set();
173        waithandles.Remove(engineGuid);
174      }
175    }
[219]176
[315]177    private void TryRestartJob(Guid engineGuid) {
178      // restart job
179      ProcessingEngine engine;
180      lock(dictionaryLock) {
181        engine = engines[engineGuid];
182      }
183      byte[] zippedEngine = ZipEngine(engine);
184      int retries = 0;
185      do {
186        try {
187          lock(connectionLock) {
188            server.BeginExecuteEngine(zippedEngine);
189          }
190          return;
191        } catch(TimeoutException timeoutException) {
192          retries++;
193          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
194        } catch(CommunicationException communicationException) {
195          ResetConnection();
196          retries++;
197          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
[219]198        }
[315]199      } while(retries < MAX_CONNECTION_RETRIES);
[219]200    }
[315]201
202    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
203      int retries = 0;
204      do {
205        try {
206          lock(connectionLock) {
207            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
208            return zippedResult;
209          }
210        } catch(TimeoutException timeoutException) {
211          retries++;
212          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
213        } catch(CommunicationException communicationException) {
214          ResetConnection();
215          retries++;
216          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
217        }
218      } while(retries < MAX_CONNECTION_RETRIES);
219      return null;
220    }
221
222    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
223      // check if the server is still working on the job
224      int retries = 0;
225      do {
226        try {
227          lock(connectionLock) {
228            JobState jobState = server.JobState(engineGuid);
229            return jobState;
230          }
231        } catch(TimeoutException timeoutException) {
232          retries++;
233          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
234        } catch(CommunicationException communicationException) {
235          ResetConnection();
236          retries++;
237          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
238        }
239      } while(retries < MAX_CONNECTION_RETRIES);
240      return JobState.Unkown;
241     
242    }
[219]243  }
244}
Note: See TracBrowser for help on using the repository browser.