Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 4289

Last change on this file since 4289 was 4289, checked in by cneumuel, 14 years ago

added role-permission checks, updated service reference (#1168)

File size: 16.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Slave.Common;
29using HeuristicLab.PluginInfrastructure;
30using System.IO;
31using System.Runtime.Serialization.Formatters.Binary;
32using HeuristicLab.Tracing;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.Hive.Contracts.BusinessObjects;
35
36namespace HeuristicLab.Hive.Slave.Communication {
37  using Service = HeuristicLab.Hive.Slave.Communication.ServerService;
38  using HeuristicLab.Hive.Slave.Communication.ServerService;
39  using HeuristicLab.Hive.Contracts.ResponseObjects;
40
41  /// <summary>
42  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
43  /// </summary>
44  public class WcfService {
45    private static WcfService instance;
46    /// <summary>
47    /// Getter for the Instance of the WcfService
48    /// </summary>
49    /// <returns>the Instance of the WcfService class</returns>
50    public static WcfService Instance {
51      get {
52        if (instance == null) {
53          Logger.Debug("New WcfService Instance created");
54          instance = new WcfService();
55        }
56        return instance;
57      }
58    }
59
60    public DateTime ConnectedSince { get; private set; }
61    public NetworkEnum.WcfConnState ConnState { get; private set; }
62    public bool LoggedIn { get; set; }
63    public string ServerIP { get; private set; }
64    public int ServerPort { get; private set; }
65
66    public event EventHandler ConnectionRestored;
67    public event EventHandler ServerChanged;
68    public event EventHandler Connected;
69
70    public SlaveFacadeClient proxy = null;
71
72    /// <summary>
73    /// Constructor
74    /// </summary>
75    private WcfService() {
76      ConnState = NetworkEnum.WcfConnState.Disconnected;
77      LoggedIn = false;
78    }
79
80    /// <summary>
81    /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
82    /// </summary>
83    public void Connect() {
84      try {
85        Logger.Debug("Starting the Connection Process");
86        if (String.Empty.Equals(ServerIP) || ServerPort == 0) {
87          Logger.Info("No Server IP or Port set!");
88          return;
89        }
90
91        Logger.Debug("Creating the new connection proxy");
92        proxy = new SlaveFacadeClient(
93          HeuristicLab.Hive.Contracts.WcfSettings.GetStreamedBinding(),
94          new EndpointAddress("net.tcp://" + ServerIP + ":" + ServerPort + "/HiveServer/SlaveCommunicator")
95        );
96        Logger.Debug("Created the new connection proxy");
97
98        Logger.Debug("Registring new Events");
99        proxy.LoginCompleted += new EventHandler<LoginCompletedEventArgs>(proxy_LoginCompleted);
100        proxy.GetStreamedJobCompleted += new EventHandler<GetStreamedJobCompletedEventArgs>(proxy_GetStreamedJobCompleted);
101        proxy.StoreFinishedJobResultStreamedCompleted += new EventHandler<StoreFinishedJobResultStreamedCompletedEventArgs>(proxy_StoreFinishedJobResultStreamedCompleted);
102        proxy.ProcessSnapshotStreamedCompleted += new EventHandler<ProcessSnapshotStreamedCompletedEventArgs>(proxy_ProcessSnapshotStreamedCompleted);
103        proxy.ProcessHeartBeatCompleted += new EventHandler<ProcessHeartBeatCompletedEventArgs>(proxy_ProcessHeartBeatCompleted);
104        Logger.Debug("Registered new Events");
105        Logger.Debug("Opening the Connection");
106        proxy.Open();
107        Logger.Debug("Opened the Connection");
108
109        ConnState = NetworkEnum.WcfConnState.Connected;
110        ConnectedSince = DateTime.Now;
111        LoggedIn = false;
112
113        if (Connected != null) {
114          Logger.Debug("Calling the connected Event");
115          Connected(this, new EventArgs());
116          //Todo: This won't be hit. EVER       
117        }
118        if (ConnState == NetworkEnum.WcfConnState.Failed) {
119          ConnectionRestored(this, new EventArgs());
120        }
121      } catch (Exception ex) {
122        HandleNetworkError(ex);
123      }
124    }
125
126
127    /// <summary>
128    /// Changes the Connectionsettings (serverIP & serverPort) and reconnects
129    /// </summary>
130    /// <param name="serverIP">current Server IP</param>
131    /// <param name="serverPort">current Server Port</param>
132    public void Connect(String serverIP, int serverPort) {
133      Logger.Debug("Called Connected with " + serverIP + ":" + serverPort);
134      String oldIp = this.ServerIP;
135      int oldPort = this.ServerPort;
136      this.ServerIP = serverIP;
137      this.ServerPort = serverPort;
138      Connect();
139      if (oldIp != serverIP || oldPort != ServerPort)
140        if (ServerChanged != null)
141          ServerChanged(this, new EventArgs());
142    }
143
144    public void SetIPAndPort(String serverIP, int serverPort) {
145      Logger.Debug("Called with " + serverIP + ":" + serverPort);
146      this.ServerIP = serverIP;
147      this.ServerPort = serverPort;
148    }
149
150    /// <summary>
151    /// Disconnects the Slave from the Server
152    /// </summary>
153    public void Disconnect() {
154      ConnState = NetworkEnum.WcfConnState.Disconnected;
155      LoggedIn = false;
156    }
157
158    /// <summary>
159    /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
160    /// </summary>
161    /// <param name="e">The Exception</param>
162    private void HandleNetworkError(Exception e) {
163      ConnState = NetworkEnum.WcfConnState.Failed;
164      LoggedIn = false;
165      Logger.Error("Network exception occurred: " + e);
166    }
167
168    /// <summary>
169    /// Methods for the Server Login
170    /// </summary>
171    #region Login
172    public event System.EventHandler<LoginCompletedEventArgs> LoginCompleted;
173    public void LoginAsync(SlaveDto clientInfo) {
174      if (ConnState == NetworkEnum.WcfConnState.Connected) {
175        Logger.Debug("STARTED: Login Async");
176        proxy.LoginAsync(clientInfo.Id);
177      }
178    }
179    private void proxy_LoginCompleted(object sender, LoginCompletedEventArgs e) {
180      if (e.Error == null) {
181        Logger.Debug("ENDED: Login Async");
182        LoggedIn = true;
183        LoginCompleted(sender, e);
184      } else
185        HandleNetworkError(e.Error.InnerException);
186    }
187
188    public void LoginSync(SlaveDto clientInfo) {
189      try {
190        if (ConnState == NetworkEnum.WcfConnState.Connected) {
191          Logger.Debug("STARTED: Login Sync");
192          Response res = proxy.Login(clientInfo.Id);
193          if (res.StatusMessage != ResponseStatus.Ok) {
194            Logger.Error("FAILED: Login Failed! " + res.StatusMessage);
195            throw new Exception(res.StatusMessage.ToString());
196          } else {
197            Logger.Info("ENDED: Login succeeded" + res.StatusMessage);
198            LoggedIn = true;
199          }
200        }
201      } catch (Exception e) {
202        HandleNetworkError(e);
203      }
204    }
205
206    #endregion
207
208    /// <summary>
209    /// Pull a Job from the Server
210    /// </summary>
211    #region PullJob
212    public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;
213    public void GetJobAsync(Guid guid) {
214      if (LoggedIn) {
215        Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
216        proxy.GetStreamedJobAsync(guid);
217      }
218    }
219
220    void proxy_GetStreamedJobCompleted(object sender, GetStreamedJobCompletedEventArgs e) {
221      if (e.Error == null) {
222        Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
223        Stream stream = null;
224        MemoryStream memStream = null;
225
226        try {
227          stream = (Stream)e.Result;
228
229          //first deserialize the response
230          BinaryFormatter formatter = new BinaryFormatter();
231          ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);
232
233          //second deserialize the BLOB
234          memStream = new MemoryStream();
235
236          byte[] buffer = new byte[3024];
237          int read = 0;
238          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
239            memStream.Write(buffer, 0, read);
240          }
241
242          memStream.Close();
243
244          GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, e.Error, e.Cancelled, e.UserState);
245          GetJobCompleted(sender, completedEventArgs);
246        } catch (Exception ex) {
247          Logger.Error(ex);
248        } finally {
249          if (stream != null)
250            stream.Dispose();
251
252          if (memStream != null)
253            memStream.Dispose();
254        }
255      } else
256        HandleNetworkError(e.Error);
257    }
258
259    #endregion
260
261    /// <summary>
262    /// Send back finished Job Results
263    /// </summary>
264    #region SendJobResults
265    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;
266    public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
267      if (LoggedIn) {
268        Logger.Debug("STARTED: Sending back the finished job results");
269        Logger.Debug("Building stream");
270        Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
271        Logger.Debug("Builded stream");
272        Logger.Debug("Making the call");
273        proxy.StoreFinishedJobResultStreamedAsync(stream, stream);
274      }
275    }
276
277    private void proxy_StoreFinishedJobResultStreamedCompleted(object sender, StoreFinishedJobResultStreamedCompletedEventArgs e) {
278      Logger.Debug("Finished storing the job");
279      Stream stream = (Stream)e.UserState;
280      if (stream != null) {
281        Logger.Debug("Stream not null, disposing it");
282        stream.Dispose();
283      }
284      if (e.Error == null) {
285        StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
286        Logger.Debug("calling the Finished Job Event");
287        GetFinishedJobResultCompleted(sender, args);
288        Logger.Debug("ENDED: Sending back the finished job results");
289      } else {
290        HandleNetworkError(e.Error);
291      }
292    }
293
294    #endregion
295
296    #region Processsnapshots
297    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
298    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
299      if (LoggedIn) {
300        Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
301        proxy.ProcessSnapshotStreamedAsync(stream, stream);
302      }
303    }
304
305    void proxy_ProcessSnapshotStreamedCompleted(object sender, ProcessSnapshotStreamedCompletedEventArgs e) {
306      Stream stream =
307        (Stream)e.UserState;
308      if (stream != null)
309        stream.Dispose();
310
311      if (e.Error == null) {
312        ProcessSnapshotCompletedEventArgs args =
313          new ProcessSnapshotCompletedEventArgs(
314            new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);
315
316        ProcessSnapshotCompleted(sender, args);
317      } else
318        HandleNetworkError(e.Error);
319    }
320
321    #endregion
322
323    /// <summary>
324    /// Methods for sending the periodically Heartbeat
325    /// </summary>
326    #region Heartbeat
327
328    public event System.EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;
329    public void ProcessHeartBeatAsync(HeartBeatData hbd) {
330      if (LoggedIn)
331        Logger.Debug("STARTING: sending heartbeat");
332      proxy.ProcessHeartBeatAsync(hbd);
333    }
334
335    private void proxy_ProcessHeartBeatCompleted(object sender, ProcessHeartBeatCompletedEventArgs e) {
336      if (e.Error == null && e.Result.StatusMessage == ResponseStatus.Ok) {
337        ProcessHeartBeatCompleted(sender, e);
338        Logger.Debug("ENDED: sending heartbeats");
339      } else {
340        try {
341          Logger.Error("Error: " + e.Result.StatusMessage);
342        } catch (Exception ex) {
343          Logger.Error("Error: ", ex);
344        }
345        HandleNetworkError(e.Error);
346      }
347    }
348
349    #endregion
350
351    /// <summary>
352    /// Send back finished and Stored Job Results
353    /// </summary>
354    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
355      JobResult jobResult = new JobResult();
356      jobResult.SlaveId = clientId;
357      jobResult.JobId = jobId;
358      jobResult.Percentage = percentage;
359      jobResult.Exception = exception != null ? exception.Message : "";
360
361      MultiStream stream = new MultiStream();
362
363      //first send result
364      stream.AddStream(new StreamedObject<JobResult>(jobResult));
365
366      //second stream the job binary data
367      MemoryStream memStream = new MemoryStream(result, false);
368      stream.AddStream(memStream);
369
370      return stream;
371    }
372
373    public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
374      return proxy.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception));
375    }
376
377    public Response IsJobStillNeeded(Guid jobId) {
378      try {
379        Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
380        Response res = proxy.IsJobStillNeeded(jobId);
381        Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
382        return res;
383      } catch (Exception e) {
384        HandleNetworkError(e);
385        return null;
386      }
387    }
388
389    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
390      try {
391        return proxy.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception));
392      } catch (Exception e) {
393        HandleNetworkError(e);
394        return null;
395      }
396    }
397
398    public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
399      try {
400        Logger.Debug("STARTED: Requesting Plugins for Job");
401        Logger.Debug("STARTED: Getting the stream");
402        Stream stream = proxy.GetStreamedPlugins(requestedPlugins.ToArray());
403        Logger.Debug("ENDED: Getting the stream");
404        BinaryFormatter formatter = new BinaryFormatter();
405        Logger.Debug("STARTED: Deserializing the stream");
406        ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
407        Logger.Debug("ENDED: Deserializing the stream");
408        if (stream != null)
409          stream.Dispose();
410        return response.List;
411      } catch (Exception e) {
412        HandleNetworkError(e);
413        return null;
414      }
415    }
416
417    public void Logout(Guid guid) {
418      try {
419        Logger.Debug("STARTED: Logout");
420        proxy.Logout(guid);
421        Logger.Debug("ENDED: Logout");
422      } catch (Exception e) {
423        HandleNetworkError(e);
424      }
425    }
426
427    public ResponseCalendar GetCalendarSync(Guid clientId) {
428      try {
429        Logger.Debug("STARTED: Syncing Calendars");
430        ResponseCalendar cal = proxy.GetCalendar(clientId);
431        Logger.Debug("ENDED: Syncing Calendars");
432        return cal;
433      } catch (Exception e) {
434        HandleNetworkError(e);
435        return null;
436      }
437    }
438
439    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
440      try {
441        Logger.Debug("STARTED: Setting Calendar status to: " + state);
442        Response resp = proxy.SetCalendarStatus(clientId, state);
443        Logger.Debug("ENDED: Setting Calendar status to: " + state);
444        return resp;
445      } catch (Exception e) {
446        HandleNetworkError(e);
447        return null;
448      }
449    }
450  }
451}
Note: See TracBrowser for help on using the repository browser.