Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 5707

Last change on this file since 5707 was 5707, checked in by cneumuel, 13 years ago

#1260

  • some changes due to the removal of Disposable (r5706)
  • copy PluginInfrastructure files into PluginCache folder in slaves (needed due to r5703)
File size: 18.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.Serialization.Formatters.Binary;
26using HeuristicLab.Common;
27using HeuristicLab.Hive.Contracts;
28using HeuristicLab.Hive.Contracts.BusinessObjects;
29using HeuristicLab.Hive.Contracts.ResponseObjects;
30using HeuristicLab.Hive.Slave.Common;
31using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
32using HeuristicLab.Hive.Tracing;
33using HeuristicLab.PluginInfrastructure;
34
35namespace HeuristicLab.Hive.Slave.Communication {
36
37  /// <summary>
38  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
39  /// </summary>
40  public class WcfService {
41    private static WcfService instance;
42    /// <summary>
43    /// Getter for the Instance of the WcfService
44    /// </summary>
45    /// <returns>the Instance of the WcfService class</returns>
46    public static WcfService Instance {
47      get {
48        if (instance == null) {
49          Logger.Debug("New WcfService Instance created");
50          instance = new WcfService();
51        }
52        return instance;
53      }
54    }
55
56    public DateTime ConnectedSince { get; private set; }
57    public NetworkEnum.WcfConnState ConnState { get; private set; }
58
59    public event EventHandler Connected;
60    public void OnConnected() {
61      var handler = Connected;
62      if (handler != null) handler(this, EventArgs.Empty);
63    }
64
65    /// <summary>
66    /// Constructor
67    /// </summary>
68    private WcfService() {
69      ConnState = NetworkEnum.WcfConnState.Disconnected;
70    }
71
72    /// <summary>
73    /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
74    /// </summary>
75    public void Connect(SlaveDto slaveInfo) {
76      RegisterServiceEvents();
77      using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
78        try {
79          Logger.Debug("Starting the Connection Process");
80          ConnState = NetworkEnum.WcfConnState.Connected;
81          ConnectedSince = DateTime.Now;
82          service.Obj.Login(slaveInfo);
83          OnConnected();
84        }
85        catch (Exception ex) {
86          HandleNetworkError(ex);
87        }
88      }
89    }
90
91    private void RegisterServiceEvents() {
92      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
93      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
94    }
95
96    private void DeregisterServiceEvents() {
97      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
98      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
99    }
100
101    void ClientFacadePool_ExceptionOccured(object sender, EventArgs<Exception> e) {
102      HandleNetworkError(e.Value);
103      Logger.Error("An exception occured in the WCF-Communication: " + e.Value.ToString());
104    }
105
106    ///// <summary>
107    ///// Disconnects the Slave from the Server
108    ///// </summary>
109    public void Disconnect() {
110      ConnState = NetworkEnum.WcfConnState.Disconnected;
111      DeregisterServiceEvents();
112    }
113
114    /// <summary>
115    /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
116    /// </summary>
117    /// <param name="e">The Exception</param>
118    private void HandleNetworkError(Exception e) {
119      ConnState = NetworkEnum.WcfConnState.Failed;
120      DeregisterServiceEvents();
121      Logger.Error("Network exception occurred: " + e);
122    }
123
124    /// <summary>
125    /// Pull a Job from the Server
126    /// </summary>
127    #region PullJob
128    public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;
129    public event System.EventHandler<EventArgs<Exception>> GetJobFailed;
130    public void GetJobAsync(Guid guid) {
131      DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
132      Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
133      service.Obj.BeginGetStreamedJob(guid, (ar => {
134        Stream stream = null;
135        MemoryStream memStream = null;
136        try {
137          Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
138          stream = service.Obj.EndGetStreamedJob(ar);
139
140          //first deserialize the response
141          BinaryFormatter formatter = new BinaryFormatter();
142          ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);
143
144          //second deserialize the BLOB
145          memStream = new MemoryStream();
146
147          byte[] buffer = new byte[3024];
148          int read = 0;
149          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
150            memStream.Write(buffer, 0, read);
151          }
152
153          memStream.Close();
154
155          GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, null, !ar.IsCompleted, ar.AsyncState);
156          GetJobCompleted(this, completedEventArgs);
157        }
158        catch (Exception e) {
159          OnExceptionOccured(e);
160          if (GetJobFailed != null)
161            GetJobFailed(this, new EventArgs<Exception>(e));
162        }
163        finally {
164          if (stream != null)
165            stream.Dispose();
166
167          if (memStream != null)
168            memStream.Dispose();
169
170          try { service.Dispose(); }
171          catch (Exception e) { OnExceptionOccured(e); }
172        }
173      }), null);
174    }
175
176    #endregion
177
178    /// <summary>
179    /// Send back finished Job Results
180    /// </summary>
181    #region SendJobResults
182    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;
183    public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
184      DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
185      Logger.Debug("STARTED: Sending back the finished job results");
186      Logger.Debug("Building stream");
187      Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
188      Logger.Debug("Builded stream");
189      Logger.Debug("Making the call");
190
191      service.Obj.BeginStoreFinishedJobResultStreamed(stream, (ar => {
192        try {
193          Logger.Debug("Finished storing the job");
194          if (stream != null)
195            stream.Dispose();
196
197          var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
198          StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
199          Logger.Debug("calling the Finished Job Event");
200          GetFinishedJobResultCompleted(this, args);
201          Logger.Debug("ENDED: Sending back the finished job results");
202        }
203        catch (Exception e) {
204          OnExceptionOccured(e);
205        }
206        finally {
207          try { service.Dispose(); }
208          catch (Exception e) { OnExceptionOccured(e); }
209        }
210      }), null);
211    }
212
213    #endregion
214
215    #region Processsnapshots
216    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
217    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
218      DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
219
220      Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
221      service.Obj.BeginProcessSnapshotStreamed(stream, (ar => {
222        try {
223          if (stream != null)
224            stream.Dispose();
225
226          var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
227          ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
228          ProcessSnapshotCompleted(this, args);
229        }
230        catch (Exception e) {
231          OnExceptionOccured(e);
232        }
233        finally {
234          try { service.Dispose(); }
235          catch (Exception e) { OnExceptionOccured(e); }
236        }
237      }), null);
238    }
239
240    #endregion
241
242    /// <summary>
243    /// Methods for sending the periodically Heartbeat
244    /// </summary>
245    #region Heartbeat
246
247    public event EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;
248    public void ProcessHeartBeatSync(HeartBeatData hbd) {
249      using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.SlaveFacadePool.GetService()) {
250        Logger.Debug("STARTING: sending heartbeat");
251        var res = service.Obj.ProcessHeartBeat(hbd);
252
253        if (res.StatusMessage == ResponseStatus.Ok) {
254          ProcessHeartBeatCompleted(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
255          Logger.Debug("ENDED: sending heartbeats");
256        } else {
257          Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
258        }
259      }
260    }
261
262    #endregion
263
264    /// <summary>
265    /// Send back finished and Stored Job Results
266    /// </summary>
267    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
268      JobResult jobResult = new JobResult();
269      jobResult.SlaveId = clientId;
270      jobResult.Id = jobId;
271      jobResult.ExecutionTime = executionTime;
272      jobResult.Exception = exception;
273
274      MultiStream stream = new MultiStream();
275
276      //first send result
277      stream.AddStream(new StreamedObject<JobResult>(jobResult));
278
279      //second stream the job binary data
280      MemoryStream memStream = new MemoryStream(result, false);
281      stream.AddStream(memStream);
282
283      return stream;
284    }
285
286    public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
287      using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
288        ResponseResultReceived res = service.Obj.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
289        return res;
290      }
291    }
292
293    public Response IsJobStillNeeded(Guid jobId) {
294      try {
295        Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
296        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
297          Response res = service.Obj.IsJobStillNeeded(jobId);
298          Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
299          return res;
300        }
301      }
302      catch (Exception e) {
303        OnExceptionOccured(e);
304        return null;
305      }
306    }
307
308    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
309      try {
310        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
311          return service.Obj.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
312        }
313      }
314      catch (Exception e) {
315        OnExceptionOccured(e);
316        return null;
317      }
318    }
319
320    public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
321      try {
322        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
323          Logger.Debug("STARTED: Requesting Plugins for Job");
324          Logger.Debug("STARTED: Getting the stream");
325          Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray());
326          Logger.Debug("ENDED: Getting the stream");
327          BinaryFormatter formatter = new BinaryFormatter();
328          Logger.Debug("STARTED: Deserializing the stream");
329          ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
330          Logger.Debug("ENDED: Deserializing the stream");
331          if (stream != null)
332            stream.Dispose();
333          return response.List;
334        }
335      }
336      catch (Exception e) {
337        OnExceptionOccured(e);
338        return null;
339      }
340    }
341
342    public void Logout(Guid guid) {
343      try {
344        Logger.Debug("STARTED: Logout");
345        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
346          service.Obj.Logout(guid);
347        }
348        Logger.Debug("ENDED: Logout");
349      }
350      catch (Exception e) {
351        OnExceptionOccured(e);
352      }
353    }
354
355    public ResponseCalendar GetCalendarSync(Guid clientId) {
356      try {
357        Logger.Debug("STARTED: Syncing Calendars");
358        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
359          ResponseCalendar cal = service.Obj.GetCalendar(clientId);
360          Logger.Debug("ENDED: Syncing Calendars");
361          return cal;
362        }
363      }
364      catch (Exception e) {
365        OnExceptionOccured(e);
366        return null;
367      }
368    }
369
370    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
371      try {
372        Logger.Debug("STARTED: Setting Calendar status to: " + state);
373        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
374          Response resp = service.Obj.SetCalendarStatus(clientId, state);
375          Logger.Debug("ENDED: Setting Calendar status to: " + state);
376          return resp;
377        }
378      }
379      catch (Exception e) {
380        OnExceptionOccured(e);
381        return null;
382      }
383    }
384
385    public ResponseObject<JobDto> AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
386      try {
387        Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId);
388        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
389          ResponseObject<JobDto> response = service.Obj.AddChildJob(parentJobId, serializedJob);
390          Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId);
391          return response;
392        }
393      }
394      catch (Exception e) {
395        OnExceptionOccured(e);
396        return null;
397      }
398    }
399
400    public ResponseObject<JobDto> PauseJob(SerializedJob serializedJob) {
401      try {
402        Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id);
403        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
404          ResponseObject<JobDto> response = service.Obj.PauseJob(serializedJob);
405          Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id);
406          return response;
407        }
408      }
409      catch (Exception e) {
410        OnExceptionOccured(e);
411        return null;
412      }
413    }
414
415    public ResponseObject<SerializedJobList> GetChildJobs(Guid parentJob) {
416      try {
417        Logger.Debug("STARTED: GetChildJobs job: " + parentJob);
418        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
419          SerializedJobList serializedJobs = new SerializedJobList();
420          JobResult[] results = service.Obj.GetChildJobResults(new Guid?(parentJob), false, false);
421          foreach (JobResult result in results) {
422            serializedJobs.Add(service.Obj.GetLastSerializedResult(result.Id));
423          }
424
425          Logger.Debug("ENDED: GetChildJobs job: " + parentJob);
426          return new ResponseObject<SerializedJobList>() {
427            Obj = serializedJobs
428          };
429        }
430      }
431      catch (Exception e) {
432        OnExceptionOccured(e);
433        return null;
434      }
435    }
436
437    public void DeleteChildJobs(Guid jobId) {
438      try {
439        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
440          service.Obj.DeleteChildJobs(jobId);
441        }
442      }
443      catch (Exception e) {
444        OnExceptionOccured(e);
445      }
446    }
447
448    public HivePluginFile GetConfigurationFile() {
449      try {
450        using (DisposableWrapper<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
451          var response = service.Obj.GetConfigurationFile();
452          return response.Obj;
453        }
454      }
455      catch (Exception e) {
456        OnExceptionOccured(e);
457        return null;
458      }
459    }
460
461    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
462    private void OnExceptionOccured(Exception e) {
463      Logger.Error("Error: " + e.ToString());
464      var handler = ExceptionOccured;
465      if (handler != null) handler(this, new EventArgs<Exception>(e));
466    }
467  }
468}
Note: See TracBrowser for help on using the repository browser.