#region License Information /* HeuristicLab * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.ServiceModel; using System.Diagnostics; using System.Threading; using HeuristicLab.PluginInfrastructure; namespace HeuristicLab.Grid { class GridClient { private string uri; private ChannelFactory factory; private System.Timers.Timer fetchOperationTimer; private IEngineStore engineStore; private object connectionLock = new object(); private const int CONNECTION_RETRY_TIMEOUT_SEC = 10; private const int MAX_RETRIES = 10; public bool Waiting { get { return !executing && !stopped; } } private bool executing; public bool Executing { get { return executing; } } private bool stopped; public bool Stopped { get { return stopped; } } private string statusMessage = ""; public string StatusMessage { get { return statusMessage; } } internal GridClient() { fetchOperationTimer = new System.Timers.Timer(); fetchOperationTimer.Interval = 200; fetchOperationTimer.Elapsed += new System.Timers.ElapsedEventHandler(fetchOperationTimer_Elapsed); stopped = true; } internal void Start(string uri) { try { this.uri = uri; ResetConnection(); fetchOperationTimer.Start(); stopped = false; } catch(CommunicationException ex) { statusMessage = DateTime.Now.ToShortTimeString()+": Exception while connecting to the server: " + ex.Message; fetchOperationTimer.Stop(); } } internal void Stop() { fetchOperationTimer.Stop(); lock(connectionLock) { if(factory.State == CommunicationState.Opened || factory.State == CommunicationState.Opening) { IAsyncResult closeResult = factory.BeginClose(null, null); factory.EndClose(closeResult); } } stopped = true; } private void ResetConnection() { Trace.TraceInformation("Reset connection in GridClient"); NetTcpBinding binding = new NetTcpBinding(); binding.MaxReceivedMessageSize = 100000000; // 100Mbytes binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements; factory = new ChannelFactory(binding); engineStore = factory.CreateChannel(new EndpointAddress(uri)); } private void fetchOperationTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { try { byte[] engineXml = null; Guid currentGuid; // first stop the timer! fetchOperationTimer.Stop(); bool gotEngine = false; lock(connectionLock) { if(stopped) return; try { gotEngine = engineStore.TryTakeEngine(out currentGuid, out engineXml); } catch(TimeoutException) { ChangeStatusMessage("TimeoutException while trying to get an engine"); currentGuid = Guid.Empty; // timeout -> just start the timer again fetchOperationTimer.Interval = 5000; fetchOperationTimer.Start(); } catch(CommunicationException) { ChangeStatusMessage("CommunicationException while trying to get an engine"); // connection problem -> reset connection and start the timer again ResetConnection(); currentGuid = Guid.Empty; fetchOperationTimer.Interval = 5000; fetchOperationTimer.Start(); } } // got engine from server and user didn't press stop -> execute the engine if(gotEngine && !stopped) { executing = true; AppDomain engineDomain = PluginManager.Manager.CreateAndInitAppDomain("Engine domain"); Type engineRunnerType = typeof(EngineRunner); EngineRunner runner = (EngineRunner)engineDomain.CreateInstanceAndUnwrap(engineRunnerType.Assembly.GetName().Name, engineRunnerType.FullName); byte[] resultXml = runner.Execute(engineXml); bool success = false; int retries = 0; do { lock(connectionLock) { if(!stopped) { try { engineStore.StoreResult(currentGuid, resultXml); success = true; } catch(TimeoutException) { ChangeStatusMessage("TimeoutException while trying to store the result of an engine"); success = false; retries++; Thread.Sleep(TimeSpan.FromSeconds(CONNECTION_RETRY_TIMEOUT_SEC)); } catch(CommunicationException) { ChangeStatusMessage("CommunicationException while trying to store the result of an engine"); ResetConnection(); success = false; retries++; Thread.Sleep(TimeSpan.FromSeconds(CONNECTION_RETRY_TIMEOUT_SEC)); } } } } while(!stopped && !success && retries < MAX_RETRIES); // dispose the AppDomain that was created to run the job AppDomain.Unload(engineDomain); executing = false; // ok if we could store the result it's probable that the server can send us another engine use a small time-interval if(success) fetchOperationTimer.Interval = 100; else fetchOperationTimer.Interval = 30000; // if there were problems -> sleep for a longer time // clear state currentGuid = Guid.Empty; // start the timer fetchOperationTimer.Start(); } else { // ok we didn't get engine -> if the user didn't press stop this means that the server doesn't have engines for us // if the user pressed stop we must not start the timer if(!stopped) { // start the timer again fetchOperationTimer.Interval = 5000; fetchOperationTimer.Start(); } } } catch(Exception ex) { // in case something goes wrong when creating / unloading the AppDomain ChangeStatusMessage("Uncaught exception " + ex.Message); Stop(); } } private void ChangeStatusMessage(string msg) { Trace.TraceWarning(msg); statusMessage = DateTime.Now.ToShortTimeString() + ": " + msg; } } }