Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 4338

Last change on this file since 4338 was 4338, checked in by cneumuel, 14 years ago

improved stateless WCF-Proxy client with ServicePool

File size: 16.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using System.ServiceModel;
27using HeuristicLab.Hive.Contracts.Interfaces;
28using HeuristicLab.Hive.Slave.Common;
29using HeuristicLab.PluginInfrastructure;
30using System.IO;
31using System.Runtime.Serialization.Formatters.Binary;
32using HeuristicLab.Tracing;
33using HeuristicLab.Hive.Contracts;
34using HeuristicLab.Hive.Contracts.BusinessObjects;
35using HeuristicLab.Hive.Slave.Communication.SlaveService;
36using HeuristicLab.Hive.Contracts.ResponseObjects;
37using HeuristicLab.Hive.Slave.Communication.Properties;
38
39namespace HeuristicLab.Hive.Slave.Communication {
40
/// <summary>
/// Singleton communication layer between the slave and the server. Every call obtains a
/// short-lived service proxy from a <see cref="SlaveFacadeServicePool"/> and hands it back
/// to that same pool once the call (or its asynchronous callback) has finished.
/// </summary>
public class WcfService {
  /// <summary>
  /// Size in bytes of the chunks used when copying the streamed job payload into memory.
  /// </summary>
  private const int CopyBufferSize = 4096;

  private static WcfService instance;
  /// <summary>
  /// Getter for the Instance of the WcfService; the instance is created on first access.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the lazy creation is not synchronized. Confirm that the first access
  /// always happens before any worker or callback thread can reach this property.
  /// </remarks>
  /// <returns>the Instance of the WcfService class</returns>
  public static WcfService Instance {
    get {
      if (instance == null) {
        Logger.Debug("New WcfService Instance created");
        instance = new WcfService();
      }
      return instance;
    }
  }

  /// <summary>Time at which <see cref="Connect"/> last succeeded.</summary>
  public DateTime ConnectedSince { get; private set; }
  /// <summary>Current connection state as tracked by this class.</summary>
  public NetworkEnum.WcfConnState ConnState { get; private set; }
  /// <summary>True after a successful <see cref="Login"/>; reset on disconnect and on network errors.</summary>
  public bool LoggedIn { get; set; }

  private string serverIp;
  /// <summary>
  /// Address of the server. Assigning a different value raises <see cref="ServerChanged"/>.
  /// </summary>
  public string ServerIp {
    get { return serverIp; }
    set {
      if (serverIp != value) {
        serverIp = value;
        Raise(ServerChanged);
      }
    }
  }

  /// <summary>Raised by <see cref="Connect"/> when it succeeds after the state had been Failed.</summary>
  public event EventHandler ConnectionRestored;
  /// <summary>Raised when <see cref="ServerIp"/> is set to a different value.</summary>
  public event EventHandler ServerChanged;
  /// <summary>Raised by <see cref="Connect"/> whenever it succeeds.</summary>
  public event EventHandler Connected;

  SlaveFacadeServicePool servicePool;

  /// <summary>
  /// Constructor
  /// </summary>
  private WcfService() {
    ConnState = NetworkEnum.WcfConnState.Disconnected;
    LoggedIn = false;
  }

  /// <summary>
  /// Raises a plain event if it has subscribers. The delegate is passed by value, so a
  /// concurrent unsubscribe cannot turn it into null between the check and the call.
  /// </summary>
  private void Raise(EventHandler handler) {
    if (handler != null)
      handler(this, EventArgs.Empty);
  }

  /// <summary>
  /// Raises a typed event if it has subscribers.
  /// </summary>
  private void Raise<TEventArgs>(EventHandler<TEventArgs> handler, TEventArgs args) where TEventArgs : EventArgs {
    if (handler != null)
      handler(this, args);
  }

  /// <summary>
  /// Hands a proxy back to the pool that created it. Does nothing when either argument is
  /// null. A failure while releasing is logged and swallowed so that it can neither mask
  /// the outcome of the actual call nor escape from a finally block.
  /// </summary>
  private static void ReleaseClient(SlaveFacadeServicePool pool, SlaveService.ISlaveFacade client) {
    if (pool == null || client == null)
      return;
    try {
      pool.DisposeSlaveClient(client);
    }
    catch (Exception ex) {
      Logger.Error("Releasing the service client failed: " + ex);
    }
  }

  /// <summary>
  /// Reads <paramref name="source"/> from its current position to the end.
  /// </summary>
  /// <returns>Exactly the bytes that were read, without any buffer padding.</returns>
  private static byte[] ReadToEnd(Stream source) {
    using (MemoryStream target = new MemoryStream()) {
      byte[] buffer = new byte[CopyBufferSize];
      int read;
      while ((read = source.Read(buffer, 0, buffer.Length)) > 0) {
        target.Write(buffer, 0, read);
      }
      // ToArray returns only the written bytes; GetBuffer would also expose unused capacity.
      return target.ToArray();
    }
  }

  /// <summary>
  /// Creates a new service pool for <see cref="ServerIp"/>, creates a proxy from it, marks
  /// the connection as Connected and fires the Connected event. If the previous state was
  /// Failed, the ConnectionRestored event is fired as well. Does nothing besides logging
  /// when no server address is set. Exceptions are routed to the network error handler.
  /// </summary>
  public void Connect() {
    SlaveFacadeServicePool pool = null;
    SlaveService.ISlaveFacade client = null;
    try {
      Logger.Debug("Starting the Connection Process");
      if (String.IsNullOrEmpty(ServerIp)) {
        Logger.Info("No Server IP set!");
        return;
      }
      // Must be sampled before the state is overwritten below.
      bool wasFailed = ConnState == NetworkEnum.WcfConnState.Failed;

      pool = new SlaveFacadeServicePool(ServerIp);
      servicePool = pool;

      Logger.Debug("Creating the new connection proxy");
      client = pool.CreateStreamedSlaveFacade();
      Logger.Debug("Created the new connection proxy");

      ConnState = NetworkEnum.WcfConnState.Connected;
      ConnectedSince = DateTime.Now;

      Raise(Connected);
      if (wasFailed) {
        Raise(ConnectionRestored);
      }
    }
    catch (Exception ex) {
      HandleNetworkError(ex);
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  /// <summary>
  /// Marks the slave as disconnected and logged out. No server call is made here.
  /// </summary>
  public void Disconnect() {
    ConnState = NetworkEnum.WcfConnState.Disconnected;
    LoggedIn = false;
  }

  /// <summary>
  /// Network communication Error Handler - Every network error gets logged, the connection
  /// state switches to Failed and the slave counts as logged out.
  /// </summary>
  /// <param name="e">The Exception</param>
  private void HandleNetworkError(Exception e) {
    ConnState = NetworkEnum.WcfConnState.Failed;
    LoggedIn = false;
    Logger.Error("Network exception occurred: " + e);
  }

  /// <summary>
  /// Logs the slave in at the server. Does nothing unless the state is Connected. A
  /// response status other than Ok is treated like a network error.
  /// </summary>
  /// <param name="slaveInfo">Description of this slave that is sent to the server.</param>
  public void Login(SlaveDto slaveInfo) {
    if (ConnState != NetworkEnum.WcfConnState.Connected)
      return;

    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      Logger.Debug("STARTED: Login Sync");
      client = pool.CreateSlaveFacade();
      Response res = client.Login(slaveInfo);
      if (res.StatusMessage != ResponseStatus.Ok) {
        Logger.Error("FAILED: Login Failed! " + res.StatusMessage);
        throw new Exception(res.StatusMessage.ToString());
      } else {
        Logger.Info("ENDED: Login succeeded" + res.StatusMessage);
        LoggedIn = true;
      }
    }
    catch (Exception e) {
      HandleNetworkError(e);
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  #region PullJob
  /// <summary>
  /// Raised when a job has been fetched. The results array holds the deserialized
  /// response followed by the raw job bytes.
  /// </summary>
  public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;

  /// <summary>
  /// Pull a Job from the Server. Does nothing when the slave is not logged in. The result
  /// is delivered through <see cref="GetJobCompleted"/>.
  /// </summary>
  /// <param name="guid">Identifier passed on to the server call.</param>
  public void GetJobAsync(Guid guid) {
    if (!LoggedIn)
      return;

    Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = pool.CreateStreamedSlaveFacade();
    try {
      //client.GetStreamedJobAsync(guid);
      client.BeginGetStreamedJob(guid, (ar => {
        try {
          if (ar.IsCompleted)
            CompleteGetJob(client, ar);
          else
            HandleNetworkError(new FaultException("GetJobAsync did not complete"));
        }
        finally {
          ReleaseClient(pool, client);
        }
      }), null);
    }
    catch {
      // The call could not even be started, so the callback will never release the proxy.
      ReleaseClient(pool, client);
      throw;
    }
  }

  /// <summary>
  /// Finishes a GetStreamedJob call: deserializes the response header, reads the rest of
  /// the stream as the job payload and raises <see cref="GetJobCompleted"/>. Any exception
  /// is logged and not propagated.
  /// </summary>
  private void CompleteGetJob(SlaveService.ISlaveFacade client, IAsyncResult ar) {
    try {
      Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
      using (Stream stream = client.EndGetStreamedJob(ar)) {
        //first deserialize the response
        // NOTE(review): BinaryFormatter must only be fed data from a trusted server.
        BinaryFormatter formatter = new BinaryFormatter();
        ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);

        //second read the BLOB that follows the response in the same stream
        byte[] jobData = ReadToEnd(stream);

        GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, jobData }, null, false, ar.AsyncState);
        Raise(GetJobCompleted, completedEventArgs);
      }
    }
    catch (Exception ex) {
      Logger.Error(ex);
    }
  }

  #endregion

  #region SendJobResults
  /// <summary>Raised when the server has acknowledged a finished job result.</summary>
  public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;

  /// <summary>
  /// Send back finished Job Results. Does nothing when the slave is not logged in. The
  /// server response is delivered through <see cref="GetFinishedJobResultCompleted"/>;
  /// failures in the callback are routed to the network error handler.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <param name="jobId">Id of the job the result belongs to.</param>
  /// <param name="result">Serialized job data; null is sent as an empty payload.</param>
  /// <param name="percentage">Progress value stored in the job result.</param>
  /// <param name="exception">Exception whose message is stored in the job result, or null.</param>
  /// <param name="finished">Not used by this method.</param>
  public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
    if (!LoggedIn)
      return;

    Logger.Debug("STARTED: Sending back the finished job results");
    Logger.Debug("Building stream");
    Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
    Logger.Debug("Builded stream");
    Logger.Debug("Making the call");
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      //proxy.StoreFinishedJobResultStreamedAsync(stream, stream);
      client = pool.CreateStreamedSlaveFacade();
      client.BeginStoreFinishedJobResultStreamed(stream, (ar => {
        try {
          Logger.Debug("Finished storing the job");
          if (ar.IsCompleted) {
            var res = client.EndStoreFinishedJobResultStreamed(ar);
            StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
            Logger.Debug("calling the Finished Job Event");
            Raise(GetFinishedJobResultCompleted, args);
            Logger.Debug("ENDED: Sending back the finished job results");
          } else {
            HandleNetworkError(new FaultException("GetFinishedJobResultAsync did not complete"));
          }
        }
        catch (Exception ex) {
          // Without this an exception from End* would be unhandled on the callback thread.
          HandleNetworkError(ex);
        }
        finally {
          stream.Dispose();
          ReleaseClient(pool, client);
        }
      }), null);
    }
    catch {
      // The call could not even be started, so the callback will never clean up.
      stream.Dispose();
      ReleaseClient(pool, client);
      throw;
    }
  }

  #endregion

  #region Processsnapshots
  /// <summary>Raised when the server has acknowledged a snapshot.</summary>
  public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;

  /// <summary>
  /// Sends a snapshot of a running job to the server. Does nothing when the slave is not
  /// logged in. The server response is delivered through
  /// <see cref="ProcessSnapshotCompleted"/>; failures in the callback are routed to the
  /// network error handler.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <param name="jobId">Id of the job the snapshot belongs to.</param>
  /// <param name="result">Serialized job data; null is sent as an empty payload.</param>
  /// <param name="percentage">Progress value stored in the job result.</param>
  /// <param name="exception">Exception whose message is stored in the job result, or null.</param>
  /// <param name="finished">Not used by this method.</param>
  public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
    if (!LoggedIn)
      return;

    Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception);
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      //proxy.ProcessSnapshotStreamedAsync(stream, stream);
      client = pool.CreateStreamedSlaveFacade();
      client.BeginProcessSnapshotStreamed(stream, (ar => {
        try {
          if (ar.IsCompleted) {
            // Must be the End method that belongs to BeginProcessSnapshotStreamed.
            var res = client.EndProcessSnapshotStreamed(ar);
            ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
            Raise(ProcessSnapshotCompleted, args);
          } else {
            HandleNetworkError(new FaultException("ProcessSnapshotAsync did not complete"));
          }
        }
        catch (Exception ex) {
          // Without this an exception from End* would be unhandled on the callback thread.
          HandleNetworkError(ex);
        }
        finally {
          stream.Dispose();
          ReleaseClient(pool, client);
        }
      }), null);
    }
    catch {
      // The call could not even be started, so the callback will never clean up.
      stream.Dispose();
      ReleaseClient(pool, client);
      throw;
    }
  }

  #endregion

  #region Heartbeat

  /// <summary>Raised when the server answered a heartbeat with status Ok.</summary>
  public event System.EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;

  /// <summary>
  /// Sends the periodic heartbeat. Does nothing when the slave is not logged in. A
  /// response with status Ok is delivered through <see cref="ProcessHeartBeatCompleted"/>;
  /// any other status is only logged. Failures in the callback are routed to the network
  /// error handler.
  /// </summary>
  /// <param name="hbd">Heartbeat payload sent to the server.</param>
  public void ProcessHeartBeatAsync(HeartBeatData hbd) {
    if (!LoggedIn)
      return;

    Logger.Debug("STARTING: sending heartbeat");
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      //proxy.ProcessHeartBeatAsync(hbd);
      client = pool.CreateStreamedSlaveFacade();
      client.BeginProcessHeartBeat(hbd, (ar => {
        try {
          if (ar.IsCompleted) {
            var res = client.EndProcessHeartBeat(ar);
            if (res.StatusMessage == ResponseStatus.Ok) {
              Raise(ProcessHeartBeatCompleted, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
              Logger.Debug("ENDED: sending heartbeats");
            } else {
              Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
            }
          } else {
            HandleNetworkError(new FaultException("ProcessHeartBeatAsync did not complete"));
          }
        }
        catch (Exception ex) {
          // Without this an exception from End* would be unhandled on the callback thread.
          HandleNetworkError(ex);
        }
        finally {
          ReleaseClient(pool, client);
        }
      }), null);
    }
    catch {
      // The call could not even be started, so the callback will never release the proxy.
      ReleaseClient(pool, client);
      throw;
    }
  }

  #endregion

  /// <summary>
  /// Builds the request stream for a job result: first the serialized
  /// <see cref="JobResult"/> header, then the raw job bytes.
  /// </summary>
  /// <param name="clientId">Id of this slave, stored as SlaveId.</param>
  /// <param name="jobId">Id of the job.</param>
  /// <param name="result">Serialized job data; null is treated as an empty payload.</param>
  /// <param name="percentage">Progress value stored in the header.</param>
  /// <param name="exception">Exception whose message is stored in the header; null yields an empty string.</param>
  /// <returns>The combined stream. The caller is responsible for disposing it.</returns>
  private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
    JobResult jobResult = new JobResult();
    jobResult.SlaveId = clientId;
    jobResult.JobId = jobId;
    jobResult.Percentage = percentage;
    jobResult.Exception = exception != null ? exception.Message : "";

    MultiStream stream = new MultiStream();

    //first send result
    stream.AddStream(new StreamedObject<JobResult>(jobResult));

    //second stream the job binary data
    MemoryStream memStream = new MemoryStream(result ?? new byte[0], false);
    stream.AddStream(memStream);

    return stream;
  }

  /// <summary>
  /// Sends a finished job result and waits for the server response. Unlike the other
  /// synchronous calls, exceptions are not handled here and propagate to the caller.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <param name="jobId">Id of the job the result belongs to.</param>
  /// <param name="result">Serialized job data; null is sent as an empty payload.</param>
  /// <param name="percentage">Progress value stored in the job result.</param>
  /// <param name="exception">Exception whose message is stored in the job result, or null.</param>
  /// <param name="finished">Not used by this method.</param>
  /// <returns>The response returned by the server.</returns>
  public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) {
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = pool.CreateStreamedSlaveFacade();
    try {
      using (Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception)) {
        return client.StoreFinishedJobResultStreamed(stream);
      }
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  /// <summary>
  /// Asks the server whether the given job is still needed.
  /// </summary>
  /// <param name="jobId">Id of the job in question.</param>
  /// <returns>The server response, or null when the call failed.</returns>
  public Response IsJobStillNeeded(Guid jobId) {
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
      client = pool.CreateStreamedSlaveFacade();
      Response res = client.IsJobStillNeeded(jobId);
      Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
      return res;
    }
    catch (Exception e) {
      HandleNetworkError(e);
      return null;
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  /// <summary>
  /// Sends a snapshot of a running job and waits for the server response.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <param name="jobId">Id of the job the snapshot belongs to.</param>
  /// <param name="result">Serialized job data; null is sent as an empty payload.</param>
  /// <param name="percentage">Progress value stored in the job result.</param>
  /// <param name="exception">Exception whose message is stored in the job result, or null.</param>
  /// <returns>The server response, or null when the call failed.</returns>
  public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) {
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      client = pool.CreateStreamedSlaveFacade();
      using (Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception)) {
        return client.ProcessSnapshotStreamed(stream);
      }
    }
    catch (Exception e) {
      HandleNetworkError(e);
      return null;
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  /// <summary>
  /// Requests the given plugins from the server and deserializes the streamed answer.
  /// The proxy is released only after the response stream has been read.
  /// </summary>
  /// <param name="requestedPlugins">Descriptions of the plugins to fetch.</param>
  /// <returns>The plugin list contained in the response, or null when the call failed.</returns>
  public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      Logger.Debug("STARTED: Requesting Plugins for Job");
      Logger.Debug("STARTED: Getting the stream");
      client = pool.CreateStreamedSlaveFacade();
      using (Stream stream = client.GetStreamedPlugins(requestedPlugins.ToArray())) {
        Logger.Debug("ENDED: Getting the stream");
        // NOTE(review): BinaryFormatter must only be fed data from a trusted server.
        BinaryFormatter formatter = new BinaryFormatter();
        Logger.Debug("STARTED: Deserializing the stream");
        ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
        Logger.Debug("ENDED: Deserializing the stream");
        return response.List;
      }
    }
    catch (Exception e) {
      HandleNetworkError(e);
      return null;
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  /// <summary>
  /// Logs the slave out at the server. Failures are routed to the network error handler.
  /// </summary>
  /// <param name="guid">Identifier passed on to the server call.</param>
  public void Logout(Guid guid) {
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      Logger.Debug("STARTED: Logout");
      client = pool.CreateStreamedSlaveFacade();
      client.Logout(guid);
      Logger.Debug("ENDED: Logout");
    }
    catch (Exception e) {
      HandleNetworkError(e);
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  /// <summary>
  /// Fetches the calendar for the given slave from the server.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <returns>The server response, or null when the call failed.</returns>
  public ResponseCalendar GetCalendarSync(Guid clientId) {
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      Logger.Debug("STARTED: Syncing Calendars");
      client = pool.CreateStreamedSlaveFacade();
      ResponseCalendar cal = client.GetCalendar(clientId);
      Logger.Debug("ENDED: Syncing Calendars");
      return cal;
    }
    catch (Exception e) {
      HandleNetworkError(e);
      return null;
    }
    finally {
      ReleaseClient(pool, client);
    }
  }

  /// <summary>
  /// Reports the calendar state of the given slave to the server.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <param name="state">Calendar state to report.</param>
  /// <returns>The server response, or null when the call failed.</returns>
  public Response SetCalendarStatus(Guid clientId, CalendarState state) {
    SlaveFacadeServicePool pool = servicePool;
    SlaveService.ISlaveFacade client = null;
    try {
      Logger.Debug("STARTED: Setting Calendar status to: " + state);
      client = pool.CreateStreamedSlaveFacade();
      Response resp = client.SetCalendarStatus(clientId, state);
      Logger.Debug("ENDED: Setting Calendar status to: " + state);
      return resp;
    }
    catch (Exception e) {
      HandleNetworkError(e);
      return null;
    }
    finally {
      ReleaseClient(pool, client);
    }
  }
}
438}
Note: See TracBrowser for help on using the repository browser.