#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using HeuristicLab.Grid;
using System.Threading;
using HeuristicLab.Core;
using System.IO;
using System.IO.Compression;
using System.Windows.Forms;

namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Submits operations as gzipped, serialized <see cref="ProcessingEngine"/>s to a grid server
  /// (<see cref="IGridServer"/>) and collects their results asynchronously.
  /// Each submitted job is tracked by the Guid returned from <c>BeginExecuteEngine</c>. A thread-pool
  /// worker polls for the result and signals the wait handle returned by
  /// <see cref="BeginExecuteOperation"/> once a result is stored or the job is given up.
  /// </summary>
  class JobManager {
    private IGridServer server;
    private readonly string address;
    // pending jobs, keyed by the Guid of their most recent submission
    private readonly Dictionary<Guid, ProcessingEngine> engines = new Dictionary<Guid, ProcessingEngine>();
    private readonly Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    // finished results (gzipped serialized engines), keyed by the operation that was submitted
    private readonly Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    // operations whose job could not be finished even after MAX_RESTARTS restarts
    private readonly List<AtomicOperation> erroredOperations = new List<AtomicOperation>();
    // guards server and factory; every remote call is made while holding it
    private readonly object connectionLock = new object();
    // guards engines, waithandles, results and erroredOperations
    private readonly object dictionaryLock = new object();
    private const int MAX_RESTARTS = 5;
    private const int MAX_CONNECTION_RETRIES = 10;
    private const int RETRY_TIMEOUT_SEC = 60;
    private const int CHECK_RESULTS_TIMEOUT = 10;
    private ChannelFactory<IGridServer> factory;

    public JobManager(string address) {
      this.address = address;
    }

    /// <summary>
    /// Opens a fresh connection and discards all bookkeeping: pending wait handles are closed and
    /// pending jobs, stored results and errored operations are forgotten.
    /// </summary>
    internal void Reset() {
      ResetConnection();
      lock(dictionaryLock) {
        foreach(WaitHandle wh in waithandles.Values) wh.Close();
        waithandles.Clear();
        engines.Clear();
        results.Clear();
        erroredOperations.Clear();
      }
    }

    /// <summary>
    /// Replaces the current channel with a new net.tcp channel to <c>address</c>.
    /// </summary>
    private void ResetConnection() {
      lock(connectionLock) {
        // release the previous factory (and the channels it created); Abort rather than Close
        // because the old channel may be faulted
        if(factory != null) factory.Abort();
        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        binding.Security.Mode = SecurityMode.None;
        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    /// <summary>
    /// Submits <paramref name="operation"/> (executed on <paramref name="globalScope"/>) to the grid
    /// server and starts polling for its result in the background.
    /// </summary>
    /// <returns>A wait handle that is signaled once <see cref="EndExecuteOperation"/> can be called.</returns>
    /// <exception cref="ApplicationException">The job could not be submitted after
    /// MAX_CONNECTION_RETRIES retries.</exception>
    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
      ProcessingEngine engine = new ProcessingEngine(globalScope, operation);
      byte[] zippedEngine = ZipEngine(engine);
      Guid currentEngineGuid;
      Exception lastError;
      // one initial attempt plus MAX_CONNECTION_RETRIES retries
      if(!TryCallServer<Guid>(s => s.BeginExecuteEngine(zippedEngine), MAX_CONNECTION_RETRIES + 1, out currentEngineGuid, out lastError)) {
        throw new ApplicationException("Maximal number of connection attempts reached. There is a problem with the connection to the grid-server.", lastError);
      }
      // keep a local reference: the worker may remove the dictionary entry at any time
      ManualResetEvent waitHandle = new ManualResetEvent(false);
      lock(dictionaryLock) {
        engines[currentEngineGuid] = engine;
        waithandles[currentEngineGuid] = waitHandle;
      }
      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
      return waitHandle;
    }

    /// <summary>
    /// Serializes <paramref name="engine"/> with <see cref="PersistenceManager"/> and gzips it.
    /// </summary>
    private byte[] ZipEngine(ProcessingEngine engine) {
      using(MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the GZipStream must be closed (flushed) before the bytes are read
        using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          PersistenceManager.Save(engine, stream);
        }
        return memStream.ToArray();
      }
    }

    /// <summary>
    /// Returns the engine produced by the grid for <paramref name="operation"/>. Call only after the
    /// wait handle from <see cref="BeginExecuteOperation"/> has been signaled.
    /// </summary>
    /// <exception cref="ApplicationException">The job was given up after MAX_RESTARTS restarts.</exception>
    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
      byte[] zippedResult;
      lock(dictionaryLock) {
        // erroredOperations is written by worker threads, so it must be read under the lock
        if(erroredOperations.Remove(operation)) {
          throw new ApplicationException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
        }
        zippedResult = results[operation];
        results.Remove(operation);
      }
      // restore the engine
      using(GZipStream stream = new GZipStream(new MemoryStream(zippedResult), CompressionMode.Decompress)) {
        return (ProcessingEngine)PersistenceManager.Load(stream);
      }
    }

    /// <summary>
    /// Thread-pool worker: polls the server for the result of the job identified by
    /// <paramref name="state"/> (a Guid) and restarts the job when the server reports
    /// <c>JobState.Unkown</c>. It stores the result, or gives up after MAX_RESTARTS restarts, and
    /// then signals the job's wait handle. It stops silently if the job was discarded by Reset().
    /// </summary>
    private void TryGetResult(object state) {
      Guid engineGuid = (Guid)state;
      int restartCounter = 0;
      while(true) {
        Thread.Sleep(TimeSpan.FromSeconds(CHECK_RESULTS_TIMEOUT));
        if(!IsTracked(engineGuid)) return;
        byte[] zippedResult = TryEndExecuteEngine(engineGuid);
        if(zippedResult != null) {
          FinishJob(engineGuid, zippedResult);
          return;
        }
        // no result -> check the state of the job and restart it if necessary
        JobState jobState = TryGetJobState(engineGuid);
        if(jobState == JobState.Unkown) {
          // check the limit before restarting so that every restarted job is also polled
          if(restartCounter >= MAX_RESTARTS) break;
          // the restarted job has a new Guid; continue polling that one
          engineGuid = TryRestartJob(engineGuid);
          restartCounter++;
        }
      }
      // the job was never finished and restarting didn't help -> record the faulted operation so
      // that EndExecuteOperation throws, and signal the wait handle
      FinishJob(engineGuid, null);
    }

    // Returns true while the job is still pending, i.e. not finished and not discarded by Reset().
    private bool IsTracked(Guid engineGuid) {
      lock(dictionaryLock) {
        return engines.ContainsKey(engineGuid);
      }
    }

    // Removes a pending job, stores its result (or marks its operation as errored when
    // zippedResult is null) and signals its wait handle. No-op if the job is no longer tracked.
    private void FinishJob(Guid engineGuid, byte[] zippedResult) {
      lock(dictionaryLock) {
        ProcessingEngine engine;
        if(!engines.TryGetValue(engineGuid, out engine)) return;
        if(zippedResult != null) results[engine.InitialOperation] = zippedResult;
        else erroredOperations.Add(engine.InitialOperation);
        engines.Remove(engineGuid);
        waithandles[engineGuid].Set();
        waithandles.Remove(engineGuid);
      }
    }

    /// <summary>
    /// Resubmits the engine of a job and re-keys its bookkeeping to the Guid returned by the server.
    /// </summary>
    /// <returns>The Guid to poll from now on. This is the old Guid if resubmitting failed or the job
    /// is no longer tracked.</returns>
    private Guid TryRestartJob(Guid engineGuid) {
      ProcessingEngine engine;
      lock(dictionaryLock) {
        if(!engines.TryGetValue(engineGuid, out engine)) return engineGuid;
      }
      byte[] zippedEngine = ZipEngine(engine);
      Guid newEngineGuid;
      Exception lastError;
      if(!TryCallServer<Guid>(s => s.BeginExecuteEngine(zippedEngine), MAX_CONNECTION_RETRIES, out newEngineGuid, out lastError)) {
        return engineGuid;
      }
      lock(dictionaryLock) {
        ManualResetEvent waitHandle;
        // Reset() may have discarded the job while it was being resubmitted
        if(!waithandles.TryGetValue(engineGuid, out waitHandle)) return engineGuid;
        engines.Remove(engineGuid);
        waithandles.Remove(engineGuid);
        // the caller still holds this wait handle, so the same instance moves to the new key
        engines[newEngineGuid] = engine;
        waithandles[newEngineGuid] = waitHandle;
      }
      return newEngineGuid;
    }

    /// <summary>
    /// Asks the server for the job's result.
    /// </summary>
    /// <returns>The zipped result, or null if none was returned or the server could not be reached
    /// within MAX_CONNECTION_RETRIES attempts.</returns>
    private byte[] TryEndExecuteEngine(Guid engineGuid) {
      byte[] zippedResult;
      Exception lastError;
      // NOTE(review): the 100 is passed through unchanged; presumably a server-side wait time — confirm
      TryCallServer<byte[]>(s => s.TryEndExecuteEngine(engineGuid, 100), MAX_CONNECTION_RETRIES, out zippedResult, out lastError);
      return zippedResult;
    }

    /// <summary>
    /// Checks whether the server is still working on the job.
    /// </summary>
    /// <returns>The job state, or <c>JobState.Unkown</c> if the server could not be reached.</returns>
    private JobState TryGetJobState(Guid engineGuid) {
      JobState jobState;
      Exception lastError;
      if(TryCallServer<JobState>(s => s.JobState(engineGuid), MAX_CONNECTION_RETRIES, out jobState, out lastError)) {
        return jobState;
      }
      return JobState.Unkown;
    }

    /// <summary>
    /// Invokes <paramref name="call"/> on the current channel (under connectionLock), retrying on
    /// <see cref="TimeoutException"/> and <see cref="CommunicationException"/> with a pause of
    /// RETRY_TIMEOUT_SEC between attempts. A communication failure opens a new channel before the
    /// next attempt.
    /// </summary>
    /// <param name="call">The remote call to make.</param>
    /// <param name="maxAttempts">Total number of attempts.</param>
    /// <param name="result">The call's return value; default(T) on failure.</param>
    /// <param name="lastError">The exception of the last failed attempt, or null on success.</param>
    /// <returns>true if one of the attempts succeeded.</returns>
    private bool TryCallServer<T>(Func<IGridServer, T> call, int maxAttempts, out T result, out Exception lastError) {
      lastError = null;
      for(int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          lock(connectionLock) {
            // read the field inside the lock so that a retry uses the channel created by ResetConnection
            result = call(server);
          }
          lastError = null;
          return true;
        } catch(TimeoutException e) {
          lastError = e;
        } catch(CommunicationException e) {
          lastError = e;
          ResetConnection();
        }
        // no point in waiting after the final attempt
        if(attempt < maxAttempts) Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
      }
      result = default(T);
      return false;
    }
  }
}