Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 4337

Last change on this file since 4337 was 4337, checked in by cneumuel, 14 years ago

changed Slave.Core WCF-Proxy to stateless object

File size: 16.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Slave.Common;
29using HeuristicLab.PluginInfrastructure;
30using System.IO;
31using System.Runtime.Serialization.Formatters.Binary;
32using HeuristicLab.Tracing;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.Hive.Contracts.BusinessObjects;
35using HeuristicLab.Hive.Slave.Communication.SlaveService;
36using HeuristicLab.Hive.Contracts.ResponseObjects;
37using HeuristicLab.Hive.Slave.Communication.Properties;
38
39namespace HeuristicLab.Hive.Slave.Communication {
40
41  /// <summary>
42  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
43  /// </summary>
44  public class WcfService {
// Backing field for Instance; stays null until the first access.
private static WcfService instance;

/// <summary>
/// Gets the shared WcfService, creating it on first access.
/// No locking is performed around the creation.
/// </summary>
/// <value>The shared WcfService instance.</value>
public static WcfService Instance {
  get {
    if (instance != null) {
      return instance;
    }
    Logger.Debug("New WcfService Instance created");
    instance = new WcfService();
    return instance;
  }
}
59
60    public DateTime ConnectedSince { get; private set; }
61    public NetworkEnum.WcfConnState ConnState { get; private set; }
62    public bool LoggedIn { get; set; }
63
// Backing field for ServerIp.
private string serverIp;

/// <summary>
/// Server address handed to ServiceLocator when a service proxy is created.
/// Assigning a value that differs from the current one raises ServerChanged;
/// assigning the same value again does nothing.
/// </summary>
public string ServerIp {
  get { return serverIp; }
  set {
    if (serverIp == value) {
      return;
    }
    serverIp = value;
    if (ServerChanged != null) {
      ServerChanged(this, new EventArgs());
    }
  }
}
75
76    public event EventHandler ConnectionRestored;
77    public event EventHandler ServerChanged;
78    public event EventHandler Connected;
79
/// <summary>
/// Private constructor; the object starts out disconnected and not logged in.
/// </summary>
private WcfService() {
  LoggedIn = false;
  ConnState = NetworkEnum.WcfConnState.Disconnected;
}
87
/// <summary>
/// Creates a proxy to the server to verify that it can be set up, switches the state to
/// Connected and fires the Connected event. When the state was Failed before this call,
/// the ConnectionRestored event is fired afterwards as well.
/// If no server address is set, the method only logs that fact and returns.
/// Any exception is passed to HandleNetworkError; nothing is thrown to the caller from the try block.
/// </summary>
public void Connect() {
  SlaveService.ISlaveFacade client = null;
  try {
    Logger.Debug("Starting the Connection Process");
    // IsNullOrEmpty also covers an address that was never assigned (null).
    if (String.IsNullOrEmpty(ServerIp)) {
      Logger.Info("No Server IP set!");
      return;
    }

    // The state is overwritten with Connected below, so the previous state has to be
    // captured first to decide whether this connect restores a failed connection.
    bool wasFailed = ConnState == NetworkEnum.WcfConnState.Failed;

    Logger.Debug("Creating the new connection proxy");
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    Logger.Debug("Created the new connection proxy");

    ConnState = NetworkEnum.WcfConnState.Connected;
    ConnectedSince = DateTime.Now;

    EventHandler connectedHandler = Connected;
    if (connectedHandler != null) {
      connectedHandler(this, new EventArgs());
    }

    // Events without subscribers are null; guard before invoking.
    EventHandler restoredHandler = ConnectionRestored;
    if (wasFailed && restoredHandler != null) {
      restoredHandler(this, new EventArgs());
    }
  }
  catch (Exception ex) {
    HandleNetworkError(ex);
  }
  finally {
    // Also reached with client == null (early return or failed creation).
    ServiceLocator.DisposeSlaveClient(client);
  }
}
121
/// <summary>
/// Marks the slave as disconnected and logged out.
/// Only the local state is changed; no server call is made here.
/// </summary>
public void Disconnect() {
  LoggedIn = false;
  ConnState = NetworkEnum.WcfConnState.Disconnected;
}
129
/// <summary>
/// Common error handler for server communication: switches the connection state to Failed,
/// clears the logged-in flag and writes the exception to the error log.
/// </summary>
/// <param name="e">The exception that was caught.</param>
private void HandleNetworkError(Exception e) {
  LoggedIn = false;
  ConnState = NetworkEnum.WcfConnState.Failed;
  string message = "Network exception occurred: " + e;
  Logger.Error(message);
}
139
/// <summary>
/// Logs the slave in at the server. Does nothing unless the state is Connected.
/// On an Ok response LoggedIn is set to true; any other response status is logged and then
/// routed through HandleNetworkError, exactly like an exception thrown by the call itself.
/// </summary>
/// <param name="slaveInfo">Slave description passed to the server's Login operation.</param>
public void Login(SlaveDto slaveInfo) {
  SlaveService.ISlaveFacade client = null;
  try {
    if (ConnState != NetworkEnum.WcfConnState.Connected) {
      return;
    }

    Logger.Debug("STARTED: Login Sync");
    client = ServiceLocator.CreateSlaveFacade(ServerIp);
    Response loginResponse = client.Login(slaveInfo);

    if (loginResponse.StatusMessage == ResponseStatus.Ok) {
      Logger.Info("ENDED: Login succeeded" + loginResponse.StatusMessage);
      LoggedIn = true;
      return;
    }

    Logger.Error("FAILED: Login Failed! " + loginResponse.StatusMessage);
    // Handled by the catch below, which marks the connection as failed.
    throw new Exception(loginResponse.StatusMessage.ToString());
  }
  catch (Exception e) {
    HandleNetworkError(e);
  }
  finally {
    // client is still null when the state check above returned early.
    ServiceLocator.DisposeSlaveClient(client);
  }
}
166
167    /// <summary>
168    /// Pull a Job from the Server
169    /// </summary>
170    #region PullJob
/// <summary>
/// Raised from the asynchronous callback of GetJobAsync after the response and the job data were read.
/// The event args are built from an object array holding the deserialized response and the job data bytes.
/// </summary>
public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;

/// <summary>
/// Starts an asynchronous, streamed fetch of a job from the server. Does nothing when not logged in.
/// The callback first deserializes a ResponseObject&lt;JobDto&gt; from the stream, then reads the
/// remaining bytes as the job data and raises GetJobCompleted. Exceptions inside the callback are logged.
/// </summary>
/// <param name="guid">Id passed to the server's GetStreamedJob operation.</param>
public void GetJobAsync(Guid guid) {
  if (!LoggedIn) {
    return;
  }

  Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
  SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
  client.BeginGetStreamedJob(guid, (ar => {
    if (ar.IsCompleted) {
      Stream stream = null;
      MemoryStream memStream = null;
      try {
        Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
        stream = client.EndGetStreamedJob(ar);

        //first deserialize the response
        // NOTE(review): BinaryFormatter on data received over the network is unsafe if the
        // server is not fully trusted - confirm the trust model before keeping this.
        BinaryFormatter formatter = new BinaryFormatter();
        ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);

        //second read the rest of the stream, which is the job BLOB
        memStream = new MemoryStream();
        byte[] buffer = new byte[3024];
        int read = 0;
        while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
          memStream.Write(buffer, 0, read);
        }

        // ToArray returns exactly the bytes written. GetBuffer would return the whole
        // internal buffer, including unused capacity behind the data.
        byte[] jobData = memStream.ToArray();

        // Inside the IsCompleted branch the operation is never reported as cancelled.
        GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, jobData }, null, false, ar.AsyncState);

        // An event without subscribers is null; guard before raising it.
        System.EventHandler<GetJobCompletedEventArgs> handler = GetJobCompleted;
        if (handler != null) {
          handler(this, completedEventArgs);
        }
      }
      catch (Exception ex) {
        Logger.Error(ex);
      }
      finally {
        if (stream != null)
          stream.Dispose();

        if (memStream != null)
          memStream.Dispose();
      }
    } else
      HandleNetworkError(new FaultException("GetJobAsync did not complete"));

    ServiceLocator.DisposeSlaveClient(client);
  }), null);
}
220
221    #endregion
222
223    /// <summary>
224    /// Send back finished Job Results
225    /// </summary>
226    #region SendJobResults
/// <summary>
/// Raised from the asynchronous callback of GetFinishedJobResultAsync with the server's response.
/// </summary>
public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;

