#region License Information
/* HeuristicLab
* Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see .
*/
#endregion
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.ServiceModel;
using HeuristicLab.Common;
using HeuristicLab.Hive.Contracts;
using HeuristicLab.Hive.Contracts.BusinessObjects;
using HeuristicLab.Hive.Contracts.ResponseObjects;
using HeuristicLab.Hive.Slave.Common;
using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
using HeuristicLab.PluginInfrastructure;
using HeuristicLab.Tracing;
namespace HeuristicLab.Hive.Slave.Communication {
///
/// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
///
public class WcfService {
private static WcfService instance;
///
/// Getter for the Instance of the WcfService
///
/// the Instance of the WcfService class
public static WcfService Instance {
get {
if (instance == null) {
Logger.Debug("New WcfService Instance created");
instance = new WcfService();
}
return instance;
}
}
public DateTime ConnectedSince { get; private set; }
public NetworkEnum.WcfConnState ConnState { get; private set; }
private string serverIp;
public string ServerIp {
get { return serverIp; }
set {
if (serverIp != value) {
serverIp = value;
}
}
}
public event EventHandler Connected;
public void OnConnected() {
var handler = Connected;
if (handler != null) handler(this, EventArgs.Empty);
}
///
/// Constructor
///
private WcfService() {
ConnState = NetworkEnum.WcfConnState.Disconnected;
}
///
/// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
///
public void Connect(SlaveDto slaveInfo) {
ServiceLocator.Instance.HostAddress = ServerIp;
RegisterServiceEvents();
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
try {
Logger.Debug("Starting the Connection Process");
if (String.Empty.Equals(ServerIp)) {
Logger.Info("No Server IP set!");
return;
}
ConnState = NetworkEnum.WcfConnState.Connected;
ConnectedSince = DateTime.Now;
service.Obj.Login(slaveInfo);
OnConnected();
}
catch (Exception ex) {
HandleNetworkError(ex);
}
}
}
private void RegisterServiceEvents() {
ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += new EventHandler>(ClientFacadePool_ExceptionOccured);
ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += new EventHandler>(ClientFacadePool_ExceptionOccured);
}
private void DeregisterServiceEvents() {
ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= new EventHandler>(ClientFacadePool_ExceptionOccured);
ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= new EventHandler>(ClientFacadePool_ExceptionOccured);
}
void ClientFacadePool_ExceptionOccured(object sender, EventArgs e) {
HandleNetworkError(e.Value);
Logger.Error("An exception occured in the WCF-Communication: " + e.Value.ToString());
}
/////
///// Disconnects the Slave from the Server
/////
public void Disconnect() {
ConnState = NetworkEnum.WcfConnState.Disconnected;
DeregisterServiceEvents();
}
///
/// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
///
/// The Exception
private void HandleNetworkError(Exception e) {
ConnState = NetworkEnum.WcfConnState.Failed;
DeregisterServiceEvents();
Logger.Error("Network exception occurred: " + e);
}
///
/// Methods for the Server Login
///
//public void Login(SlaveDto slaveInfo) {
// try {
// using (Disposable service = ServiceLocator.Instance.SlaveFacadePool.GetService()) {
// if (ConnState == NetworkEnum.WcfConnState.Connected) {
// Logger.Debug("STARTED: Login Sync");
// Response res = service.Obj.Login(slaveInfo);
// if (res.StatusMessage != ResponseStatus.Ok) {
// Logger.Error("FAILED: Login Failed! " + res.StatusMessage);
// throw new Exception(res.StatusMessage.ToString());
// } else {
// Logger.Info("ENDED: Login succeeded" + res.StatusMessage);
// }
// }
// }
// }
// catch (Exception e) {
// OnExceptionOccured(e);
// }
//}
///
/// Pull a Job from the Server
///
#region PullJob
public event System.EventHandler GetJobCompleted;
public void GetJobAsync(Guid guid) {
Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
service.Obj.BeginGetStreamedJob(guid, (ar => {
Stream stream = null;
MemoryStream memStream = null;
try {
if (ar.IsCompleted) {
Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
stream = service.Obj.EndGetStreamedJob(ar);
//first deserialize the response
BinaryFormatter formatter = new BinaryFormatter();
ResponseObject response = (ResponseObject)formatter.Deserialize(stream);
//second deserialize the BLOB
memStream = new MemoryStream();
byte[] buffer = new byte[3024];
int read = 0;
while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
memStream.Write(buffer, 0, read);
}
memStream.Close();
GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, null, !ar.IsCompleted, ar.AsyncState);
GetJobCompleted(this, completedEventArgs);
} else {
HandleNetworkError(new FaultException("GetJobAsync did not complete"));
}
}
catch (Exception e) {
OnExceptionOccured(e);
}
finally {
if (stream != null)
stream.Dispose();
if (memStream != null)
memStream.Dispose();
try { service.Dispose(); }
catch (Exception e) { OnExceptionOccured(e); }
}
}), null);
}
#endregion
///
/// Send back finished Job Results
///
#region SendJobResults
public event System.EventHandler GetFinishedJobResultCompleted;
public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
Logger.Debug("STARTED: Sending back the finished job results");
Logger.Debug("Building stream");
Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
Logger.Debug("Builded stream");
Logger.Debug("Making the call");
service.Obj.BeginStoreFinishedJobResultStreamed(stream, (ar => {
try {
Logger.Debug("Finished storing the job");
if (stream != null)
stream.Dispose();
if (ar.IsCompleted) {
var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
Logger.Debug("calling the Finished Job Event");
GetFinishedJobResultCompleted(this, args);
Logger.Debug("ENDED: Sending back the finished job results");
} else {
HandleNetworkError(new FaultException("GetFinishedJobResultAsync did not complete"));
}
}
catch (Exception e) {
OnExceptionOccured(e);
}
finally {
try { service.Dispose(); }
catch (Exception e) { OnExceptionOccured(e); }
}
}), null);
}
#endregion
#region Processsnapshots
public event System.EventHandler ProcessSnapshotCompleted;
public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
service.Obj.BeginProcessSnapshotStreamed(stream, (ar => {
try {
if (stream != null)
stream.Dispose();
if (ar.IsCompleted) {
var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
ProcessSnapshotCompleted(this, args);
} else {
HandleNetworkError(new FaultException("ProcessSnapshotAsync did not complete"));
}
}
catch (Exception e) {
OnExceptionOccured(e);
}
finally {
try { service.Dispose(); }
catch (Exception e) { OnExceptionOccured(e); }
}
}), null);
}
#endregion
///
/// Methods for sending the periodically Heartbeat
///
#region Heartbeat
public event EventHandler ProcessHeartBeatCompleted;
public void ProcessHeartBeatSync(HeartBeatData hbd) {
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
Logger.Debug("STARTING: sending heartbeat");
var res = service.Obj.ProcessHeartBeat(hbd);
if (res.StatusMessage == ResponseStatus.Ok) {
ProcessHeartBeatCompleted(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
Logger.Debug("ENDED: sending heartbeats");
} else {
Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
}
}
}
#endregion
///
/// Send back finished and Stored Job Results
///
private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
JobResult jobResult = new JobResult();
jobResult.SlaveId = clientId;
jobResult.Id = jobId;
jobResult.ExecutionTime = executionTime;
jobResult.Exception = exception;
MultiStream stream = new MultiStream();
//first send result
stream.AddStream(new StreamedObject(jobResult));
//second stream the job binary data
MemoryStream memStream = new MemoryStream(result, false);
stream.AddStream(memStream);
return stream;
}
public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
ResponseResultReceived res = service.Obj.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
return res;
}
}
public Response IsJobStillNeeded(Guid jobId) {
try {
Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
Response res = service.Obj.IsJobStillNeeded(jobId);
Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
return res;
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
try {
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
return service.Obj.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public IEnumerable RequestPlugins(List requestedPlugins) {
try {
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
Logger.Debug("STARTED: Requesting Plugins for Job");
Logger.Debug("STARTED: Getting the stream");
Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray());
Logger.Debug("ENDED: Getting the stream");
BinaryFormatter formatter = new BinaryFormatter();
Logger.Debug("STARTED: Deserializing the stream");
ResponseList response = (ResponseList)formatter.Deserialize(stream);
Logger.Debug("ENDED: Deserializing the stream");
if (stream != null)
stream.Dispose();
return response.List;
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public void Logout(Guid guid) {
try {
Logger.Debug("STARTED: Logout");
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
service.Obj.Logout(guid);
}
Logger.Debug("ENDED: Logout");
}
catch (Exception e) {
OnExceptionOccured(e);
}
}
public ResponseCalendar GetCalendarSync(Guid clientId) {
try {
Logger.Debug("STARTED: Syncing Calendars");
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
ResponseCalendar cal = service.Obj.GetCalendar(clientId);
Logger.Debug("ENDED: Syncing Calendars");
return cal;
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public Response SetCalendarStatus(Guid clientId, CalendarState state) {
try {
Logger.Debug("STARTED: Setting Calendar status to: " + state);
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
Response resp = service.Obj.SetCalendarStatus(clientId, state);
Logger.Debug("ENDED: Setting Calendar status to: " + state);
return resp;
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public ResponseObject AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
try {
Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId);
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
ResponseObject response = service.Obj.AddChildJob(parentJobId, serializedJob);
Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId);
return response;
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public ResponseObject PauseJob(SerializedJob serializedJob) {
try {
Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id);
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
ResponseObject response = service.Obj.PauseJob(serializedJob);
Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id);
return response;
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public ResponseObject GetChildJobs(Guid parentJob) {
try {
Logger.Debug("STARTED: GetChildJobs job: " + parentJob);
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
SerializedJobList serializedJobs = new SerializedJobList();
JobResult[] results = service.Obj.GetChildJobResults(new Guid?(parentJob), false, false);
foreach (JobResult result in results) {
serializedJobs.Add(service.Obj.GetLastSerializedResult(result.Id));
}
Logger.Debug("ENDED: GetChildJobs job: " + parentJob);
return new ResponseObject() {
Obj = serializedJobs
};
}
}
catch (Exception e) {
OnExceptionOccured(e);
return null;
}
}
public void DeleteChildJobs(Guid jobId) {
try {
using (Disposable service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
service.Obj.DeleteChildJobs(jobId);
}
}
catch (Exception e) {
OnExceptionOccured(e);
}
}
public event EventHandler> ExceptionOccured;
private void OnExceptionOccured(Exception e) {
Logger.Error("Error: " + e.ToString());
var handler = ExceptionOccured;
if (handler != null) handler(this, new EventArgs(e));
}
}
}