#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using HeuristicLab.Grid;
using System.Threading;
using HeuristicLab.Core;
using System.IO;
using System.Windows.Forms;
using System.Diagnostics;

namespace HeuristicLab.Grid {
  /// <summary>
  /// Thrown by <see cref="JobManager.EndExecuteOperation"/> when a job was given up
  /// after the maximal number of restarts.
  /// </summary>
  public class JobExecutionException : ApplicationException {
    public JobExecutionException(string msg) : base(msg) { }
  }

  /// <summary>
  /// Submits processing engines to a grid server and collects their results.
  /// Two worker threads are started by the constructor: one sends waiting jobs to the
  /// server (<see cref="StartEngines"/>), the other polls the server for results of
  /// running jobs (<see cref="GetResults"/>).
  /// </summary>
  /// <remarks>
  /// The server channel is only created in <c>ResetConnection()</c>, which is reached through
  /// <see cref="Reset"/> or after a <see cref="CommunicationException"/>; the constructor
  /// does not open a connection.
  /// NOTE(review): this looks like callers must call Reset() before submitting jobs - confirm.
  /// </remarks>
  public class JobManager {
    // how often a job is re-submitted before it is flagged as errored
    private const int MAX_RESTARTS = 5;
    // how often a single server call is retried before giving up
    private const int MAX_CONNECTION_RETRIES = 10;
    // pause (seconds) between two attempts of the same server call
    private const int RETRY_TIMEOUT_SEC = 60;
    // pause (seconds) after polling a job that is still active on the server
    private const int RESULT_POLLING_TIMEOUT = 5;

    private class Job {
      public Guid guid;
      public ProcessingEngine engine;
      public ManualResetEvent waitHandle;
      public int restarts;
    }

    private IGridServer server;
    private string address;
    private readonly object waitingQueueLock = new object();
    private readonly Queue<Job> waitingJobs = new Queue<Job>();
    private readonly object runningQueueLock = new object();
    private readonly Queue<Job> runningJobs = new Queue<Job>();
    // results and erroredOperations are both guarded by dictionaryLock
    private readonly Dictionary<AtomicOperation, byte[]> results = new Dictionary<AtomicOperation, byte[]>();
    private readonly List<AtomicOperation> erroredOperations = new List<AtomicOperation>();
    private readonly object connectionLock = new object();
    private readonly object dictionaryLock = new object();
    private AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
    private AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);
    private ChannelFactory<IGridServer> factory;

    /// <summary>
    /// Stores the server address and starts the two worker threads.
    /// </summary>
    /// <param name="address">Endpoint address of the grid server.</param>
    public JobManager(string address) {
      this.address = address;
      // NOTE(review): both threads are foreground threads running endless loops, so they
      // keep the process alive - confirm that this is intended.
      Thread starterThread = new Thread(StartEngines);
      Thread resultsGatheringThread = new Thread(GetResults);
      starterThread.Start();
      resultsGatheringThread.Start();
    }

    /// <summary>
    /// Opens a fresh connection and discards all queued jobs, stored results and error flags.
    /// The wait handles of the discarded jobs are closed.
    /// </summary>
    public void Reset() {
      ResetConnection();
      // Each collection is touched under the lock the worker threads use for it.
      // The locks are taken one after the other (never nested) like everywhere else in this class.
      lock(waitingQueueLock) {
        foreach(Job j in waitingJobs) {
          j.waitHandle.Close();
        }
        waitingJobs.Clear();
      }
      lock(runningQueueLock) {
        foreach(Job j in runningJobs) {
          j.waitHandle.Close();
        }
        runningJobs.Clear();
      }
      lock(dictionaryLock) {
        results.Clear();
        erroredOperations.Clear();
      }
    }

