Free cookie consent management tool by TermsFeed Policy Generator

source: branches/Collections/sources/HeuristicLab.Grid/JobManager.cs @ 384

Last change on this file since 384 was 384, checked in by gkronber, 16 years ago

merged changesets r382 and r383 (fix references and compiler warnings) into the "collections" branch

File size: 9.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Grid;
28using System.Threading;
29using HeuristicLab.Core;
30using System.IO;
31using System.IO.Compression;
32using System.Windows.Forms;
33
34namespace HeuristicLab.Grid {
35  public class JobManager {
36    private IGridServer server;
37    private string address;
38    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
39    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
40    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
41    private List<IOperation> erroredOperations = new List<IOperation>();
42    private object connectionLock = new object();
43    private object dictionaryLock = new object();
44
45    private const int MAX_RESTARTS = 5;
46    private const int MAX_CONNECTION_RETRIES = 10;
47    private const int RETRY_TIMEOUT_SEC = 60;
48    private const int CHECK_RESULTS_TIMEOUT = 10;
49
50    private ChannelFactory<IGridServer> factory;
51
52    public JobManager(string address) {
53      this.address = address;
54    }
55
56    public void Reset() {
57      ResetConnection();
58      lock(dictionaryLock) {
59        foreach(WaitHandle wh in waithandles.Values) wh.Close();
60        waithandles.Clear();
61        engines.Clear();
62        results.Clear();
63        erroredOperations.Clear();
64      }
65    }
66
67    private void ResetConnection() {
68      lock(connectionLock) {
69        // open a new channel
70        NetTcpBinding binding = new NetTcpBinding();
71        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
72        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
73        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
74        binding.Security.Mode = SecurityMode.None;
75
76        factory = new ChannelFactory<IGridServer>(binding);
77        server = factory.CreateChannel(new EndpointAddress(address));
78      }
79    }
80
81    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
82      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
83      byte[] zippedEngine = ZipEngine(engine);
84      Guid currentEngineGuid = Guid.Empty;
85      bool success = false;
86      int retryCount = 0;
87      do {
88        try {
89          lock(connectionLock) {
90            currentEngineGuid = server.BeginExecuteEngine(zippedEngine);
91          }
92          success = true;
93        } catch(TimeoutException timeoutException) {
94          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
95            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", timeoutException);
96          }
97          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
98        } catch(CommunicationException communicationException) {
99          if(retryCount++ >= MAX_CONNECTION_RETRIES) {
100            throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", communicationException);
101          }
102          ResetConnection();
103          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
104        }
105      } while(!success);
106      lock(dictionaryLock) {
107        engines[currentEngineGuid] = engine;
108        waithandles[currentEngineGuid] = new ManualResetEvent(false);
109      }
110      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
111      return waithandles[currentEngineGuid];
112    }
113
114    private byte[] ZipEngine(ProcessingEngine engine) {
115      MemoryStream memStream = new MemoryStream();
116      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
117      PersistenceManager.Save(engine, stream);
118      stream.Close();
119      byte[] zippedEngine = memStream.ToArray();
120      memStream.Close();
121      return zippedEngine;
122    }
123
124    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
125      if(erroredOperations.Contains(operation)) {
126        erroredOperations.Remove(operation);
127        throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
128      } else {
129        byte[] zippedResult = null;
130        lock(dictionaryLock) {
131          zippedResult = results[operation];
132          results.Remove(operation);
133        }
134        // restore the engine
135        using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
136          return (ProcessingEngine)PersistenceManager.Load(stream);
137        }
138      }
139    }
140
141    private void TryGetResult(object state) {
142      Guid engineGuid = (Guid)state;
143      int restartCounter = 0;
144      do {
145        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
146        byte[] zippedResult = TryEndExecuteEngine(server, engineGuid);
147        if(zippedResult != null) { // successful
148          lock(dictionaryLock) {
149            // store result
150            results[engines[engineGuid].InitialOperation] = zippedResult;
151            // clean up and signal the wait handle then return
152            engines.Remove(engineGuid);
153            waithandles[engineGuid].Set();
154            waithandles.Remove(engineGuid);
155          }
156          return;
157        } else {
158          // there was a problem -> check the state of the job and restart if necessary
159          JobState jobState = TryGetJobState(server, engineGuid);
160          if(jobState == JobState.Unkown) {
161            TryRestartJob(engineGuid);
162            restartCounter++;
163          }
164        }
165      } while(restartCounter < MAX_RESTARTS);
166      lock(dictionaryLock) {
167        // the job was never finished and restarting didn't help -> stop trying to execute the job and
168        // save the faulted operation in a list to throw an exception when EndExecuteEngine is called.
169        erroredOperations.Add(engines[engineGuid].InitialOperation);
170        // clean up and signal the wait handle
171        engines.Remove(engineGuid);
172        waithandles[engineGuid].Set();
173        waithandles.Remove(engineGuid);
174      }
175    }
176
177    private void TryRestartJob(Guid engineGuid) {
178      // restart job
179      ProcessingEngine engine;
180      lock(dictionaryLock) {
181        engine = engines[engineGuid];
182      }
183      byte[] zippedEngine = ZipEngine(engine);
184      int retries = 0;
185      do {
186        try {
187          lock(connectionLock) {
188            server.BeginExecuteEngine(zippedEngine);
189          }
190          return;
191        } catch(TimeoutException) {
192          retries++;
193          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
194        } catch(CommunicationException) {
195          ResetConnection();
196          retries++;
197          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
198        }
199      } while(retries < MAX_CONNECTION_RETRIES);
200    }
201
202    private byte[] TryEndExecuteEngine(IGridServer server, Guid engineGuid) {
203      int retries = 0;
204      do {
205        try {
206          lock(connectionLock) {
207            byte[] zippedResult = server.TryEndExecuteEngine(engineGuid, 100);
208            return zippedResult;
209          }
210        } catch(TimeoutException) {
211          retries++;
212          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
213        } catch(CommunicationException) {
214          ResetConnection();
215          retries++;
216          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
217        }
218      } while(retries < MAX_CONNECTION_RETRIES);
219      return null;
220    }
221
222    private JobState TryGetJobState(IGridServer server, Guid engineGuid) {
223      // check if the server is still working on the job
224      int retries = 0;
225      do {
226        try {
227          lock(connectionLock) {
228            JobState jobState = server.JobState(engineGuid);
229            return jobState;
230          }
231        } catch(TimeoutException) {
232          retries++;
233          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
234        } catch(CommunicationException) {
235          ResetConnection();
236          retries++;
237          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
238        }
239      } while(retries < MAX_CONNECTION_RETRIES);
240      return JobState.Unkown;
241    }
242  }
243}
Note: See TracBrowser for help on using the repository browser.