#region License Information /* HeuristicLab * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Collections.Generic; using System.IO; using System.Runtime.Serialization.Formatters.Binary; using System.ServiceModel; using HeuristicLab.Common; using HeuristicLab.Hive.Contracts; using HeuristicLab.Hive.Contracts.BusinessObjects; using HeuristicLab.Hive.Contracts.ResponseObjects; using HeuristicLab.Hive.Slave.Common; using HeuristicLab.Hive.Slave.Communication.SlaveFacade; using HeuristicLab.PluginInfrastructure; using HeuristicLab.Tracing; namespace HeuristicLab.Hive.Slave.Communication { /// /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server /// public class WcfService { private static WcfService instance; /// /// Getter for the Instance of the WcfService /// /// the Instance of the WcfService class public static WcfService Instance { get { if (instance == null) { Logger.Debug("New WcfService Instance created"); instance = new WcfService(); } return instance; } } public DateTime ConnectedSince { get; private set; } public NetworkEnum.WcfConnState ConnState { get; private set; } private string serverIp; public string ServerIp { get { return serverIp; } set { if (serverIp != value) { serverIp = value; } } } public event EventHandler Connected; public void OnConnected() { var handler = Connected; if (handler != null) handler(this, EventArgs.Empty); } /// /// Constructor /// private WcfService() { ConnState = NetworkEnum.WcfConnState.Disconnected; } /// /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event. /// public void Connect(SlaveDto slaveInfo) { ServiceLocator.Instance.HostAddress = ServerIp; RegisterServiceEvents(); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { try { Logger.Debug("Starting the Connection Process"); if (String.Empty.Equals(ServerIp)) { Logger.Info("No Server IP set!"); return; } ConnState = NetworkEnum.WcfConnState.Connected; ConnectedSince = DateTime.Now; service.Obj.Login(slaveInfo); OnConnected(); } catch (Exception ex) { HandleNetworkError(ex); } } } private void RegisterServiceEvents() { ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += new EventHandler>(ClientFacadePool_ExceptionOccured); ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += new EventHandler>(ClientFacadePool_ExceptionOccured); } private void DeregisterServiceEvents() { ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= new EventHandler>(ClientFacadePool_ExceptionOccured); ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= new EventHandler>(ClientFacadePool_ExceptionOccured); } void ClientFacadePool_ExceptionOccured(object sender, EventArgs e) { HandleNetworkError(e.Value); Logger.Error("An exception occured in the WCF-Communication: " + e.Value.ToString()); } ///// ///// Disconnects the Slave from the Server ///// public void Disconnect() { ConnState = NetworkEnum.WcfConnState.Disconnected; DeregisterServiceEvents(); } /// /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state /// /// The Exception private void HandleNetworkError(Exception e) { ConnState = NetworkEnum.WcfConnState.Failed; DeregisterServiceEvents(); Logger.Error("Network exception occurred: " + e); } /// /// Methods for the Server Login /// //public void Login(SlaveDto slaveInfo) { // try { // using (Disposable service = ServiceLocator.Instance.SlaveFacadePool.GetService()) { // if (ConnState == NetworkEnum.WcfConnState.Connected) { // Logger.Debug("STARTED: Login Sync"); // Response res = service.Obj.Login(slaveInfo); // if (res.StatusMessage != ResponseStatus.Ok) { // Logger.Error("FAILED: Login Failed! " + res.StatusMessage); // throw new Exception(res.StatusMessage.ToString()); // } else { // Logger.Info("ENDED: Login succeeded" + res.StatusMessage); // } // } // } // } // catch (Exception e) { // OnExceptionOccured(e); // } //} /// /// Pull a Job from the Server /// #region PullJob public event System.EventHandler GetJobCompleted; public void GetJobAsync(Guid guid) { Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService(); Logger.Debug("STARTED: Fetching of Jobs from Server for Slave"); service.Obj.BeginGetStreamedJob(guid, (ar => { Stream stream = null; MemoryStream memStream = null; try { if (ar.IsCompleted) { Logger.Debug("ENDED: Fetching of Jobs from Server for Slave"); stream = service.Obj.EndGetStreamedJob(ar); //first deserialize the response BinaryFormatter formatter = new BinaryFormatter(); ResponseObject response = (ResponseObject)formatter.Deserialize(stream); //second deserialize the BLOB memStream = new MemoryStream(); byte[] buffer = new byte[3024]; int read = 0; while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) { memStream.Write(buffer, 0, read); } memStream.Close(); GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, null, !ar.IsCompleted, ar.AsyncState); GetJobCompleted(this, completedEventArgs); } else { HandleNetworkError(new FaultException("GetJobAsync did not complete")); } } catch (Exception e) { OnExceptionOccured(e); } finally { if (stream != null) stream.Dispose(); if (memStream != null) memStream.Dispose(); try { service.Dispose(); } catch (Exception e) { OnExceptionOccured(e); } } }), null); } #endregion /// /// Send back finished Job Results /// #region SendJobResults public event System.EventHandler GetFinishedJobResultCompleted; public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) { Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService(); Logger.Debug("STARTED: Sending back the finished job results"); Logger.Debug("Building stream"); Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception); Logger.Debug("Builded stream"); Logger.Debug("Making the call"); service.Obj.BeginStoreFinishedJobResultStreamed(stream, (ar => { try { Logger.Debug("Finished storing the job"); if (stream != null) stream.Dispose(); if (ar.IsCompleted) { var res = service.Obj.EndStoreFinishedJobResultStreamed(ar); StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null); Logger.Debug("calling the Finished Job Event"); GetFinishedJobResultCompleted(this, args); Logger.Debug("ENDED: Sending back the finished job results"); } else { HandleNetworkError(new FaultException("GetFinishedJobResultAsync did not complete")); } } catch (Exception e) { OnExceptionOccured(e); } finally { try { service.Dispose(); } catch (Exception e) { OnExceptionOccured(e); } } }), null); } #endregion #region Processsnapshots public event System.EventHandler ProcessSnapshotCompleted; public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) { Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService(); Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception); service.Obj.BeginProcessSnapshotStreamed(stream, (ar => { try { if (stream != null) stream.Dispose(); if (ar.IsCompleted) { var res = service.Obj.EndStoreFinishedJobResultStreamed(ar); ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null); ProcessSnapshotCompleted(this, args); } else { HandleNetworkError(new FaultException("ProcessSnapshotAsync did not complete")); } } catch (Exception e) { OnExceptionOccured(e); } finally { try { service.Dispose(); } catch (Exception e) { OnExceptionOccured(e); } } }), null); } #endregion /// /// Methods for sending the periodically Heartbeat /// #region Heartbeat public event EventHandler ProcessHeartBeatCompleted; public void ProcessHeartBeatSync(HeartBeatData hbd) { using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { Logger.Debug("STARTING: sending heartbeat"); var res = service.Obj.ProcessHeartBeat(hbd); if (res.StatusMessage == ResponseStatus.Ok) { ProcessHeartBeatCompleted(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null)); Logger.Debug("ENDED: sending heartbeats"); } else { Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString()); } } } #endregion /// /// Send back finished and Stored Job Results /// private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) { JobResult jobResult = new JobResult(); jobResult.SlaveId = clientId; jobResult.Id = jobId; jobResult.ExecutionTime = executionTime; jobResult.Exception = exception; MultiStream stream = new MultiStream(); //first send result stream.AddStream(new StreamedObject(jobResult)); //second stream the job binary data MemoryStream memStream = new MemoryStream(result, false); stream.AddStream(memStream); return stream; } public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) { using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { ResponseResultReceived res = service.Obj.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception)); return res; } } public Response IsJobStillNeeded(Guid jobId) { try { Logger.Debug("STARTING: Sync call: IsJobStillNeeded"); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { Response res = service.Obj.IsJobStillNeeded(jobId); Logger.Debug("ENDED: Sync call: IsJobStillNeeded"); return res; } } catch (Exception e) { OnExceptionOccured(e); return null; } } public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) { try { using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { return service.Obj.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception)); } } catch (Exception e) { OnExceptionOccured(e); return null; } } public IEnumerable RequestPlugins(List requestedPlugins) { try { using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { Logger.Debug("STARTED: Requesting Plugins for Job"); Logger.Debug("STARTED: Getting the stream"); Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray()); Logger.Debug("ENDED: Getting the stream"); BinaryFormatter formatter = new BinaryFormatter(); Logger.Debug("STARTED: Deserializing the stream"); ResponseList response = (ResponseList)formatter.Deserialize(stream); Logger.Debug("ENDED: Deserializing the stream"); if (stream != null) stream.Dispose(); return response.List; } } catch (Exception e) { OnExceptionOccured(e); return null; } } public void Logout(Guid guid) { try { Logger.Debug("STARTED: Logout"); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { service.Obj.Logout(guid); } Logger.Debug("ENDED: Logout"); } catch (Exception e) { OnExceptionOccured(e); } } public ResponseCalendar GetCalendarSync(Guid clientId) { try { Logger.Debug("STARTED: Syncing Calendars"); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { ResponseCalendar cal = service.Obj.GetCalendar(clientId); Logger.Debug("ENDED: Syncing Calendars"); return cal; } } catch (Exception e) { OnExceptionOccured(e); return null; } } public Response SetCalendarStatus(Guid clientId, CalendarState state) { try { Logger.Debug("STARTED: Setting Calendar status to: " + state); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { Response resp = service.Obj.SetCalendarStatus(clientId, state); Logger.Debug("ENDED: Setting Calendar status to: " + state); return resp; } } catch (Exception e) { OnExceptionOccured(e); return null; } } public ResponseObject AddChildJob(Guid parentJobId, SerializedJob serializedJob) { try { Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { ResponseObject response = service.Obj.AddChildJob(parentJobId, serializedJob); Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId); return response; } } catch (Exception e) { OnExceptionOccured(e); return null; } } public ResponseObject PauseJob(SerializedJob serializedJob) { try { Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { ResponseObject response = service.Obj.PauseJob(serializedJob); Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id); return response; } } catch (Exception e) { OnExceptionOccured(e); return null; } } public ResponseObject GetChildJobs(Guid parentJob) { try { Logger.Debug("STARTED: GetChildJobs job: " + parentJob); using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { SerializedJobList serializedJobs = new SerializedJobList(); JobResult[] results = service.Obj.GetChildJobResults(new Guid?(parentJob), false, false); foreach (JobResult result in results) { serializedJobs.Add(service.Obj.GetLastSerializedResult(result.Id)); } Logger.Debug("ENDED: GetChildJobs job: " + parentJob); return new ResponseObject() { Obj = serializedJobs }; } } catch (Exception e) { OnExceptionOccured(e); return null; } } public void DeleteChildJobs(Guid jobId) { try { using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) { service.Obj.DeleteChildJobs(jobId); } } catch (Exception e) { OnExceptionOccured(e); } } public event EventHandler> ExceptionOccured; private void OnExceptionOccured(Exception e) { Logger.Error("Error: " + e.ToString()); var handler = ExceptionOccured; if (handler != null) handler(this, new EventArgs(e)); } } }