    /// <summary>
    /// Replaces the channel to the grid server with a newly created one.
    /// </summary>
    private void ResetConnection() {
      Trace.TraceInformation("Reset connection in JobManager");
      lock(connectionLock) {
        // release the factory of the previous connection instead of leaking it;
        // Abort() (unlike Close()) can be used on a faulted communication object
        if(factory != null) factory.Abort();
        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));
      }
    }

    /// <summary>
    /// Body of the starter thread: takes jobs from the waiting queue and submits them to the server.
    /// Jobs that could not be started are re-queued until MAX_RESTARTS is reached; after that
    /// the operation is flagged as errored and its wait handle is signaled.
    /// Any exception ends the loop and is written to the trace.
    /// </summary>
    public void StartEngines() {
      try {
        while(true) {
          Job job = null;
          lock(waitingQueueLock) {
            if(waitingJobs.Count > 0) job = waitingJobs.Dequeue();
          }
          if(job == null) waitingWaitHandle.WaitOne(); // no jobs waiting
          else {
            Guid currentEngineGuid = TryStartExecuteEngine(job.engine);
            if(currentEngineGuid == Guid.Empty) {
              // couldn't start the job -> requeue
              if(job.restarts < MAX_RESTARTS) {
                job.restarts++;
                lock(waitingQueueLock) waitingJobs.Enqueue(job);
                waitingWaitHandle.Set();
              } else {
                // max restart count reached -> give up on this job and flag error
                lock(dictionaryLock) {
                  erroredOperations.Add(job.engine.InitialOperation);
                  job.waitHandle.Set();
                }
              }
            } else {
              // job started successfully
              job.guid = currentEngineGuid;
              lock(runningQueueLock) {
                runningJobs.Enqueue(job);
                runningWaitHandle.Set();
              }
            }
          }
        }
      } catch(Exception e) {
        Trace.TraceError("Exception "+e+" in JobManager.StartEngines() killed the start-engine thread\n"+e.StackTrace);
      }
    }

    /// <summary>
    /// Body of the results-gathering thread: polls the server for the result of each running job.
    /// A received result is stored and the job's wait handle is signaled. If no result was
    /// received, the job is moved back to the waiting queue when the server reports
    /// <c>JobState.Unknown</c>, otherwise it stays in the running queue and is polled again later.
    /// Any exception ends the loop and is written to the trace.
    /// </summary>
    public void GetResults() {
      try {
        while(true) {
          Job job = null;
          lock(runningQueueLock) {
            if(runningJobs.Count > 0) job = runningJobs.Dequeue();
          }
          if(job == null) runningWaitHandle.WaitOne(); // no jobs running
          else {
            byte[] zippedResult = TryEndExecuteEngine(job.guid);
            if(zippedResult != null) { // successful
              lock(dictionaryLock) {
                // store result
                results[job.engine.InitialOperation] = zippedResult;
                // notify consumer that result is ready
                job.waitHandle.Set();
              }
            } else {
              // there was a problem -> check the state of the job and restart if necessary
              JobState jobState = TryGetJobState(job.guid);
              if(jobState == JobState.Unknown) {
                job.restarts++;
                lock(waitingQueueLock) {
                  waitingJobs.Enqueue(job);
                  waitingWaitHandle.Set();
                }
              } else {
                // job still active at the server
                lock(runningQueueLock) {
                  runningJobs.Enqueue(job);
                  runningWaitHandle.Set();
                }
                Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT)); // sleep a while before trying to get the next result
              }
            }
          }
        }
      } catch(Exception e) {
        Trace.TraceError("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n"+ e.StackTrace);
      }
    }

    /// <summary>
    /// Wraps the operation in a new <see cref="ProcessingEngine"/> and queues it for execution.
    /// </summary>
    /// <returns>Wait handle that is signaled when the job has a result or was given up.</returns>
    public WaitHandle BeginExecuteOperation(IScope globalScope, AtomicOperation operation) {
      return BeginExecuteEngine(new ProcessingEngine(globalScope, operation));
    }

    /// <summary>
    /// Queues the engine for execution on the grid and wakes up the starter thread.
    /// </summary>
    /// <returns>Wait handle that is signaled when the job has a result or was given up.</returns>
    public WaitHandle BeginExecuteEngine(ProcessingEngine engine) {
      Job job = new Job();
      job.engine = engine;
      job.waitHandle = new ManualResetEvent(false);
      job.restarts = 0;
      lock(waitingQueueLock) {
        waitingJobs.Enqueue(job);
      }
      waitingWaitHandle.Set();
      return job.waitHandle;
    }

    private byte[] ZipEngine(ProcessingEngine engine) {
      return PersistenceManager.SaveToGZip(engine);
    }

    /// <summary>
    /// Fetches and removes the stored result of an operation and restores the engine from it.
    /// Intended to be called after the wait handle returned by BeginExecute* was signaled.
    /// </summary>
    /// <exception cref="JobExecutionException">The job was given up after the maximal number of restarts.</exception>
    /// <exception cref="KeyNotFoundException">No result is stored for the operation.</exception>
    public ProcessingEngine EndExecuteOperation(AtomicOperation operation) {
      byte[] zippedResult = null;
      // erroredOperations is filled by the starter thread under dictionaryLock,
      // so it must be read and modified under the same lock here
      lock(dictionaryLock) {
        if(erroredOperations.Remove(operation)) {
          throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
        }
        zippedResult = results[operation];
        results.Remove(operation);
      }
      // restore the engine
      return (ProcessingEngine)PersistenceManager.RestoreFromGZip(zippedResult);
    }

    /// <summary>
    /// Sends the zipped engine to the server.
    /// </summary>
    /// <returns>Guid returned by the server, or <c>Guid.Empty</c> when all retries failed.</returns>
    private Guid TryStartExecuteEngine(ProcessingEngine engine) {
      byte[] zippedEngine = ZipEngine(engine);
      return CallServerWithRetries<Guid>(
        delegate(IGridServer s) { return s.BeginExecuteEngine(zippedEngine); },
        Guid.Empty, "TryStartExecuteEngine");
    }

    /// <summary>
    /// Asks the server for the result of a job.
    /// </summary>
    /// <returns>Whatever the server returned (may be null), or null when all retries failed.</returns>
    private byte[] TryEndExecuteEngine(Guid engineGuid) {
      return CallServerWithRetries<byte[]>(
        delegate(IGridServer s) { return s.TryEndExecuteEngine(engineGuid); },
        null, "TryEndExecuteEngine");
    }

    /// <summary>
    /// Checks if the server is still working on the job.
    /// </summary>
    /// <returns>State reported by the server, or <c>JobState.Unknown</c> when all retries failed.</returns>
    private JobState TryGetJobState(Guid engineGuid) {
      return CallServerWithRetries<JobState>(
        delegate(IGridServer s) { return s.JobState(engineGuid); },
        JobState.Unknown, "TryGetJobState");
    }

    /// <summary>
    /// Executes a server call with up to MAX_CONNECTION_RETRIES attempts, pausing RETRY_TIMEOUT_SEC
    /// seconds after each failed attempt. A <see cref="CommunicationException"/> additionally
    /// re-creates the connection. Other exceptions are not handled here.
    /// </summary>
    /// <param name="call">The call to execute; it receives the current server channel.</param>
    /// <param name="fallback">Value returned when all attempts failed.</param>
    /// <param name="callerName">Name used in the trace warning when all attempts failed.</param>
    private T CallServerWithRetries<T>(Func<IGridServer, T> call, T fallback, string callerName) {
      int retries = 0;
      do {
        try {
          lock(connectionLock) {
            // read the field inside the lock: ResetConnection() may have replaced the channel
            // since the previous attempt, and the old channel must not be used again
            return call(server);
          }
        } catch(TimeoutException) {
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        } catch(CommunicationException) {
          ResetConnection();
          retries++;
          Thread.Sleep(TimeSpan.FromSeconds(RETRY_TIMEOUT_SEC));
        }
      } while(retries < MAX_CONNECTION_RETRIES);
      Trace.TraceWarning("Reached max connection retries in " + callerName);
      return fallback;
    }
  }
}