Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 4305

Last change on this file since 4305 was 4305, checked in by cneumuel, 14 years ago

added streamedHttpEndpoit binding (without message-security (for now)) (#1168)

File size: 16.0 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Slave.Common;
29using HeuristicLab.PluginInfrastructure;
30using System.IO;
31using System.Runtime.Serialization.Formatters.Binary;
32using HeuristicLab.Tracing;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.Hive.Contracts.BusinessObjects;
35using HeuristicLab.Hive.Slave.Communication.SlaveService;
36using HeuristicLab.Hive.Contracts.ResponseObjects;
37using HeuristicLab.Hive.Slave.Communication.Properties;
38
39namespace HeuristicLab.Hive.Slave.Communication {
40
41  /// <summary>
42  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
43  /// </summary>
44  public class WcfService {
45    private static WcfService instance;
46    /// <summary>
47    /// Getter for the Instance of the WcfService
48    /// </summary>
49    /// <returns>the Instance of the WcfService class</returns>
50    public static WcfService Instance {
51      get {
52        if (instance == null) {
53          Logger.Debug("New WcfService Instance created");
54          instance = new WcfService();
55        }
56        return instance;
57      }
58    }
59
60    public DateTime ConnectedSince { get; private set; }
61    public NetworkEnum.WcfConnState ConnState { get; private set; }
62    public bool LoggedIn { get; set; }
63    public string ServerIP { get; private set; }
64    public int ServerPort { get; private set; }
65
66    public event EventHandler ConnectionRestored;
67    public event EventHandler ServerChanged;
68    public event EventHandler Connected;
69
70    public SlaveFacadeClient proxy = null;
71
72    /// <summary>
73    /// Constructor
74    /// </summary>
75    private WcfService() {
76      ConnState = NetworkEnum.WcfConnState.Disconnected;
77      LoggedIn = false;
78    }
79
80    /// <summary>
81    /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
82    /// </summary>
83    public void Connect() {
84      try {
85        Logger.Debug("Starting the Connection Process");
86        if (String.Empty.Equals(ServerIP) || ServerPort == 0) {
87          Logger.Info("No Server IP or Port set!");
88          return;
89        }
90
91        Logger.Debug("Creating the new connection proxy");
92        proxy = CreateSlaveFacadeClient();
93        Logger.Debug("Created the new connection proxy");
94
95        Logger.Debug("Registring new Events");
96        proxy.GetStreamedJobCompleted += new EventHandler<GetStreamedJobCompletedEventArgs>(proxy_GetStreamedJobCompleted);
97        proxy.StoreFinishedJobResultStreamedCompleted += new EventHandler<StoreFinishedJobResultStreamedCompletedEventArgs>(proxy_StoreFinishedJobResultStreamedCompleted);
98        proxy.ProcessSnapshotStreamedCompleted += new EventHandler<ProcessSnapshotStreamedCompletedEventArgs>(proxy_ProcessSnapshotStreamedCompleted);
99        proxy.ProcessHeartBeatCompleted += new EventHandler<ProcessHeartBeatCompletedEventArgs>(proxy_ProcessHeartBeatCompleted);
100        Logger.Debug("Registered new Events");
101        Logger.Debug("Opening the Connection");
102        proxy.ClientCredentials.UserName.UserName = Settings.Default.HiveUsername;
103        proxy.ClientCredentials.UserName.Password = Settings.Default.HivePassword;
104        proxy.Open();
105        Logger.Debug("Opened the Connection");
106
107        ConnState = NetworkEnum.WcfConnState.Connected;
108        ConnectedSince = DateTime.Now;
109        LoggedIn = false;
110
111        if (Connected != null) {
112          Logger.Debug("Calling the connected Event");
113          Connected(this, new EventArgs());
114          //Todo: This won't be hit. EVER       
115        }
116        if (ConnState == NetworkEnum.WcfConnState.Failed) {
117          ConnectionRestored(this, new EventArgs());
118        }
119      } catch (Exception ex) {
120        HandleNetworkError(ex);
121      }
122    }
123
124    private SlaveFacadeClient CreateSlaveFacadeClient() {
125      SlaveFacadeClient client = new SlaveFacadeClient("SlaveHttpEndpoint");
126      WcfSettings.SetEndpointAddress(client.Endpoint, string.Format("http://{0}:{1}/{2}", ServerIP, ServerPort, WcfSettings.SlaveServiceName));
127      return client;
128    }
129
130
131    /// <summary>
132    /// Changes the Connectionsettings (serverIP & serverPort) and reconnects
133    /// </summary>
134    /// <param name="serverIP">current Server IP</param>
135    /// <param name="serverPort">current Server Port</param>
136    public void Connect(String serverIP, int serverPort) {
137      Logger.Debug("Called Connected with " + serverIP + ":" + serverPort);
138      String oldIp = this.ServerIP;
139      int oldPort = this.ServerPort;
140      this.ServerIP = serverIP;
141      this.ServerPort = serverPort;
142      Connect();
143      if (oldIp != serverIP || oldPort != ServerPort)
144        if (ServerChanged != null)
145          ServerChanged(this, new EventArgs());
146    }
147
148    public void SetIPAndPort(String serverIP, int serverPort) {
149      Logger.Debug("Called with " + serverIP + ":" + serverPort);
150      this.ServerIP = serverIP;
151      this.ServerPort = serverPort;
152    }
153
154    /// <summary>
155    /// Disconnects the Slave from the Server
156    /// </summary>
157    public void Disconnect() {
158      ConnState = NetworkEnum.WcfConnState.Disconnected;
159      LoggedIn = false;
160    }
161
162    /// <summary>
163    /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
164    /// </summary>
165    /// <param name="e">The Exception</param>
166    private void HandleNetworkError(Exception e) {
167      ConnState = NetworkEnum.WcfConnState.Failed;
168      LoggedIn = false;
169      Logger.Error("Network exception occurred: " + e);
170    }
171
172    /// <summary>
173    /// Methods for the Server Login
174    /// </summary>
175    public void LoginSync(SlaveDto slaveInfo) {
176      try {
177        if (ConnState == NetworkEnum.WcfConnState.Connected) {
178          Logger.Debug("STARTED: Login Sync");
179          Response res = proxy.Login(slaveInfo);
180          if (res.StatusMessage != ResponseStatus.Ok) {
181            Logger.Error("FAILED: Login Failed! " + res.StatusMessage);
182            throw new Exception(res.StatusMessage.ToString());
183          } else {
184            Logger.Info("ENDED: Login succeeded" + res.StatusMessage);
185            LoggedIn = true;
186          }
187        }
188      } catch (Exception e) {
189        HandleNetworkError(e);
190      }
191    }
192   
193    /// <summary>
194    /// Pull a Job from the Server
195    /// </summary>
196    #region PullJob
197    public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;
198    public void GetJobAsync(Guid guid) {
199      if (LoggedIn) {
200        Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
201        proxy.GetStreamedJobAsync(guid);
202      }
203    }
204
205    void proxy_GetStreamedJobCompleted(object sender, GetStreamedJobCompletedEventArgs e) {
206      if (e.Error == null) {
207        Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
208        Stream stream = null;
209        MemoryStream memStream = null;
210
211        try {
212          stream = (Stream)e.Result;
213
214          //first deserialize the response
215          BinaryFormatter formatter = new BinaryFormatter();
216          ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);
217
218          //second deserialize the BLOB
219          memStream = new MemoryStream();
220
221          byte[] buffer = new byte[3024];
222          int read = 0;
223          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
224            memStream.Write(buffer, 0, read);
225          }
226
227          memStream.Close();
228
229          GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, e.Error, e.Cancelled, e.UserState);
230          GetJobCompleted(sender, completedEventArgs);
231        } catch (Exception ex) {
232          Logger.Error(ex);
233        } finally {
234          if (stream != null)
235            stream.Dispose();
236
237          if (memStream != null)
238            memStream.Dispose();
239        }
240      } else
241        HandleNetworkError(e.Error);
242    }
243
244    #endregion
245
246    /// <summary>
247    /// Send back finished Job Results
248    /// </summary>
249    #region SendJobResults
250    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;
251    public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
252      if (LoggedIn) {
253        Logger.Debug("STARTED: Sending back the finished job results");
254        Logger.Debug("Building stream");
255        Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
256        Logger.Debug("Builded stream");
257        Logger.Debug("Making the call");
258        proxy.StoreFinishedJobResultStreamedAsync(stream, stream);
259      }
260    }
261
262    private void proxy_StoreFinishedJobResultStreamedCompleted(object sender, StoreFinishedJobResultStreamedCompletedEventArgs e) {
263      Logger.Debug("Finished storing the job");
264      Stream stream = (Stream)e.UserState;
265      if (stream != null) {
266        Logger.Debug("Stream not null, disposing it");
267        stream.Dispose();
268      }
269      if (e.Error == null) {
270        StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
271        Logger.Debug("calling the Finished Job Event");
272        GetFinishedJobResultCompleted(sender, args);
273        Logger.Debug("ENDED: Sending back the finished job results");
274      } else {
275        HandleNetworkError(e.Error);
276      }
277    }
278
279    #endregion
280
281    #region Processsnapshots
282    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
283    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
284      if (LoggedIn) {
285        Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
286        proxy.ProcessSnapshotStreamedAsync(stream, stream);
287      }
288    }
289
290    void proxy_ProcessSnapshotStreamedCompleted(object sender, ProcessSnapshotStreamedCompletedEventArgs e) {
291      Stream stream =
292        (Stream)e.UserState;
293      if (stream != null)
294        stream.Dispose();
295
296      if (e.Error == null) {
297        ProcessSnapshotCompletedEventArgs args =
298          new ProcessSnapshotCompletedEventArgs(
299            new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
300
301        ProcessSnapshotCompleted(sender, args);
302      } else
303        HandleNetworkError(e.Error);
304    }
305
306    #endregion
307
308    /// <summary>
309    /// Methods for sending the periodically Heartbeat
310    /// </summary>
311    #region Heartbeat
312
313    public event System.EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;
314    public void ProcessHeartBeatAsync(HeartBeatData hbd) {
315      if (LoggedIn)
316        Logger.Debug("STARTING: sending heartbeat");
317      proxy.ProcessHeartBeatAsync(hbd);
318    }
319
320    private void proxy_ProcessHeartBeatCompleted(object sender, ProcessHeartBeatCompletedEventArgs e) {
321      if (e.Error == null && e.Result.StatusMessage == ResponseStatus.Ok) {
322        ProcessHeartBeatCompleted(sender, e);
323        Logger.Debug("ENDED: sending heartbeats");
324      } else {
325        try {
326          Logger.Error("Error: " + e.Result.StatusMessage);
327        } catch (Exception ex) {
328          Logger.Error("Error: ", ex);
329        }
330        HandleNetworkError(e.Error);
331      }
332    }
333
334    #endregion
335
336    /// <summary>
337    /// Send back finished and Stored Job Results
338    /// </summary>
339    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
340      JobResult jobResult = new JobResult();
341      jobResult.SlaveId = clientId;
342      jobResult.JobId = jobId;
343      jobResult.Percentage = percentage;
344      jobResult.Exception = exception != null ? exception.Message : "";
345
346      MultiStream stream = new MultiStream();
347
348      //first send result
349      stream.AddStream(new StreamedObject<JobResult>(jobResult));
350
351      //second stream the job binary data
352      MemoryStream memStream = new MemoryStream(result, false);
353      stream.AddStream(memStream);
354
355      return stream;
356    }
357
358    public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
359      return proxy.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception));
360    }
361
362    public Response IsJobStillNeeded(Guid jobId) {
363      try {
364        Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
365        Response res = proxy.IsJobStillNeeded(jobId);
366        Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
367        return res;
368      } catch (Exception e) {
369        HandleNetworkError(e);
370        return null;
371      }
372    }
373
374    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
375      try {
376        return proxy.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception));
377      } catch (Exception e) {
378        HandleNetworkError(e);
379        return null;
380      }
381    }
382
383    public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
384      try {
385        Logger.Debug("STARTED: Requesting Plugins for Job");
386        Logger.Debug("STARTED: Getting the stream");
387        Stream stream = proxy.GetStreamedPlugins(requestedPlugins.ToArray());
388        Logger.Debug("ENDED: Getting the stream");
389        BinaryFormatter formatter = new BinaryFormatter();
390        Logger.Debug("STARTED: Deserializing the stream");
391        ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
392        Logger.Debug("ENDED: Deserializing the stream");
393        if (stream != null)
394          stream.Dispose();
395        return response.List;
396      } catch (Exception e) {
397        HandleNetworkError(e);
398        return null;
399      }
400    }
401
402    public void Logout(Guid guid) {
403      try {
404        Logger.Debug("STARTED: Logout");
405        proxy.Logout(guid);
406        Logger.Debug("ENDED: Logout");
407      } catch (Exception e) {
408        HandleNetworkError(e);
409      }
410    }
411
412    public ResponseCalendar GetCalendarSync(Guid clientId) {
413      try {
414        Logger.Debug("STARTED: Syncing Calendars");
415        ResponseCalendar cal = proxy.GetCalendar(clientId);
416        Logger.Debug("ENDED: Syncing Calendars");
417        return cal;
418      } catch (Exception e) {
419        HandleNetworkError(e);
420        return null;
421      }
422    }
423
424    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
425      try {
426        Logger.Debug("STARTED: Setting Calendar status to: " + state);
427        Response resp = proxy.SetCalendarStatus(clientId, state);
428        Logger.Debug("ENDED: Setting Calendar status to: " + state);
429        return resp;
430      } catch (Exception e) {
431        HandleNetworkError(e);
432        return null;
433      }
434    }
435  }
436}
Note: See TracBrowser for help on using the repository browser.