Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Hive.Client.Communication/3.2/WcfService.cs @ 2211

Last change on this file since 2211 was 2122, checked in by svonolfe, 16 years ago

Fixed issue related to the streaming of results (#680)

File size: 13.9 KB
RevLine 
[1132]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
[923]23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Client.Common;
[993]31using HeuristicLab.Hive.Client.Communication.ServerService;
[1450]32using HeuristicLab.PluginInfrastructure;
[1939]33using System.IO;
34using System.Runtime.Serialization.Formatters.Binary;
[923]35
36namespace HeuristicLab.Hive.Client.Communication {
[1132]37  /// <summary>
38  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
39  /// </summary>
[923]40  public class WcfService {
41    private static WcfService instance;
[1132]42    /// <summary>
43    /// Getter for the Instance of the WcfService
44    /// </summary>
45    /// <returns>the Instance of the WcfService class</returns>
[923]46    public static WcfService Instance {
47      get {
48        if (instance == null) {
49          instance = new WcfService();
50        }
51        return instance;
52      }
53    }
54
[932]55    public DateTime ConnectedSince { get; private set; }   
56    public NetworkEnum.WcfConnState ConnState { get; private set; }
57    public string ServerIP { get; private set; }
58    public int ServerPort { get; private set; }
[923]59
[924]60    public event EventHandler ConnectionRestored;   
[1081]61    public event EventHandler ServerChanged;
62    public event EventHandler Connected;   
[924]63
[1939]64    public ClientFacadeClient proxy = null;
[923]65
[1132]66    /// <summary>
67    /// Constructor
68    /// </summary>
[923]69    private WcfService() {
[949]70      ConnState = NetworkEnum.WcfConnState.Disconnected;
[923]71    }
[1132]72
73    /// <summary>
74    /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
75    /// </summary>
[923]76    public void Connect() {
77      try {
[2025]78        if (String.Empty.Equals(ServerIP) || ServerPort == 0) {
79          Logging.Instance.Info(this.ToString(), "No Server IP or Port set!");
80          return;
81        }
[1939]82        proxy = new ClientFacadeClient(
83          WcfSettings.GetStreamedBinding(),
[1081]84          new EndpointAddress("net.tcp://" + ServerIP + ":" + ServerPort + "/HiveServer/ClientCommunicator")
85        );
[923]86
87        proxy.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(proxy_LoginCompleted);
[1939]88        proxy.SendStreamedJobCompleted += new EventHandler<SendStreamedJobCompletedEventArgs>(proxy_SendStreamedJobCompleted);
89        proxy.StoreFinishedJobResultStreamedCompleted += new EventHandler<StoreFinishedJobResultStreamedCompletedEventArgs>(proxy_StoreFinishedJobResultStreamedCompleted);
90        proxy.ProcessSnapshotStreamedCompleted += new EventHandler<ProcessSnapshotStreamedCompletedEventArgs>(proxy_ProcessSnapshotStreamedCompleted);
[1366]91        proxy.ProcessHeartBeatCompleted += new EventHandler<ProcessHeartBeatCompletedEventArgs>(proxy_ProcessHeartBeatCompleted);
[1082]92        proxy.Open();
93
[932]94        ConnState = NetworkEnum.WcfConnState.Connected;
[1097]95        ConnectedSince = DateTime.Now;
96       
[1082]97        if (Connected != null)
98          Connected(this, new EventArgs());                               
[1340]99        //Todo: This won't be hit. EVER       
[1097]100        if (ConnState == NetworkEnum.WcfConnState.Failed)
101          ConnectionRestored(this, new EventArgs());       
[923]102      }
[1371]103      catch (Exception ex) {     
104        HandleNetworkError(ex);
[923]105      }
106    }
107
[1379]108
[1132]109    /// <summary>
110    /// Changes the Connectionsettings (serverIP & serverPort) and reconnects
111    /// </summary>
112    /// <param name="serverIP">current Server IP</param>
113    /// <param name="serverPort">current Server Port</param>
[932]114    public void Connect(String serverIP, int serverPort) {
[993]115      String oldIp = this.ServerIP;
116      int oldPort = this.ServerPort;
[932]117      this.ServerIP = serverIP;
[944]118      this.ServerPort = serverPort;     
[923]119      Connect();
[993]120      if (oldIp != serverIP || oldPort != ServerPort)
[1036]121        if(ServerChanged != null)
122          ServerChanged(this, new EventArgs());
[923]123    }
[2025]124
125    public void SetIPAndPort(String serverIP, int serverPort) {
126      this.ServerIP = serverIP;
127      this.ServerPort = serverPort;
128    }
[1132]129   
130    /// <summary>
131    /// Disconnects the Client from the Server
132    /// </summary>
[932]133    public void Disconnect() {
134      ConnState = NetworkEnum.WcfConnState.Disconnected;
135    }
136
[1132]137    /// <summary>
138    /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
139    /// </summary>
140    /// <param name="e">The Exception</param>
[1371]141    private void HandleNetworkError(Exception e) {
[932]142      ConnState = NetworkEnum.WcfConnState.Failed;
[1371]143      Logging.Instance.Error(this.ToString(), "exception: ", e);
[923]144    }
145
[1132]146   
147
148    /// <summary>
149    /// Methods for the Server Login
150    /// </summary>
[923]151    #region Login
152    public event System.EventHandler<LoginCompletedEventArgs> LoginCompleted;
153    public void LoginAsync(ClientInfo clientInfo) {
[932]154      if (ConnState == NetworkEnum.WcfConnState.Connected)
[923]155        proxy.LoginAsync(clientInfo);
156    }
157    private void proxy_LoginCompleted(object sender, LoginCompletedEventArgs e) {
158      if (e.Error == null)
159        LoginCompleted(sender, e);
160      else
[1371]161        HandleNetworkError(e.Error.InnerException);
[923]162    }
[1097]163
164    public void LoginSync(ClientInfo clientInfo) {
165      try {
166        if (ConnState == NetworkEnum.WcfConnState.Connected) {
167          Response res = proxy.Login(clientInfo);
[1959]168          if (!res.Success) {
169            Logging.Instance.Error(this.ToString(), "Login Failed! " + res.StatusMessage);
[2025]170            HandleNetworkError(new Exception(res.StatusMessage));
[1959]171          } else {
172            ConnState = NetworkEnum.WcfConnState.Loggedin;
173            Logging.Instance.Info(this.ToString(), res.StatusMessage);
174          }
[1097]175        }
176      }
177      catch (Exception e) {
[1371]178        HandleNetworkError(e);
[1097]179      }
180    }
181
[923]182    #endregion
183
[1132]184    /// <summary>
185    /// Pull a Job from the Server
186    /// </summary>
[923]187    #region PullJob
[1367]188    public event System.EventHandler<SendJobCompletedEventArgs> SendJobCompleted;
189    public void SendJobAsync(Guid guid) {
[1255]190      if (ConnState == NetworkEnum.WcfConnState.Loggedin)       
[1939]191        proxy.SendStreamedJobAsync(guid);
[923]192    }
[1939]193
194    void proxy_SendStreamedJobCompleted(object sender, SendStreamedJobCompletedEventArgs e) {
195      if (e.Error == null) {
[2117]196        Stream stream = null;
[2122]197        MemoryStream memStream = null;
[1939]198
[2117]199        try {
200          stream = (Stream)e.Result;
201
202          //first deserialize the response
203          BinaryFormatter formatter =
204            new BinaryFormatter();
205          ResponseJob response =
206            (ResponseJob)formatter.Deserialize(stream);
207
208          //second deserialize the BLOB
[2122]209          memStream = new MemoryStream();
210
[2117]211          byte[] buffer = new byte[3024];
212          int read = 0;
213          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
214            memStream.Write(buffer, 0, read);
215          }
216
[2122]217          memStream.Close();
218
[2117]219          SendJobCompletedEventArgs completedEventArgs =
220            new SendJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, e.Error, e.Cancelled, e.UserState);
221          SendJobCompleted(sender, completedEventArgs);
222        }
223        finally {
224          if(stream != null)
225            stream.Dispose();
[2122]226
227          if (memStream != null)
228            memStream.Dispose();
[2117]229        }
[1939]230      } else
[1371]231        HandleNetworkError(e.Error);
[923]232    }
[1939]233
[923]234    #endregion
235
[1132]236    /// <summary>
237    /// Send back finished Job Results
238    /// </summary>
[923]239    #region SendJobResults
[1379]240    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> StoreFinishedJobResultCompleted;
[1449]241    public void StoreFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
[2117]242      if (ConnState == NetworkEnum.WcfConnState.Loggedin) {
243        Stream stream =
244          GetStreamedJobResult(clientId, jobId, result, percentage, exception);
245
246        proxy.StoreFinishedJobResultStreamedAsync(stream, stream);
247      }
248     }
[1939]249    private void proxy_StoreFinishedJobResultStreamedCompleted(object sender, StoreFinishedJobResultStreamedCompletedEventArgs e) {
[2117]250      Stream stream =
251        (Stream)e.UserState;
252      if (stream != null)
253        stream.Dispose();
254     
[1939]255      if (e.Error == null) {
256        StoreFinishedJobResultCompletedEventArgs args =
257          new StoreFinishedJobResultCompletedEventArgs(
258            new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
259        StoreFinishedJobResultCompleted(sender, args);
260      } else
[1371]261        HandleNetworkError(e.Error);
[923]262    }
263
264    #endregion
265
[1379]266    #region Processsnapshots
267    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
[1449]268    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
[2117]269      if (ConnState == NetworkEnum.WcfConnState.Loggedin) {
270        Stream stream = GetStreamedJobResult(
271            clientId, jobId, result, percentage, exception);
272
273        proxy.ProcessSnapshotStreamedAsync(stream, stream);
274      }
[1379]275    }
[1939]276    void proxy_ProcessSnapshotStreamedCompleted(object sender, ProcessSnapshotStreamedCompletedEventArgs e) {
[2117]277      Stream stream =
278        (Stream)e.UserState;
279      if (stream != null)
280        stream.Dispose();
281     
[1939]282      if (e.Error == null) {
283        ProcessSnapshotCompletedEventArgs args =
284          new ProcessSnapshotCompletedEventArgs(
285            new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
286
287        ProcessSnapshotCompleted(sender, args);
288      } else
[1379]289        HandleNetworkError(e.Error);
290    }   
291   
292    #endregion
[2107]293                                                 
[1132]294    /// <summary>
295    /// Methods for sending the periodically Heartbeat
296    /// </summary>
[923]297    #region Heartbeat
298
[1366]299    public event System.EventHandler<ProcessHeartBeatCompletedEventArgs> SendHeartBeatCompleted;
[923]300    public void SendHeartBeatAsync(HeartBeatData hbd) {
[1255]301      if (ConnState == NetworkEnum.WcfConnState.Loggedin)
[1366]302        proxy.ProcessHeartBeatAsync(hbd);
[923]303    }
304
[1366]305    private void proxy_ProcessHeartBeatCompleted(object sender, ProcessHeartBeatCompletedEventArgs e) {
[1936]306      if (e.Error == null && e.Result.Success == true)
[923]307        SendHeartBeatCompleted(sender, e);
[2063]308      else {
[2107]309        try {
310          Logging.Instance.Error(this.ToString(), "Error: " + e.Result.StatusMessage);
311        } catch (Exception ex) {
312          Logging.Instance.Error(this.ToString(), "Error: ", ex);         
313        }
[1371]314        HandleNetworkError(e.Error);
[2063]315      }
[923]316    }
317
[1097]318    #endregion 
[1271]319
320    /// <summary>
321    /// Send back finished and Stored Job Results
322    /// </summary>
[1939]323    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
[2117]324      JobResult jobResult =
325          new JobResult();
[1939]326      jobResult.ClientId = clientId;
327      jobResult.JobId = jobId;
328      jobResult.Percentage = percentage;
329      jobResult.Exception = exception;
330
[2117]331      MultiStream stream =
332              new MultiStream();
[2099]333
[2117]334      //first send result
335      stream.AddStream(
336        new StreamedObject<JobResult>(jobResult));
[1939]337
[2117]338      //second stream the job binary data
339      MemoryStream memStream =
340        new MemoryStream(result, false);
341      stream.AddStream(memStream);
[1939]342
343      return stream;
344    }
345
[1812]346    public ResponseResultReceived SendStoredJobResultsSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {     
[1939]347      return proxy.StoreFinishedJobResultStreamed(
348        GetStreamedJobResult(clientId, jobId, result, percentage, exception));
[1271]349    }
[1450]350
[1959]351    public Response IsJobStillNeeded(Guid jobId) {
352      try {
353        return proxy.IsJobStillNeeded(jobId);
354      }
355      catch (Exception e) {
356        HandleNetworkError(e);
357        return null;
358      }
359     
360    }
361
[1812]362    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
363      try {
364        Logging.Instance.Info(this.ToString(), "Snapshot for Job " + jobId + " submitted");
[1939]365        return proxy.ProcessSnapshotStreamed(
366          GetStreamedJobResult(clientId, jobId, result, percentage, exception));
[1812]367      }
368      catch (Exception e) {
369        HandleNetworkError(e);
370        return null;
371      }
372    }
373
[1594]374    public List<CachedHivePluginInfo> RequestPlugins(List<HivePluginInfo> requestedPlugins) {
[1450]375      try {
[1939]376        Stream stream = proxy.SendStreamedPlugins(requestedPlugins.ToArray());
377
378        BinaryFormatter formatter =
379          new BinaryFormatter();
380        ResponsePlugin response = (ResponsePlugin)formatter.Deserialize(stream);
[1635]381        return response.Plugins;       
[1450]382      }
383      catch (Exception e) {
384        HandleNetworkError(e);
385        return null;
386      }
387    }
388
[1635]389    public void Logout(Guid guid) {
390      try {
391        proxy.Logout(guid);
392      }
393      catch (Exception e) {
394        HandleNetworkError(e);
395      }
396    }
[923]397  }
398}
Note: See TracBrowser for help on using the repository browser.