Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 4316

Last change on this file since 4316 was 4316, checked in by cneumuel, 14 years ago

made streaming wcf-services work with Transport-Security and net.tcp but with Message-Level Credentials (#1168)

File size: 15.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Slave.Common;
29using HeuristicLab.PluginInfrastructure;
30using System.IO;
31using System.Runtime.Serialization.Formatters.Binary;
32using HeuristicLab.Tracing;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.Hive.Contracts.BusinessObjects;
35using HeuristicLab.Hive.Slave.Communication.SlaveService;
36using HeuristicLab.Hive.Contracts.ResponseObjects;
37using HeuristicLab.Hive.Slave.Communication.Properties;
38
39namespace HeuristicLab.Hive.Slave.Communication {
40
41  /// <summary>
42  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
43  /// </summary>
44  public class WcfService {
45    private static WcfService instance;
46    /// <summary>
47    /// Getter for the Instance of the WcfService
48    /// </summary>
49    /// <returns>the Instance of the WcfService class</returns>
50    public static WcfService Instance {
51      get {
52        if (instance == null) {
53          Logger.Debug("New WcfService Instance created");
54          instance = new WcfService();
55        }
56        return instance;
57      }
58    }
59
60    public DateTime ConnectedSince { get; private set; }
61    public NetworkEnum.WcfConnState ConnState { get; private set; }
62    public bool LoggedIn { get; set; }
63    public string ServerIP { get; private set; }
64
65    public event EventHandler ConnectionRestored;
66    public event EventHandler ServerChanged;
67    public event EventHandler Connected;
68
69    public SlaveFacadeClient proxy = null;
70
71    /// <summary>
72    /// Constructor
73    /// </summary>
74    private WcfService() {
75      ConnState = NetworkEnum.WcfConnState.Disconnected;
76      LoggedIn = false;
77    }
78
79    /// <summary>
80    /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
81    /// </summary>
82    public void Connect() {
83      try {
84        Logger.Debug("Starting the Connection Process");
85        if (String.Empty.Equals(ServerIP)) {
86          Logger.Info("No Server IP set!");
87          return;
88        }
89
90        Logger.Debug("Creating the new connection proxy");
91        proxy = ServiceLocator.CreateStreamedSlaveFacade(ServerIP);
92        Logger.Debug("Created the new connection proxy");
93
94        Logger.Debug("Registring new Events");
95        proxy.GetStreamedJobCompleted += new EventHandler<GetStreamedJobCompletedEventArgs>(proxy_GetStreamedJobCompleted);
96        proxy.StoreFinishedJobResultStreamedCompleted += new EventHandler<StoreFinishedJobResultStreamedCompletedEventArgs>(proxy_StoreFinishedJobResultStreamedCompleted);
97        proxy.ProcessSnapshotStreamedCompleted += new EventHandler<ProcessSnapshotStreamedCompletedEventArgs>(proxy_ProcessSnapshotStreamedCompleted);
98        proxy.ProcessHeartBeatCompleted += new EventHandler<ProcessHeartBeatCompletedEventArgs>(proxy_ProcessHeartBeatCompleted);
99        Logger.Debug("Registered new Events");
100        Logger.Debug("Opening the Connection");
101        proxy.ClientCredentials.UserName.UserName = Settings.Default.HiveUsername;
102        proxy.ClientCredentials.UserName.Password = Settings.Default.HivePassword;
103        proxy.Open();
104        Logger.Debug("Opened the Connection");
105
106        ConnState = NetworkEnum.WcfConnState.Connected;
107        ConnectedSince = DateTime.Now;
108        LoggedIn = false;
109
110        if (Connected != null) {
111          Logger.Debug("Calling the connected Event");
112          Connected(this, new EventArgs());
113          //Todo: This won't be hit. EVER       
114        }
115        if (ConnState == NetworkEnum.WcfConnState.Failed) {
116          ConnectionRestored(this, new EventArgs());
117        }
118      } catch (Exception ex) {
119        HandleNetworkError(ex);
120      }
121    }
122
123    /// <summary>
124    /// Changes the Connectionsettings (serverIP) and reconnects
125    /// </summary>
126    /// <param name="serverIP">current Server IP</param>
127    public void Connect(String serverIP) {
128      Logger.Debug("Called Connected with " + serverIP);
129      String oldIp = this.ServerIP;
130      this.ServerIP = serverIP;
131      Connect();
132      if (oldIp != serverIP)
133        if (ServerChanged != null)
134          ServerChanged(this, new EventArgs());
135    }
136
137    public void SetIP(String serverIP) {
138      Logger.Debug("Called with " + serverIP);
139      this.ServerIP = serverIP;
140    }
141
142    /// <summary>
143    /// Disconnects the Slave from the Server
144    /// </summary>
145    public void Disconnect() {
146      ConnState = NetworkEnum.WcfConnState.Disconnected;
147      LoggedIn = false;
148    }
149
150    /// <summary>
151    /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
152    /// </summary>
153    /// <param name="e">The Exception</param>
154    private void HandleNetworkError(Exception e) {
155      ConnState = NetworkEnum.WcfConnState.Failed;
156      LoggedIn = false;
157      Logger.Error("Network exception occurred: " + e);
158    }
159
160    /// <summary>
161    /// Methods for the Server Login
162    /// </summary>
163    public void LoginSync(SlaveDto slaveInfo) {
164      try {
165        if (ConnState == NetworkEnum.WcfConnState.Connected) {
166          Logger.Debug("STARTED: Login Sync");
167          Response res = proxy.Login(slaveInfo);
168          if (res.StatusMessage != ResponseStatus.Ok) {
169            Logger.Error("FAILED: Login Failed! " + res.StatusMessage);
170            throw new Exception(res.StatusMessage.ToString());
171          } else {
172            Logger.Info("ENDED: Login succeeded" + res.StatusMessage);
173            LoggedIn = true;
174          }
175        }
176      } catch (Exception e) {
177        HandleNetworkError(e);
178      }
179    }
180   
181    /// <summary>
182    /// Pull a Job from the Server
183    /// </summary>
184    #region PullJob
185    public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;
186    public void GetJobAsync(Guid guid) {
187      if (LoggedIn) {
188        Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
189        proxy.GetStreamedJobAsync(guid);
190      }
191    }
192
193    void proxy_GetStreamedJobCompleted(object sender, GetStreamedJobCompletedEventArgs e) {
194      if (e.Error == null) {
195        Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
196        Stream stream = null;
197        MemoryStream memStream = null;
198
199        try {
200          stream = (Stream)e.Result;
201
202          //first deserialize the response
203          BinaryFormatter formatter = new BinaryFormatter();
204          ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);
205
206          //second deserialize the BLOB
207          memStream = new MemoryStream();
208
209          byte[] buffer = new byte[3024];
210          int read = 0;
211          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
212            memStream.Write(buffer, 0, read);
213          }
214
215          memStream.Close();
216
217          GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, e.Error, e.Cancelled, e.UserState);
218          GetJobCompleted(sender, completedEventArgs);
219        } catch (Exception ex) {
220          Logger.Error(ex);
221        } finally {
222          if (stream != null)
223            stream.Dispose();
224
225          if (memStream != null)
226            memStream.Dispose();
227        }
228      } else
229        HandleNetworkError(e.Error);
230    }
231
232    #endregion
233
234    /// <summary>
235    /// Send back finished Job Results
236    /// </summary>
237    #region SendJobResults
238    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;
239    public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
240      if (LoggedIn) {
241        Logger.Debug("STARTED: Sending back the finished job results");
242        Logger.Debug("Building stream");
243        Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
244        Logger.Debug("Builded stream");
245        Logger.Debug("Making the call");
246        proxy.StoreFinishedJobResultStreamedAsync(stream, stream);
247      }
248    }
249
250    private void proxy_StoreFinishedJobResultStreamedCompleted(object sender, StoreFinishedJobResultStreamedCompletedEventArgs e) {
251      Logger.Debug("Finished storing the job");
252      Stream stream = (Stream)e.UserState;
253      if (stream != null) {
254        Logger.Debug("Stream not null, disposing it");
255        stream.Dispose();
256      }
257      if (e.Error == null) {
258        StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
259        Logger.Debug("calling the Finished Job Event");
260        GetFinishedJobResultCompleted(sender, args);
261        Logger.Debug("ENDED: Sending back the finished job results");
262      } else {
263        HandleNetworkError(e.Error);
264      }
265    }
266
267    #endregion
268
269    #region Processsnapshots
270    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
271    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
272      if (LoggedIn) {
273        Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
274        proxy.ProcessSnapshotStreamedAsync(stream, stream);
275      }
276    }
277
278    void proxy_ProcessSnapshotStreamedCompleted(object sender, ProcessSnapshotStreamedCompletedEventArgs e) {
279      Stream stream =
280        (Stream)e.UserState;
281      if (stream != null)
282        stream.Dispose();
283
284      if (e.Error == null) {
285        ProcessSnapshotCompletedEventArgs args =
286          new ProcessSnapshotCompletedEventArgs(
287            new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
288
289        ProcessSnapshotCompleted(sender, args);
290      } else
291        HandleNetworkError(e.Error);
292    }
293
294    #endregion
295
296    /// <summary>
297    /// Methods for sending the periodically Heartbeat
298    /// </summary>
299    #region Heartbeat
300
301    public event System.EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;
302    public void ProcessHeartBeatAsync(HeartBeatData hbd) {
303      if (LoggedIn)
304        Logger.Debug("STARTING: sending heartbeat");
305      proxy.ProcessHeartBeatAsync(hbd);
306    }
307
308    private void proxy_ProcessHeartBeatCompleted(object sender, ProcessHeartBeatCompletedEventArgs e) {
309      if (e.Error == null && e.Result.StatusMessage == ResponseStatus.Ok) {
310        ProcessHeartBeatCompleted(sender, e);
311        Logger.Debug("ENDED: sending heartbeats");
312      } else {
313        try {
314          Logger.Error("Error: " + e.Result.StatusMessage);
315        } catch (Exception ex) {
316          Logger.Error("Error: ", ex);
317        }
318        HandleNetworkError(e.Error);
319      }
320    }
321
322    #endregion
323
324    /// <summary>
325    /// Send back finished and Stored Job Results
326    /// </summary>
327    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
328      JobResult jobResult = new JobResult();
329      jobResult.SlaveId = clientId;
330      jobResult.JobId = jobId;
331      jobResult.Percentage = percentage;
332      jobResult.Exception = exception != null ? exception.Message : "";
333
334      MultiStream stream = new MultiStream();
335
336      //first send result
337      stream.AddStream(new StreamedObject<JobResult>(jobResult));
338
339      //second stream the job binary data
340      MemoryStream memStream = new MemoryStream(result, false);
341      stream.AddStream(memStream);
342
343      return stream;
344    }
345
346    public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
347      return proxy.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception));
348    }
349
350    public Response IsJobStillNeeded(Guid jobId) {
351      try {
352        Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
353        Response res = proxy.IsJobStillNeeded(jobId);
354        Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
355        return res;
356      } catch (Exception e) {
357        HandleNetworkError(e);
358        return null;
359      }
360    }
361
362    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
363      try {
364        return proxy.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception));
365      } catch (Exception e) {
366        HandleNetworkError(e);
367        return null;
368      }
369    }
370
371    public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
372      try {
373        Logger.Debug("STARTED: Requesting Plugins for Job");
374        Logger.Debug("STARTED: Getting the stream");
375        Stream stream = proxy.GetStreamedPlugins(requestedPlugins.ToArray());
376        Logger.Debug("ENDED: Getting the stream");
377        BinaryFormatter formatter = new BinaryFormatter();
378        Logger.Debug("STARTED: Deserializing the stream");
379        ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
380        Logger.Debug("ENDED: Deserializing the stream");
381        if (stream != null)
382          stream.Dispose();
383        return response.List;
384      } catch (Exception e) {
385        HandleNetworkError(e);
386        return null;
387      }
388    }
389
390    public void Logout(Guid guid) {
391      try {
392        Logger.Debug("STARTED: Logout");
393        proxy.Logout(guid);
394        Logger.Debug("ENDED: Logout");
395      } catch (Exception e) {
396        HandleNetworkError(e);
397      }
398    }
399
400    public ResponseCalendar GetCalendarSync(Guid clientId) {
401      try {
402        Logger.Debug("STARTED: Syncing Calendars");
403        ResponseCalendar cal = proxy.GetCalendar(clientId);
404        Logger.Debug("ENDED: Syncing Calendars");
405        return cal;
406      } catch (Exception e) {
407        HandleNetworkError(e);
408        return null;
409      }
410    }
411
412    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
413      try {
414        Logger.Debug("STARTED: Setting Calendar status to: " + state);
415        Response resp = proxy.SetCalendarStatus(clientId, state);
416        Logger.Debug("ENDED: Setting Calendar status to: " + state);
417        return resp;
418      } catch (Exception e) {
419        HandleNetworkError(e);
420        return null;
421      }
422    }
423  }
424}
Note: See TracBrowser for help on using the repository browser.