Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Ticket175Branch/HeuristicLab.DistributedEngine/JobManager.cs @ 343

Last change on this file since 343 was 315, checked in by gkronber, 17 years ago

Improved the code of the distributed-engine job manager. Also removed the throwing of an uncaught exception on a thread-pool thread.

File size: 9.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
33
34namespace HeuristicLab.DistributedEngine {
35  class JobManager {
36    private IGridServer server;
37    private string address;
38    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
39    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
40    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
41    private List<IOperation> erroredOperations = new List<IOperation>();
42    private object connectionLock = new object();
43    private object dictionaryLock = new object();
44
45    private const int MAX_RESTARTS = 5;
46    private const int MAX_CONNECTION_RETRIES = 10;
47    private const int RETRY_TIMEOUT_SEC = 60;
48    private const int CHECK_RESULTS_TIMEOUT = 10;
49
50    private ChannelFactory<IGridServer> factory;
51
52    public JobManager(string address) {
53      this.address = address;
54    }
55
56    internal void Reset() {
57      ResetConnection();
58      lock(dictionaryLock) {
59        foreach(WaitHandle wh in waithandles.Values) wh.Close();
60        waithandles.Clear();
61        engines.Clear();
62        results.Clear();
63        erroredOperations.Clear();
64      }
65    }
66
67    private void ResetConnection() {
68      lock(connectionLock) {
69        // open a new channel
70        NetTcpBinding binding = new NetTcpBinding();
71        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
72        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
73        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
74        binding.Security.Mode = SecurityMode.None;
75
76        factory = new ChannelFactory<IGridServer>(binding);
77        server = factory.CreateChannel(new EndpointAddress(address));
78      }
79    }
80
81    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
82      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
83      byte[] zippedEngine = ZipEngine(engine);
84      Guid currentEngineGuid = Guid.Empty;
85      bool success = false;
86      int retryCount = 0;
87      do {
88        try {
89          lock(connectionLock) {
90            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
91          }
92          success = true;
93        } catch(TimeoutException timeoutException) {
94          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
95            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
96          }
97          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
98        } catch(CommunicationException communicationException) {
99          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
100            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
101          }
102          ResetConnection();
103          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
104        }
105      } while(!success);
106      lock(dictionaryLock) {
107        engines[currentEngineGuid] = engine;
108        waithandles[currentEngineGuid] = new ManualResetEvent(false);
109      }
110      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
111      return waithandles[currentEngineGuid];
112    }
113
114    private byte[] ZipEngine(ProcessingEngine engine) {
115      MemoryStream memStream = new MemoryStream();
116      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
117      PersistenceManager.Save(engine, stream);
118      stream.Close();
119      byte[] zippedEngine = memStream.ToArray();
120      memStream.Close();
121      return zippedEngine;
122    }
123
124    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
125      if(erroredOperations.Contains(operation)) {
126        erroredOperations.Remove(operation);
127        throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
128      } else {
129        byte[] zippedResult = null;
130        lock(dictionaryLock) {
131          zippedResult = results[operation];
132          results.Remove(operation);
133        }
134        // restore the engine
135        using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
136          return (ProcessingEngine)PersistenceManager.Load(stream);
137        }
138      }
139    }
140
141    private void TryGetResult(object state) {
142      Guid engineGuid = (Guid)state;
143      int restartCounter = 0;
144      do {
145        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
146        byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
147        if(zippedResult != null) { // successful
148          lock(dictionaryLock) {
149            // store result
150            results[engines[engineGuid].InitialOperation] = zippedResult;
151            // clean up and signal the wait handle then return
152            engines.Remove(engineGuid);
153            waithandles[engineGuid].Set();
154            waithandles.Remove(engineGuid);
155          }
156          return;
157        } else {
158          // there was a problem -> check the state of the job and restart if necessary
159          JobState jobState = TryGetJobState(server, engineGuid);
160          if(jobState == JobState.Unkown) {
161            TryRestartJob(engineGuid);
162            restartCounter++;
163          }
164        }
165      } while(restartCounter < MAX_RESTARTS);
166      lock(dictionaryLock) {
167        // the job was never finished and restarting didn't help -> stop trying to execute the job and
168        // save the faulted operation in a list to throw an exception when EndExecuteEngine is called.
169        erroredOperations.Add(engines[engineGuid].InitialOperation);
170        // clean up and signal the wait handle
171        engines.Remove(engineGuid);
172        waithandles[engineGuid].Set();
173        waithandles.Remove(engineGuid);
174      }
175    }
176
177    private void TryRestartJob(Guid engineGuid) {
178      // restart job
179      ProcessingEngine engine;
180      lock(dictionaryLock) {
181        engine = engines[engineGuid];
182      }
183      byte[] zippedEngine = ZipEngine(engine);
184      int retries = 0;
185      do {
186        try {
187          lock(connectionLock) {
188            server.BeginExecuteEngine(zippedEngine);
189          }
190          return;
191        } catch(TimeoutException timeoutException) {
192          retries++;
193          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
194        } catch(CommunicationException communicationException) {
195          ResetConnection();
196          retries++;
197          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
198        }
199      } while(retries < MAX_CONNECTION_RETRIES);
200    }
201
202    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
203      int retries = 0;
204      do {
205        try {
206          lock(connectionLock) {
207            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
208            return zippedResult;
209          }
210        } catch(TimeoutException timeoutException) {
211          retries++;
212          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
213        } catch(CommunicationException communicationException) {
214          ResetConnection();
215          retries++;
216          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
217        }
218      } while(retries < MAX_CONNECTION_RETRIES);
219      return null;
220    }
221
222    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
223      // check if the server is still working on the job
224      int retries = 0;
225      do {
226        try {
227          lock(connectionLock) {
228            JobState jobState = server.JobState(engineGuid);
229            return jobState;
230          }
231        } catch(TimeoutException timeoutException) {
232          retries++;
233          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
234        } catch(CommunicationException communicationException) {
235          ResetConnection();
236          retries++;
237          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
238        }
239      } while(retries < MAX_CONNECTION_RETRIES);
240      return JobState.Unkown;
241     
242    }
243  }
244}
Note: See TracBrowser for help on using the repository browser.