#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using HeuristicLab.Grid;
using System.Threading;
using HeuristicLab.Core;
using System.IO;
using System.IO.Compression;
using System.Windows.Forms;

namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Submits <see cref="ProcessingEngine"/> jobs to a remote <see cref="IGridServer"/>
  /// (WCF over net.tcp) and collects their results.
  /// </summary>
  /// <remarks>
  /// Every submitted job is polled by its own thread-pool work item. When its result arrives,
  /// or the job finally fails, the wait handle returned by <see cref="BeginExecuteOperation"/>
  /// is signaled and <see cref="EndExecuteOperation"/> can be called for the operation.
  /// All server calls are serialized through <c>connectionLock</c>. The bookkeeping
  /// dictionaries are guarded by <c>dictionaryLock</c>.
  /// </remarks>
  class JobManager {
    private IGridServer server;
    private string address;
    // submitted engines, keyed by the job id returned by the server
    private Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    // wait handles handed out to callers, keyed by job id
    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    // zipped result engines of finished jobs, keyed by the operation they were started for
    private Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    // jobs that could not be completed; the error is rethrown by EndExecuteOperation
    private Dictionary<AtomicOperation, Exception> errors = new Dictionary<AtomicOperation, Exception>();
    private object connectionLock = new object();
    private object dictionaryLock = new object();
    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
    private const int RETRY_TIMEOUT_SEC = 10;
    private const int CHECK_RESULTS_TIMEOUT = 10;
    private ChannelFactory<IGridServer> factory;

    /// <param name="address">Endpoint address of the grid server.</param>
    public JobManager(string address) {
      this.address = address;
    }

    /// <summary>
    /// Re-creates the server connection and discards all pending jobs, results and errors.
    /// Outstanding wait handles are closed.
    /// </summary>
    internal void Reset() {
      ResetConnection();
      lock(dictionaryLock) {
        foreach(WaitHandle wh in waithandles.Values) wh.Close();
        waithandles.Clear();
        engines.Clear();
        results.Clear();
        errors.Clear();
      }
    }

    /// <summary>
    /// Replaces the current channel with a fresh net.tcp channel to <c>address</c>.
    /// </summary>
    private void ResetConnection() {
      lock(connectionLock) {
        // abort the previous factory (and the channels it created) so broken connections are not leaked
        if(factory != null) factory.Abort();
        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        binding.Security.Mode = SecurityMode.None;
        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    /// <summary>
    /// Invokes <paramref name="call"/> on the current server channel while holding the connection lock.
    /// A <see cref="TimeoutException"/> or <see cref="CommunicationException"/> is retried up to
    /// MAX_CONNECTION_RETRIES times, waiting RETRY_TIMEOUT_SEC seconds between attempts. The wait
    /// happens outside the lock, so other jobs are not blocked. A communication failure also
    /// re-creates the channel.
    /// </summary>
    /// <param name="call">The server operation to invoke.</param>
    /// <param name="result">The value returned by <paramref name="call"/>, or <c>default(T)</c> on failure.</param>
    /// <param name="lastError">The last failure if all attempts failed, otherwise <c>null</c>.</param>
    /// <returns><c>true</c> if the call succeeded.</returns>
    private bool TryCallServer<T>(Func<IGridServer, T> call, out T result, out Exception lastError) {
      result = default(T);
      lastError = null;
      for(int attempt = 0; attempt <= MAX_CONNECTION_RETRIES; attempt++) {
        if(attempt > 0) Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        try {
          lock(connectionLock) {
            if(server == null) ResetConnection(); // no connection established yet
            result = call(server);
          }
          return true;
        }
        catch(TimeoutException timeoutException) {
          lastError = timeoutException;
        }
        catch(CommunicationException communicationException) {
          lastError = communicationException;
          ResetConnection();
        }
      }
      return false;
    }

    /// <summary>
    /// Packs the operation into a <see cref="ProcessingEngine"/>, submits it to the grid server
    /// and starts polling for its result.
    /// </summary>
    /// <returns>A wait handle that is signaled when the job has finished or failed.</returns>
    /// <exception cref="ApplicationException">The server could not be reached after all retries.</exception>
    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
      byte[] zippedEngine = ZipEngine(engine);
      Guid currentEngineGuid;
      Exception lastError;
      if(!TryCallServer(s => s.BeginExecuteEngine(zippedEngine), out currentEngineGuid, out lastError)) {
        throw new ApplicationException("Max retries reached.", lastError);
      }
      ManualResetEvent waitHandle = new ManualResetEvent(false);
      lock(dictionaryLock) {
        engines[currentEngineGuid] = engine;
        waithandles[currentEngineGuid] = waitHandle;
      }
      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
      // return the local reference: the polling thread (or Reset) may already have removed the dictionary entry
      return waitHandle;
    }

    /// <summary>
    /// Serializes the engine with <see cref="PersistenceManager"/> and GZip-compresses it.
    /// </summary>
    private byte[] ZipEngine(ProcessingEngine engine) {
      using(MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the GZip stream must be closed (writing its trailer) before the buffer is read
        using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          PersistenceManager.Save(engine, stream);
        }
        return memStream.ToArray();
      }
    }

    /// <summary>
    /// Returns the resulting scope of a finished operation. Call it only after the wait handle
    /// returned by <see cref="BeginExecuteOperation"/> has been signaled.
    /// </summary>
    /// <exception cref="ApplicationException">The job could not be completed on the grid.</exception>
    public IScope EndExecuteOperation(AtomicOperation operation) {
      byte[] zippedResult = null;
      lock(dictionaryLock) {
        Exception error;
        if(errors.TryGetValue(operation, out error)) {
          errors.Remove(operation);
          throw new ApplicationException(error.Message, error);
        }
        zippedResult = results[operation];
        results.Remove(operation);
      }
      // restore the engine
      using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
        ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
        return resultEngine.InitialOperation.Scope;
      }
    }

    /// <summary>
    /// Thread-pool work item that polls the server for one job's result every CHECK_RESULTS_TIMEOUT
    /// seconds. If the server reports <c>JobState.Unkown</c> (or cannot be reached), the job is
    /// re-submitted. Once MAX_RESTARTS restarts are used up, the job is reported as failed.
    /// </summary>
    /// <param name="state">The <see cref="Guid"/> the server returned for the submitted job.</param>
    private void TryGetResult(object state) {
      Guid engineGuid = (Guid)state;
      int restartCounter = 0;
      while(true) {
        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
        // the job was discarded by Reset() in the meantime -> stop polling
        lock(dictionaryLock) {
          if(!engines.ContainsKey(engineGuid)) return;
        }

        byte[] zippedResult;
        Exception lastError;
        TryCallServer(s => s.TryEndExecuteEngine(engineGuid, 100), out zippedResult, out lastError);
        if(zippedResult != null) {
          CompleteJob(engineGuid, zippedResult, null);
          return;
        }

        // no result yet: check if the server is still working on the job
        JobState jobState;
        if(!TryCallServer(s => s.JobState(engineGuid), out jobState, out lastError)) {
          // server unreachable -> handle like a job the server does not know
          jobState = JobState.Unkown;
        }
        if(jobState == JobState.Unkown) {
          if(restartCounter >= MAX_RESTARTS) {
            // don't throw here: an unhandled exception on a thread-pool thread terminates the process.
            // Signal the wait handle and let EndExecuteOperation report the problem instead.
            CompleteJob(engineGuid, null, new ApplicationException("Maximum number of job restarts reached."));
            return;
          }
          engineGuid = RestartJob(engineGuid);
          restartCounter++;
        }
      }
    }

    /// <summary>
    /// Re-submits the engine registered under <paramref name="engineGuid"/>. On success its
    /// bookkeeping is moved to the new job id assigned by the server.
    /// </summary>
    /// <returns>The job id to poll from now on (unchanged if re-submission failed).</returns>
    private Guid RestartJob(Guid engineGuid) {
      ProcessingEngine engine;
      lock(dictionaryLock) {
        if(!engines.TryGetValue(engineGuid, out engine)) return engineGuid; // discarded by Reset()
      }
      byte[] zippedEngine = ZipEngine(engine);
      Guid newGuid;
      Exception lastError;
      if(!TryCallServer(s => s.BeginExecuteEngine(zippedEngine), out newGuid, out lastError)) {
        return engineGuid; // keep polling the old id; the next check will count another restart
      }
      lock(dictionaryLock) {
        ManualResetEvent waitHandle;
        if(!waithandles.TryGetValue(engineGuid, out waitHandle)) return newGuid; // discarded by Reset() meanwhile
        engines.Remove(engineGuid);
        waithandles.Remove(engineGuid);
        engines[newGuid] = engine;
        waithandles[newGuid] = waitHandle;
      }
      return newGuid;
    }

    /// <summary>
    /// Records a job's outcome (its zipped result, or <paramref name="error"/> if non-null),
    /// removes its bookkeeping and signals its wait handle. Does nothing if the job was
    /// discarded by <see cref="Reset"/>.
    /// </summary>
    private void CompleteJob(Guid engineGuid, byte[] zippedResult, Exception error) {
      lock(dictionaryLock) {
        ProcessingEngine engine;
        if(!engines.TryGetValue(engineGuid, out engine)) return;
        if(error != null) errors[engine.InitialOperation] = error;
        else results[engine.InitialOperation] = zippedResult;
        // signal the wait handle and clean up
        engines.Remove(engineGuid);
        waithandles[engineGuid].Set();
        waithandles.Remove(engineGuid);
      }
    }
  }
}