/// <summary>
/// Starts an asynchronous, streamed upload of a finished job result. Does nothing when not logged in.
/// When the call completes, GetFinishedJobResultCompleted is raised with the server response.
/// Exceptions thrown inside the callback are passed to HandleNetworkError.
/// </summary>
/// <param name="clientId">Id of the slave, stored in the result header.</param>
/// <param name="jobId">Id of the job, stored in the result header.</param>
/// <param name="result">Job data that is streamed after the header.</param>
/// <param name="percentage">Progress value stored in the result header.</param>
/// <param name="exception">Optional exception; only its message is sent.</param>
/// <param name="finished">Not used by this method.</param>
public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
  if (!LoggedIn) {
    return;
  }

  Logger.Debug("STARTED: Sending back the finished job results");
  Logger.Debug("Building stream");
  Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
  Logger.Debug("Builded stream");
  Logger.Debug("Making the call");
  SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
  client.BeginStoreFinishedJobResultStreamed(stream, (ar => {
    try {
      Logger.Debug("Finished storing the job");
      stream.Dispose();

      if (ar.IsCompleted) {
        var res = client.EndStoreFinishedJobResultStreamed(ar);
        StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
        Logger.Debug("calling the Finished Job Event");

        // An event without subscribers is null; guard before raising it.
        System.EventHandler<StoreFinishedJobResultCompletedEventArgs> handler = GetFinishedJobResultCompleted;
        if (handler != null) {
          handler(this, args);
        }
        Logger.Debug("ENDED: Sending back the finished job results");
      } else {
        HandleNetworkError(new FaultException("GetFinishedJobResultAsync did not complete"));
      }
    }
    catch (Exception ex) {
      // The End call rethrows communication failures; keep them from escaping the callback.
      HandleNetworkError(ex);
    }
    finally {
      // Release the proxy on every path, including failures.
      ServiceLocator.DisposeSlaveClient(client);
    }
  }), null);
}
255
256    #endregion
257
258    #region Processsnapshots
/// <summary>
/// Raised from the asynchronous callback of ProcessSnapshotAsync with the server's response.
/// </summary>
public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;

