Free cookie consent management tool by TermsFeed Policy Generator

source: branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 4539

Last change on this file since 4539 was 4424, checked in by cneumuel, 14 years ago
  • Added and updated License Information in every file
  • Sort and remove usings in every file
  • Deleted obsolete DataAccess.ADOHelper
  • Deleted some obsolete files
File size: 19.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.Serialization.Formatters.Binary;
26using System.ServiceModel;
27using HeuristicLab.Common;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.ResponseObjects;
31using HeuristicLab.Hive.Slave.Common;
32using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Tracing;
35
36namespace HeuristicLab.Hive.Slave.Communication {
37
/// <summary>
/// Singleton that acts as the communication layer between the slave and the server.
/// Every call obtains a service proxy from <c>ServiceLocator.Instance.StreamedSlaveFacadePool</c>
/// and disposes it again when the call (or its asynchronous callback) has finished.
/// </summary>
public class WcfService {
  /// <summary>
  /// Size (in bytes) of the chunk buffer used when copying the job blob out of the response stream.
  /// </summary>
  private const int CopyBufferSize = 3024;

  // Guards the lazy creation of the singleton; the class is also used from asynchronous callbacks.
  private static readonly object instanceLock = new object();
  private static WcfService instance;

  /// <summary>
  /// Getter for the Instance of the WcfService
  /// </summary>
  /// <returns>the Instance of the WcfService class</returns>
  public static WcfService Instance {
    get {
      lock (instanceLock) {
        if (instance == null) {
          Logger.Debug("New WcfService Instance created");
          instance = new WcfService();
        }
        return instance;
      }
    }
  }

  /// <summary>
  /// Point in time of the last successful <see cref="Connect"/>.
  /// </summary>
  public DateTime ConnectedSince { get; private set; }

  /// <summary>
  /// Current connection state: Disconnected initially and after <see cref="Disconnect"/>,
  /// Connected after a successful <see cref="Connect"/>, Failed after a network error.
  /// </summary>
  public NetworkEnum.WcfConnState ConnState { get; private set; }

  private string serverIp;
  /// <summary>
  /// Address of the server; handed to the ServiceLocator as host address by <see cref="Connect"/>.
  /// </summary>
  public string ServerIp {
    get { return serverIp; }
    set { serverIp = value; }
  }

  /// <summary>
  /// Raised by <see cref="Connect"/> after the login call returned without an exception.
  /// </summary>
  public event EventHandler Connected;
  public void OnConnected() {
    var handler = Connected;
    if (handler != null) handler(this, EventArgs.Empty);
  }

  /// <summary>
  /// Constructor
  /// </summary>
  private WcfService() {
    ConnState = NetworkEnum.WcfConnState.Disconnected;
  }

  /// <summary>
  /// Connects with the server: sets the host address, registers the pool exception handlers,
  /// logs in and fires the <see cref="Connected"/> event.
  /// Does nothing (apart from logging) when no server IP is set.
  /// Exceptions thrown by the login call are routed to the network error handler.
  /// </summary>
  /// <param name="slaveInfo">Description of this slave, passed to the server's Login operation.</param>
  public void Connect(SlaveDto slaveInfo) {
    Logger.Debug("Starting the Connection Process");
    // Validate before touching the ServiceLocator so that an unset (null or empty) address
    // neither registers handlers nor acquires a pooled service.
    if (String.IsNullOrEmpty(ServerIp)) {
      Logger.Info("No Server IP set!");
      return;
    }

    ServiceLocator.Instance.HostAddress = ServerIp;
    RegisterServiceEvents();
    using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      try {
        service.Obj.Login(slaveInfo);
        // Only report the connection as established once the login call has returned.
        ConnState = NetworkEnum.WcfConnState.Connected;
        ConnectedSince = DateTime.Now;
        OnConnected();
      }
      catch (Exception ex) {
        HandleNetworkError(ex);
      }
    }
  }

  /// <summary>
  /// Subscribes to the ExceptionOccured events of both service pools.
  /// </summary>
  private void RegisterServiceEvents() {
    // Unsubscribe first: Connect may be called repeatedly and must not stack duplicate handlers.
    DeregisterServiceEvents();
    ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
    ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
  }

  /// <summary>
  /// Unsubscribes from the ExceptionOccured events of both service pools.
  /// </summary>
  private void DeregisterServiceEvents() {
    ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
    ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
  }

  /// <summary>
  /// Handler for exceptions reported by the service pools; switches to the failed state and logs.
  /// </summary>
  void ClientFacadePool_ExceptionOccured(object sender, EventArgs<Exception> e) {
    HandleNetworkError(e.Value);
    Logger.Error("An exception occured in the WCF-Communication: " + e.Value.ToString());
  }

  /// <summary>
  /// Marks the slave as disconnected and removes the pool exception handlers.
  /// No call to the server is made here; see <see cref="Logout"/> for that.
  /// </summary>
  public void Disconnect() {
    ConnState = NetworkEnum.WcfConnState.Disconnected;
    DeregisterServiceEvents();
  }

  /// <summary>
  /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
  /// </summary>
  /// <param name="e">The Exception</param>
  private void HandleNetworkError(Exception e) {
    ConnState = NetworkEnum.WcfConnState.Failed;
    DeregisterServiceEvents();
    Logger.Error("Network exception occurred: " + e);
  }

  /// <summary>
  /// Invokes <paramref name="handler"/> with this instance as sender, if anybody subscribed.
  /// </summary>
  private void RaiseEvent<T>(EventHandler<T> handler, T args) where T : EventArgs {
    if (handler != null) handler(this, args);
  }

  /// <summary>
  /// Disposes a pooled service wrapper; a failure while disposing is reported through
  /// <see cref="ExceptionOccured"/> instead of being thrown.
  /// </summary>
  private void ReleaseService(Disposable<SlaveFacade.ISlaveFacade> service) {
    try { service.Dispose(); }
    catch (Exception e) { OnExceptionOccured(e); }
  }

  /// <summary>
  /// Reads <paramref name="source"/> until Read returns 0 and returns exactly the bytes read.
  /// </summary>
  private static byte[] ReadToEnd(Stream source) {
    using (MemoryStream target = new MemoryStream()) {
      byte[] buffer = new byte[CopyBufferSize];
      int read;
      while ((read = source.Read(buffer, 0, buffer.Length)) > 0) {
        target.Write(buffer, 0, read);
      }
      // ToArray copies only the written bytes; GetBuffer would also expose unused capacity.
      return target.ToArray();
    }
  }

  #region PullJob
  /// <summary>
  /// Raised when <see cref="GetJobAsync"/> has received a job. The result array of the event args
  /// holds the deserialized ResponseObject&lt;JobDto&gt; followed by the job blob as byte[].
  /// </summary>
  public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;

  /// <summary>
  /// Pull a Job from the Server asynchronously. The outcome is reported through
  /// <see cref="GetJobCompleted"/>; errors inside the callback go to <see cref="ExceptionOccured"/>.
  /// </summary>
  /// <param name="guid">Id passed to the server's GetStreamedJob operation.</param>
  public void GetJobAsync(Guid guid) {
    Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
    Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
    try {
      service.Obj.BeginGetStreamedJob(guid, (ar => CompleteGetJob(service, ar)), null);
    }
    catch {
      // The callback will never run in this case, so the service has to be released here.
      ReleaseService(service);
      throw;
    }
  }

  /// <summary>
  /// Callback of <see cref="GetJobAsync"/>: ends the call, deserializes the response header,
  /// copies the remaining stream content as job blob and raises <see cref="GetJobCompleted"/>.
  /// </summary>
  private void CompleteGetJob(Disposable<SlaveFacade.ISlaveFacade> service, IAsyncResult ar) {
    Stream stream = null;
    try {
      if (!ar.IsCompleted) {
        HandleNetworkError(new FaultException("GetJobAsync did not complete"));
        return;
      }

      Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
      stream = service.Obj.EndGetStreamedJob(ar);

      //first deserialize the response
      // NOTE(review): BinaryFormatter deserializes data received over the network; this is only
      // safe as long as the server is trusted. Consider a safer serializer.
      BinaryFormatter formatter = new BinaryFormatter();
      ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);

      //second read the BLOB that follows the response in the same stream
      byte[] jobData = ReadToEnd(stream);

      GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, jobData }, null, false, ar.AsyncState);
      RaiseEvent(GetJobCompleted, completedEventArgs);
    }
    catch (Exception e) {
      OnExceptionOccured(e);
    }
    finally {
      try {
        if (stream != null)
          stream.Dispose();
      }
      finally {
        ReleaseService(service);
      }
    }
  }

  #endregion

  #region SendJobResults
  /// <summary>
  /// Raised when <see cref="GetFinishedJobResultAsync"/> has stored the result on the server.
  /// </summary>
  public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;

  /// <summary>
  /// Send back finished Job Results asynchronously. The outcome is reported through
  /// <see cref="GetFinishedJobResultCompleted"/>; errors inside the callback go to <see cref="ExceptionOccured"/>.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <param name="jobId">Id of the job the result belongs to.</param>
  /// <param name="result">Serialized job data.</param>
  /// <param name="executionTime">Execution time reported for the job.</param>
  /// <param name="exception">Exception text reported for the job.</param>
  /// <param name="finished">Not used by this method.</param>
  public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
    Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
    Stream stream = null;
    try {
      Logger.Debug("STARTED: Sending back the finished job results");
      Logger.Debug("Building stream");
      stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
      Logger.Debug("Builded stream");
      Logger.Debug("Making the call");

      service.Obj.BeginStoreFinishedJobResultStreamed(stream, (ar => {
        try {
          Logger.Debug("Finished storing the job");
          if (stream != null)
            stream.Dispose();

          if (ar.IsCompleted) {
            var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
            StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
            Logger.Debug("calling the Finished Job Event");
            RaiseEvent(GetFinishedJobResultCompleted, args);
            Logger.Debug("ENDED: Sending back the finished job results");
          } else {
            HandleNetworkError(new FaultException("GetFinishedJobResultAsync did not complete"));
          }
        }
        catch (Exception e) {
          OnExceptionOccured(e);
        }
        finally {
          ReleaseService(service);
        }
      }), null);
    }
    catch {
      // Building the stream or starting the call failed: the callback will never run,
      // so stream and service have to be cleaned up here.
      if (stream != null)
        stream.Dispose();
      ReleaseService(service);
      throw;
    }
  }

  #endregion

  #region Processsnapshots
  /// <summary>
  /// Raised when <see cref="ProcessSnapshotAsync"/> has delivered the snapshot to the server.
  /// </summary>
  public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;

  /// <summary>
  /// Sends a snapshot of a job asynchronously. The outcome is reported through
  /// <see cref="ProcessSnapshotCompleted"/>; errors inside the callback go to <see cref="ExceptionOccured"/>.
  /// </summary>
  /// <param name="clientId">Id of this slave.</param>
  /// <param name="jobId">Id of the job the snapshot belongs to.</param>
  /// <param name="result">Serialized job data.</param>
  /// <param name="executionTime">Execution time reported for the job.</param>
  /// <param name="exception">Exception text reported for the job.</param>
  /// <param name="finished">Not used by this method.</param>
  public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
    Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
    Stream stream = null;
    try {
      stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
      service.Obj.BeginProcessSnapshotStreamed(stream, (ar => {
        try {
          if (stream != null)
            stream.Dispose();

          if (ar.IsCompleted) {
            // The call was started with BeginProcessSnapshotStreamed, so it has to be ended with
            // the matching End method (not with EndStoreFinishedJobResultStreamed).
            var res = service.Obj.EndProcessSnapshotStreamed(ar);
            ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
            RaiseEvent(ProcessSnapshotCompleted, args);
          } else {
            HandleNetworkError(new FaultException("ProcessSnapshotAsync did not complete"));
          }
        }
        catch (Exception e) {
          OnExceptionOccured(e);
        }
        finally {
          ReleaseService(service);
        }
      }), null);
    }
    catch {
      // Building the stream or starting the call failed: the callback will never run,
      // so stream and service have to be cleaned up here.
      if (stream != null)
        stream.Dispose();
      ReleaseService(service);
      throw;
    }
  }

  #endregion

  #region Heartbeat

  /// <summary>
  /// Raised by <see cref="ProcessHeartBeatSync"/> when the server answered the heartbeat with status Ok.
  /// </summary>
  public event EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;

  /// <summary>
  /// Sends the periodic heartbeat synchronously. An Ok response is forwarded through
  /// <see cref="ProcessHeartBeatCompleted"/>, any other status is logged as error.
  /// Exceptions of the service call are not caught here and propagate to the caller.
  /// </summary>
  /// <param name="hbd">The heartbeat data to send.</param>
  public void ProcessHeartBeatSync(HeartBeatData hbd) {
    using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      Logger.Debug("STARTING: sending heartbeat");
      var res = service.Obj.ProcessHeartBeat(hbd);

      if (res.StatusMessage == ResponseStatus.Ok) {
        RaiseEvent(ProcessHeartBeatCompleted, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
        Logger.Debug("ENDED: sending heartbeats");
      } else {
        Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
      }
    }
  }

  #endregion

  /// <summary>
  /// Builds the request stream for a job result: first the JobResult header
  /// (slave id, job id, execution time, exception), then the binary job data.
  /// </summary>
  /// <returns>A new stream; the caller is responsible for disposing it.</returns>
  private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
    JobResult jobResult = new JobResult();
    jobResult.SlaveId = clientId;
    jobResult.Id = jobId;
    jobResult.ExecutionTime = executionTime;
    jobResult.Exception = exception;

    MultiStream stream = new MultiStream();

    //first send result
    stream.AddStream(new StreamedObject<JobResult>(jobResult));

    //second stream the job binary data
    MemoryStream memStream = new MemoryStream(result, false);
    stream.AddStream(memStream);

    return stream;
  }

  /// <summary>
  /// Send back finished Job Results synchronously.
  /// Exceptions are not caught here and propagate to the caller.
  /// </summary>
  /// <param name="finished">Not used by this method.</param>
  /// <returns>The server's response.</returns>
  public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
    using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService())
    // Dispose the request stream after the call, as the asynchronous variant does.
    using (Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception)) {
      ResponseResultReceived res = service.Obj.StoreFinishedJobResultStreamed(stream);
      return res;
    }
  }

  /// <summary>
  /// Asks the server whether the given job is still needed.
  /// </summary>
  /// <returns>The server's response, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public Response IsJobStillNeeded(Guid jobId) {
    try {
      Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        Response res = service.Obj.IsJobStillNeeded(jobId);
        Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
        return res;
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Sends a snapshot of a job synchronously.
  /// </summary>
  /// <returns>The server's response, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
    try {
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService())
      // Dispose the request stream after the call, as the asynchronous variant does.
      using (Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception)) {
        return service.Obj.ProcessSnapshotStreamed(stream);
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Requests the given plugins from the server and deserializes the streamed response.
  /// </summary>
  /// <returns>The plugin list of the response, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
    try {
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        Logger.Debug("STARTED: Requesting Plugins for Job");
        Logger.Debug("STARTED: Getting the stream");
        // using guarantees the response stream is closed even when deserialization fails.
        using (Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray())) {
          Logger.Debug("ENDED: Getting the stream");
          // NOTE(review): BinaryFormatter deserializes data received over the network; this is only
          // safe as long as the server is trusted. Consider a safer serializer.
          BinaryFormatter formatter = new BinaryFormatter();
          Logger.Debug("STARTED: Deserializing the stream");
          ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
          Logger.Debug("ENDED: Deserializing the stream");
          return response.List;
        }
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Calls the server's Logout operation for the given id.
  /// Exceptions are reported via <see cref="ExceptionOccured"/>.
  /// </summary>
  public void Logout(Guid guid) {
    try {
      Logger.Debug("STARTED: Logout");
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        service.Obj.Logout(guid);
      }
      Logger.Debug("ENDED: Logout");
    }
    catch (Exception e) {
      OnExceptionOccured(e);
    }
  }

  /// <summary>
  /// Fetches the calendar for the given client from the server.
  /// </summary>
  /// <returns>The server's response, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public ResponseCalendar GetCalendarSync(Guid clientId) {
    try {
      Logger.Debug("STARTED: Syncing Calendars");
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        ResponseCalendar cal = service.Obj.GetCalendar(clientId);
        Logger.Debug("ENDED: Syncing Calendars");
        return cal;
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Reports the calendar state of the given client to the server.
  /// </summary>
  /// <returns>The server's response, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public Response SetCalendarStatus(Guid clientId, CalendarState state) {
    try {
      Logger.Debug("STARTED: Setting Calendar status to: " + state);
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        Response resp = service.Obj.SetCalendarStatus(clientId, state);
        Logger.Debug("ENDED: Setting Calendar status to: " + state);
        return resp;
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Adds a child job below the given parent job on the server.
  /// </summary>
  /// <returns>The server's response, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public ResponseObject<JobDto> AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
    try {
      Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId);
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        ResponseObject<JobDto> response = service.Obj.AddChildJob(parentJobId, serializedJob);
        Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId);
        return response;
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Sends the given job to the server's PauseJob operation.
  /// </summary>
  /// <returns>The server's response, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public ResponseObject<JobDto> PauseJob(SerializedJob serializedJob) {
    try {
      Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id);
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        ResponseObject<JobDto> response = service.Obj.PauseJob(serializedJob);
        Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id);
        return response;
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Collects the last serialized result of every child job of the given parent job.
  /// One additional server call is made per child job.
  /// </summary>
  /// <returns>A response wrapping the collected jobs, or null if an exception occurred (reported via <see cref="ExceptionOccured"/>).</returns>
  public ResponseObject<SerializedJobList> GetChildJobs(Guid parentJob) {
    try {
      Logger.Debug("STARTED: GetChildJobs job: " + parentJob);
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        SerializedJobList serializedJobs = new SerializedJobList();
        JobResult[] results = service.Obj.GetChildJobResults(new Guid?(parentJob), false, false);
        foreach (JobResult result in results) {
          serializedJobs.Add(service.Obj.GetLastSerializedResult(result.Id));
        }

        Logger.Debug("ENDED: GetChildJobs job: " + parentJob);
        return new ResponseObject<SerializedJobList>() {
          Obj = serializedJobs
        };
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
      return null;
    }
  }

  /// <summary>
  /// Calls the server's DeleteChildJobs operation for the given job.
  /// Exceptions are reported via <see cref="ExceptionOccured"/>.
  /// </summary>
  public void DeleteChildJobs(Guid jobId) {
    try {
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        service.Obj.DeleteChildJobs(jobId);
      }
    }
    catch (Exception e) {
      OnExceptionOccured(e);
    }
  }

  /// <summary>
  /// Raised for every exception that this class catches instead of propagating to the caller.
  /// </summary>
  public event EventHandler<EventArgs<Exception>> ExceptionOccured;
  private void OnExceptionOccured(Exception e) {
    Logger.Error("Error: " + e.ToString());
    var handler = ExceptionOccured;
    if (handler != null) handler(this, new EventArgs<Exception>(e));
  }
}
504}
Note: See TracBrowser for help on using the repository browser.