/// <summary>
/// Starts an asynchronous, streamed upload of a job snapshot. Does nothing when not logged in.
/// When the call completes, ProcessSnapshotCompleted is raised with the server response.
/// Exceptions thrown inside the callback are passed to HandleNetworkError.
/// </summary>
/// <param name="clientId">Id of the slave, stored in the result header.</param>
/// <param name="jobId">Id of the job, stored in the result header.</param>
/// <param name="result">Snapshot data that is streamed after the header.</param>
/// <param name="percentage">Progress value stored in the result header.</param>
/// <param name="exception">Optional exception; only its message is sent.</param>
/// <param name="finished">Not used by this method.</param>
public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
  if (!LoggedIn) {
    return;
  }

  Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
  SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
  client.BeginProcessSnapshotStreamed(stream, (ar => {
    try {
      stream.Dispose();

      if (ar.IsCompleted) {
        // The End method has to match the Begin method that produced this IAsyncResult.
        var res = client.EndProcessSnapshotStreamed(ar);
        ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);

        // An event without subscribers is null; guard before raising it.
        System.EventHandler<ProcessSnapshotCompletedEventArgs> handler = ProcessSnapshotCompleted;
        if (handler != null) {
          handler(this, args);
        }
      } else {
        HandleNetworkError(new FaultException("ProcessSnapshotAsync did not complete"));
      }
    }
    catch (Exception ex) {
      // The End call rethrows communication failures; keep them from escaping the callback.
      HandleNetworkError(ex);
    }
    finally {
      // Release the proxy on every path, including failures.
      ServiceLocator.DisposeSlaveClient(client);
    }
  }), null);
}
280
281    #endregion
282
283    /// <summary>
284    /// Methods for sending the periodically Heartbeat
285    /// </summary>
286    #region Heartbeat
287
/// <summary>
/// Raised from the asynchronous callback of ProcessHeartBeatAsync when the server answered with status Ok.
/// </summary>
public event System.EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;

/// <summary>
/// Starts an asynchronous heartbeat call to the server. Does nothing when not logged in.
/// A response with status Ok raises ProcessHeartBeatCompleted; any other status is only logged.
/// Exceptions thrown inside the callback are passed to HandleNetworkError.
/// </summary>
/// <param name="hbd">Heartbeat data passed to the server's ProcessHeartBeat operation.</param>
public void ProcessHeartBeatAsync(HeartBeatData hbd) {
  if (!LoggedIn) {
    return;
  }

  Logger.Debug("STARTING: sending heartbeat");
  SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
  client.BeginProcessHeartBeat(hbd, (ar => {
    try {
      if (ar.IsCompleted) {
        var res = client.EndProcessHeartBeat(ar);
        if (res.StatusMessage == ResponseStatus.Ok) {
          // An event without subscribers is null; guard before raising it.
          System.EventHandler<ProcessHeartBeatCompletedEventArgs> handler = ProcessHeartBeatCompleted;
          if (handler != null) {
            handler(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
          }
          Logger.Debug("ENDED: sending heartbeats");
        } else {
          Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
        }
      } else {
        HandleNetworkError(new FaultException("ProcessHeartBeatAsync did not complete"));
      }
    }
    catch (Exception ex) {
      // The End call rethrows communication failures; keep them from escaping the callback.
      HandleNetworkError(ex);
    }
    finally {
      // Release the proxy on every path, including failures.
      ServiceLocator.DisposeSlaveClient(client);
    }
  }), null);
}
310
311    #endregion
312
/// <summary>
/// Builds the stream that is sent to the server for a job result or snapshot:
/// first a JobResult header object, then the raw job data.
/// </summary>
/// <param name="clientId">Stored as SlaveId in the header.</param>
/// <param name="jobId">Stored as JobId in the header.</param>
/// <param name="result">Job data; wrapped in a read-only MemoryStream.</param>
/// <param name="percentage">Stored as Percentage in the header.</param>
/// <param name="exception">When not null its Message is stored in the header, otherwise an empty string.</param>
/// <returns>A MultiStream containing the header stream followed by the data stream.</returns>
private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
  JobResult header = new JobResult {
    SlaveId = clientId,
    JobId = jobId,
    Percentage = percentage,
    Exception = exception == null ? "" : exception.Message
  };

  MultiStream combined = new MultiStream();
  // Part one: the serialized header.
  combined.AddStream(new StreamedObject<JobResult>(header));
  // Part two: the binary job data.
  combined.AddStream(new MemoryStream(result, false));
  return combined;
}
334
/// <summary>
/// Synchronously uploads a finished job result to the server using the streamed operation.
/// Exceptions from the call are not handled here and propagate to the caller.
/// </summary>
/// <param name="clientId">Id of the slave, stored in the result header.</param>
/// <param name="jobId">Id of the job, stored in the result header.</param>
/// <param name="result">Job data that is streamed after the header.</param>
/// <param name="percentage">Progress value stored in the result header.</param>
/// <param name="exception">Optional exception; only its message is sent.</param>
/// <param name="finished">Not used by this method.</param>
/// <returns>The response returned by the server.</returns>
public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
  SlaveService.ISlaveFacade client = null;
  try {
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    // Dispose the request stream once the call has returned, as the asynchronous variant does.
    using (Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception)) {
      return client.StoreFinishedJobResultStreamed(stream);
    }
  }
  finally {
    // Release the proxy even when the call throws.
    ServiceLocator.DisposeSlaveClient(client);
  }
}
341
/// <summary>
/// Synchronously forwards the job id to the server's IsJobStillNeeded operation.
/// </summary>
/// <param name="jobId">Id of the job to ask about.</param>
/// <returns>The server response, or null when the call failed (the error is passed to HandleNetworkError).</returns>
public Response IsJobStillNeeded(Guid jobId) {
  SlaveService.ISlaveFacade client = null;
  try {
    Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    Response res = client.IsJobStillNeeded(jobId);
    Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
    return res;
  }
  catch (Exception e) {
    HandleNetworkError(e);
    return null;
  }
  finally {
    // Release the proxy on success and on failure.
    ServiceLocator.DisposeSlaveClient(client);
  }
}
356
/// <summary>
/// Synchronously uploads a job snapshot to the server using the streamed operation.
/// </summary>
/// <param name="clientId">Id of the slave, stored in the result header.</param>
/// <param name="jobId">Id of the job, stored in the result header.</param>
/// <param name="result">Snapshot data that is streamed after the header.</param>
/// <param name="percentage">Progress value stored in the result header.</param>
/// <param name="exception">Optional exception; only its message is sent.</param>
/// <returns>The server response, or null when the call failed (the error is passed to HandleNetworkError).</returns>
public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
  SlaveService.ISlaveFacade client = null;
  try {
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    // Dispose the request stream once the call has returned, as the asynchronous variant does.
    using (Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception)) {
      return client.ProcessSnapshotStreamed(stream);
    }
  }
  catch (Exception e) {
    HandleNetworkError(e);
    return null;
  }
  finally {
    // Release the proxy on success and on failure.
    ServiceLocator.DisposeSlaveClient(client);
  }
}
369
/// <summary>
/// Synchronously requests the given plugins from the server. The server answers with a stream
/// from which a ResponseList&lt;CachedHivePluginInfoDto&gt; is deserialized.
/// </summary>
/// <param name="requestedPlugins">Descriptions of the plugins to fetch.</param>
/// <returns>The list contained in the response, or null when the call failed (the error is passed to HandleNetworkError).</returns>
public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
  SlaveService.ISlaveFacade client = null;
  Stream stream = null;
  try {
    Logger.Debug("STARTED: Requesting Plugins for Job");
    Logger.Debug("STARTED: Getting the stream");
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    stream = client.GetStreamedPlugins(requestedPlugins.ToArray());
    Logger.Debug("ENDED: Getting the stream");

    // NOTE(review): BinaryFormatter on data received over the network is unsafe if the
    // server is not fully trusted - confirm the trust model before keeping this.
    BinaryFormatter formatter = new BinaryFormatter();
    Logger.Debug("STARTED: Deserializing the stream");
    ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
    Logger.Debug("ENDED: Deserializing the stream");
    return response.List;
  }
  catch (Exception e) {
    HandleNetworkError(e);
    return null;
  }
  finally {
    // The stream is read completely before the proxy is released, and both are
    // cleaned up on success and on failure.
    if (stream != null)
      stream.Dispose();
    ServiceLocator.DisposeSlaveClient(client);
  }
}
391
/// <summary>
/// Synchronously calls the server's Logout operation. Failures are passed to HandleNetworkError.
/// The local LoggedIn flag is not changed by a successful call.
/// </summary>
/// <param name="guid">Id passed to the server's Logout operation.</param>
public void Logout(Guid guid) {
  SlaveService.ISlaveFacade client = null;
  try {
    Logger.Debug("STARTED: Logout");
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    client.Logout(guid);
    Logger.Debug("ENDED: Logout");
  }
  catch (Exception e) {
    HandleNetworkError(e);
  }
  finally {
    // Release the proxy on success and on failure.
    ServiceLocator.DisposeSlaveClient(client);
  }
}
404
/// <summary>
/// Synchronously fetches the calendar for the given id from the server.
/// </summary>
/// <param name="clientId">Id passed to the server's GetCalendar operation.</param>
/// <returns>The server response, or null when the call failed (the error is passed to HandleNetworkError).</returns>
public ResponseCalendar GetCalendarSync(Guid clientId) {
  SlaveService.ISlaveFacade client = null;
  try {
    Logger.Debug("STARTED: Syncing Calendars");
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    ResponseCalendar cal = client.GetCalendar(clientId);
    Logger.Debug("ENDED: Syncing Calendars");
    return cal;
  }
  catch (Exception e) {
    HandleNetworkError(e);
    return null;
  }
  finally {
    // Release the proxy on success and on failure.
    ServiceLocator.DisposeSlaveClient(client);
  }
}
419
/// <summary>
/// Synchronously sends the given calendar state for the given id to the server.
/// </summary>
/// <param name="clientId">Id passed to the server's SetCalendarStatus operation.</param>
/// <param name="state">Calendar state to report.</param>
/// <returns>The server response, or null when the call failed (the error is passed to HandleNetworkError).</returns>
public Response SetCalendarStatus(Guid clientId, CalendarState state) {
  SlaveService.ISlaveFacade client = null;
  try {
    Logger.Debug("STARTED: Setting Calendar status to: " + state);
    client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp);
    Response resp = client.SetCalendarStatus(clientId, state);
    Logger.Debug("ENDED: Setting Calendar status to: " + state);
    return resp;
  }
  catch (Exception e) {
    HandleNetworkError(e);
    return null;
  }
  finally {
    // Release the proxy on success and on failure.
    ServiceLocator.DisposeSlaveClient(client);
  }
}
434  }
435}
Note: See TracBrowser for help on using the repository browser.