Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 5048

Last change on this file since 5048 was 5037, checked in by cneumuel, 13 years ago

#1260

  • changed ExecutionTime column datatype in DB to string (because time only allows 24-hours, but TimeSpan can be more)
File size: 17.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.Serialization.Formatters.Binary;
26using System.ServiceModel;
27using HeuristicLab.Common;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.ResponseObjects;
31using HeuristicLab.Hive.Slave.Common;
32using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Hive.Tracing;
35using HeuristicLab.Clients.Common;
36
37namespace HeuristicLab.Hive.Slave.Communication {
38
39  /// <summary>
40  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
41  /// </summary>
42  public class WcfService {
private static WcfService instance;

/// <summary>
/// Gets the shared WcfService object, creating it (and logging the creation) on first access.
/// The creation is not synchronized.
/// </summary>
/// <returns>the shared WcfService instance</returns>
public static WcfService Instance {
  get {
    if (instance != null) {
      return instance;
    }
    Logger.Debug("New WcfService Instance created");
    instance = new WcfService();
    return instance;
  }
}
57
58    public DateTime ConnectedSince { get; private set; }
59    public NetworkEnum.WcfConnState ConnState { get; private set; }
60
/// <summary>
/// Raised through <see cref="OnConnected"/>.
/// </summary>
public event EventHandler Connected;

/// <summary>
/// Raises the Connected event with empty event arguments; does nothing when nobody is subscribed.
/// </summary>
public void OnConnected() {
  // Copy the delegate first so a concurrent unsubscribe cannot null it between check and call.
  EventHandler subscribers = Connected;
  if (subscribers == null) {
    return;
  }
  subscribers(this, EventArgs.Empty);
}
66
/// <summary>
/// Private constructor (instances are handed out by the Instance property).
/// A freshly created object starts in the Disconnected state.
/// </summary>
private WcfService() {
  this.ConnState = NetworkEnum.WcfConnState.Disconnected;
}
73
/// <summary>
/// Subscribes to the service pool exception events, logs the slave in at the server and,
/// when the login call returns normally, raises the Connected event.
/// Any exception thrown by the login or by a Connected handler is passed to
/// HandleNetworkError, which switches the state to Failed; it is not rethrown.
/// </summary>
/// <param name="slaveInfo">Slave description that is handed to the server's Login operation.</param>
public void Connect(SlaveDto slaveInfo) {
  // Remove a possibly existing subscription first: Connect may be called again without an
  // intermediate Disconnect, and a second "+=" would make every pool exception be handled twice.
  DeregisterServiceEvents();
  RegisterServiceEvents();
  using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
    try {
      Logger.Debug("Starting the Connection Process");
      ConnState = NetworkEnum.WcfConnState.Connected;
      ConnectedSince = DateTime.Now;
      service.Obj.Login(slaveInfo);
      OnConnected();
    }
    catch (Exception ex) {
      HandleNetworkError(ex);
    }
  }
}
92
/// <summary>
/// Subscribes ClientFacadePool_ExceptionOccured to the ExceptionOccured event of both
/// the regular and the streamed slave facade pool.
/// </summary>
private void RegisterServiceEvents() {
  ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += ClientFacadePool_ExceptionOccured;
  ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += ClientFacadePool_ExceptionOccured;
}
97
/// <summary>
/// Removes ClientFacadePool_ExceptionOccured from the ExceptionOccured event of both
/// the regular and the streamed slave facade pool.
/// </summary>
private void DeregisterServiceEvents() {
  ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= ClientFacadePool_ExceptionOccured;
  ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= ClientFacadePool_ExceptionOccured;
}
102
/// <summary>
/// Handler for the ExceptionOccured event of the service pools: passes the exception to
/// HandleNetworkError and afterwards writes it to the error log.
/// </summary>
/// <param name="sender">The object that raised the event (not used).</param>
/// <param name="e">Event arguments whose Value is the exception that occurred.</param>
void ClientFacadePool_ExceptionOccured(object sender, EventArgs<Exception> e) {
  Exception cause = e.Value;
  HandleNetworkError(cause);
  Logger.Error("An exception occured in the WCF-Communication: " + cause.ToString());
}
107
/// <summary>
/// Switches the connection state to Disconnected and unsubscribes from the service pool
/// exception events. No server call is made here.
/// </summary>
public void Disconnect() {
  this.ConnState = NetworkEnum.WcfConnState.Disconnected;
  this.DeregisterServiceEvents();
}
115
/// <summary>
/// Central reaction to a network error: the connection state becomes Failed, the service
/// pool exception events are unsubscribed and the exception is written to the error log.
/// </summary>
/// <param name="e">The exception that describes the network error.</param>
private void HandleNetworkError(Exception e) {
  this.ConnState = NetworkEnum.WcfConnState.Failed;
  this.DeregisterServiceEvents();
  Logger.Error(string.Concat("Network exception occurred: ", e));
}
125
#region PullJob
/// <summary>
/// Raised by GetJobAsync once the job has been received. The event arguments are built from
/// a two-element result array: the deserialized ResponseObject&lt;JobDto&gt; and the job's binary data.
/// </summary>
public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;

/// <summary>
/// Starts an asynchronous streamed job request and returns immediately.
/// When the call completes, the response header is deserialized from the stream, the rest of
/// the stream is read as the job's binary data, and GetJobCompleted is raised.
/// Errors inside the completion callback are reported through ExceptionOccured; an error
/// while starting the call is thrown to the caller.
/// </summary>
/// <param name="guid">Identifier passed to the server's BeginGetStreamedJob operation.</param>
public void GetJobAsync(Guid guid) {
  Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
  bool callbackStarted = false;
  try {
    Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
    service.Obj.BeginGetStreamedJob(guid, (ar => {
      callbackStarted = true;
      Stream stream = null;
      try {
        Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
        stream = service.Obj.EndGetStreamedJob(ar);

        // First part of the stream: the serialized response header.
        // NOTE(review): BinaryFormatter is unsafe on untrusted input; acceptable only if the server is trusted.
        BinaryFormatter formatter = new BinaryFormatter();
        ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);

        // Remainder of the stream: the job BLOB, copied chunk by chunk.
        const int ChunkSize = 3024;
        byte[] jobData;
        using (MemoryStream memStream = new MemoryStream()) {
          byte[] buffer = new byte[ChunkSize];
          int read;
          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
            memStream.Write(buffer, 0, read);
          }
          // ToArray returns exactly the bytes written; GetBuffer would also hand out the
          // unused tail of the internal buffer.
          jobData = memStream.ToArray();
        }

        GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, jobData }, null, !ar.IsCompleted, ar.AsyncState);
        var handler = GetJobCompleted;
        if (handler != null) handler(this, completedEventArgs);
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        if (stream != null)
          stream.Dispose();

        try { service.Dispose(); }
        catch (Exception e) { OnExceptionOccured(e); }
      }
    }), null);
  }
  catch (Exception) {
    // If the call could not be started the callback never runs, so the pooled service
    // has to be released here; otherwise the callback's finally block already did it.
    if (!callbackStarted) {
      service.Dispose();
    }
    throw;
  }
}
174
175    #endregion
176
#region SendJobResults
/// <summary>
/// Raised by GetFinishedJobResultAsync after the server acknowledged the stored job result.
/// </summary>
public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;

/// <summary>
/// Sends the result of a finished job to the server asynchronously and returns immediately.
/// When the call completes, GetFinishedJobResultCompleted is raised with the server's answer.
/// Errors inside the completion callback are reported through ExceptionOccured; an error
/// while building the request or starting the call is thrown to the caller.
/// </summary>
/// <param name="clientId">Id of the slave that executed the job.</param>
/// <param name="jobId">Id of the job the result belongs to.</param>
/// <param name="result">Binary job data that is streamed after the result header.</param>
/// <param name="executionTime">Execution time stored in the result header.</param>
/// <param name="exception">Exception text stored in the result header.</param>
/// <param name="finished">Not used by this method.</param>
public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
  Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
  Stream stream = null;
  bool callbackStarted = false;
  try {
    Logger.Debug("STARTED: Sending back the finished job results");
    Logger.Debug("Building stream");
    stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
    Logger.Debug("Builded stream");
    Logger.Debug("Making the call");

    service.Obj.BeginStoreFinishedJobResultStreamed(stream, (ar => {
      callbackStarted = true;
      try {
        Logger.Debug("Finished storing the job");
        if (stream != null)
          stream.Dispose();

        var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
        StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
        Logger.Debug("calling the Finished Job Event");
        var handler = GetFinishedJobResultCompleted;
        if (handler != null) handler(this, args);
        Logger.Debug("ENDED: Sending back the finished job results");
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        try { service.Dispose(); }
        catch (Exception e) { OnExceptionOccured(e); }
      }
    }), null);
  }
  catch (Exception) {
    // The callback never runs when the request could not be built or started, so the
    // request stream and the pooled service are released here instead.
    if (!callbackStarted) {
      if (stream != null)
        stream.Dispose();
      service.Dispose();
    }
    throw;
  }
}
211
212    #endregion
213
214    #region Processsnapshots
/// <summary>
/// Raised by ProcessSnapshotAsync after the server acknowledged the snapshot.
/// </summary>
public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;

/// <summary>
/// Sends a job snapshot to the server asynchronously and returns immediately.
/// When the call completes, ProcessSnapshotCompleted is raised with the server's answer.
/// Errors inside the completion callback are reported through ExceptionOccured; an error
/// while building the request or starting the call is thrown to the caller.
/// </summary>
/// <param name="clientId">Id of the slave that executes the job.</param>
/// <param name="jobId">Id of the job the snapshot belongs to.</param>
/// <param name="result">Binary snapshot data that is streamed after the result header.</param>
/// <param name="executionTime">Execution time stored in the result header.</param>
/// <param name="exception">Exception text stored in the result header.</param>
/// <param name="finished">Not used by this method.</param>
public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
  Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
  Stream stream = null;
  bool callbackStarted = false;
  try {
    stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
    service.Obj.BeginProcessSnapshotStreamed(stream, (ar => {
      callbackStarted = true;
      try {
        if (stream != null)
          stream.Dispose();

        // The End call must belong to the Begin call that produced "ar"; the store-result
        // End method used here previously does not match BeginProcessSnapshotStreamed.
        var res = service.Obj.EndProcessSnapshotStreamed(ar);
        ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
        var handler = ProcessSnapshotCompleted;
        if (handler != null) handler(this, args);
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        try { service.Dispose(); }
        catch (Exception e) { OnExceptionOccured(e); }
      }
    }), null);
  }
  catch (Exception) {
    // The callback never runs when the request could not be built or started, so the
    // request stream and the pooled service are released here instead.
    if (!callbackStarted) {
      if (stream != null)
        stream.Dispose();
      service.Dispose();
    }
    throw;
  }
}
238
239    #endregion
240
#region Heartbeat

/// <summary>
/// Raised by ProcessHeartBeatSync when the server answered a heartbeat with status Ok.
/// </summary>
public event EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;

/// <summary>
/// Sends one heartbeat to the server and waits for the answer.
/// On status Ok the ProcessHeartBeatCompleted event is raised with the response; any other
/// status is only written to the error log. Exceptions thrown by the service call are not
/// caught here and reach the caller.
/// </summary>
/// <param name="hbd">Heartbeat data passed to the server's ProcessHeartBeat operation.</param>
public void ProcessHeartBeatSync(HeartBeatData hbd) {
  using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
    Logger.Debug("STARTING: sending heartbeat");
    var res = service.Obj.ProcessHeartBeat(hbd);

    if (res.StatusMessage == ResponseStatus.Ok) {
      // Copy the delegate and check it: without subscribers the event field is null.
      var handler = ProcessHeartBeatCompleted;
      if (handler != null) handler(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
      Logger.Debug("ENDED: sending heartbeats");
    } else {
      Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
    }
  }
}
260
261    #endregion
262
/// <summary>
/// Builds the request stream used to transmit a job result or snapshot: a MultiStream that
/// first yields the serialized JobResult header and then the raw job data.
/// </summary>
/// <param name="clientId">Stored as SlaveId in the header.</param>
/// <param name="jobId">Stored as Id in the header.</param>
/// <param name="result">Binary job data; wrapped in a read-only MemoryStream, so it must not be null.</param>
/// <param name="executionTime">Stored as ExecutionTime in the header.</param>
/// <param name="exception">Stored as Exception in the header.</param>
/// <returns>The combined stream; the caller is responsible for disposing it.</returns>
private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
  JobResult header = new JobResult() {
    SlaveId = clientId,
    Id = jobId,
    ExecutionTime = executionTime,
    Exception = exception
  };

  MultiStream combined = new MultiStream();

  // part 1: the result header
  combined.AddStream(new StreamedObject<JobResult>(header));

  // part 2: the binary payload (non-writable view on the caller's array)
  combined.AddStream(new MemoryStream(result, false));

  return combined;
}
284
/// <summary>
/// Sends the result of a finished job to the server and waits for the answer.
/// Exceptions are not caught here and reach the caller.
/// </summary>
/// <param name="clientId">Id of the slave that executed the job.</param>
/// <param name="jobId">Id of the job the result belongs to.</param>
/// <param name="result">Binary job data that is streamed after the result header.</param>
/// <param name="executionTime">Execution time stored in the result header.</param>
/// <param name="exception">Exception text stored in the result header.</param>
/// <param name="finished">Not used by this method.</param>
/// <returns>The server's response to the store operation.</returns>
public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
  using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService())
  // The request stream is disposed after the call, as the asynchronous variant does in its callback.
  using (Stream payload = GetStreamedJobResult(clientId, jobId, result, executionTime, exception)) {
    return service.Obj.StoreFinishedJobResultStreamed(payload);
  }
}
291
/// <summary>
/// Asks the server synchronously whether the given job is still needed.
/// </summary>
/// <param name="jobId">Id of the job to ask about.</param>
/// <returns>The server's response, or null when an exception occurred (it is reported through ExceptionOccured).</returns>
public Response IsJobStillNeeded(Guid jobId) {
  Response answer = null;
  try {
    Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
    using (Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      answer = facade.Obj.IsJobStillNeeded(jobId);
      Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
    // A failure after the call returned (e.g. while releasing the service) still yields null.
    answer = null;
  }
  return answer;
}
306
/// <summary>
/// Sends a job snapshot to the server and waits for the answer.
/// </summary>
/// <param name="clientId">Id of the slave that executes the job.</param>
/// <param name="jobId">Id of the job the snapshot belongs to.</param>
/// <param name="result">Binary snapshot data that is streamed after the result header.</param>
/// <param name="executionTime">Execution time stored in the result header.</param>
/// <param name="exception">Exception text stored in the result header.</param>
/// <returns>The server's response, or null when an exception occurred (it is reported through ExceptionOccured).</returns>
public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
  try {
    using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService())
    // The request stream is disposed after the call, as the asynchronous variant does in its callback.
    using (Stream payload = GetStreamedJobResult(clientId, jobId, result, executionTime, exception)) {
      return service.Obj.ProcessSnapshotStreamed(payload);
    }
  }
  catch (Exception e) {
    OnExceptionOccured(e);
    return null;
  }
}
318
/// <summary>
/// Requests the given plugins from the server and deserializes the streamed answer.
/// </summary>
/// <param name="requestedPlugins">Plugins to request; passed to the server as an array.</param>
/// <returns>
/// The plugin list contained in the server's response, or null when an exception occurred
/// (it is reported through ExceptionOccured).
/// </returns>
public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
  try {
    using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      Logger.Debug("STARTED: Requesting Plugins for Job");
      Logger.Debug("STARTED: Getting the stream");
      // "using" releases the response stream even when deserialization throws.
      using (Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray())) {
        Logger.Debug("ENDED: Getting the stream");
        // NOTE(review): BinaryFormatter is unsafe on untrusted input; acceptable only if the server is trusted.
        BinaryFormatter formatter = new BinaryFormatter();
        Logger.Debug("STARTED: Deserializing the stream");
        ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
        Logger.Debug("ENDED: Deserializing the stream");
        return response.List;
      }
    }
  }
  catch (Exception e) {
    OnExceptionOccured(e);
    return null;
  }
}
340
/// <summary>
/// Logs the slave out at the server. Exceptions are not thrown to the caller but reported
/// through ExceptionOccured.
/// </summary>
/// <param name="guid">Identifier passed to the server's Logout operation.</param>
public void Logout(Guid guid) {
  try {
    Logger.Debug("STARTED: Logout");
    Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
    using (facade) {
      facade.Obj.Logout(guid);
    }
    Logger.Debug("ENDED: Logout");
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
  }
}
353
/// <summary>
/// Fetches the calendar for the given client from the server synchronously.
/// </summary>
/// <param name="clientId">Id passed to the server's GetCalendar operation.</param>
/// <returns>The server's response, or null when an exception occurred (it is reported through ExceptionOccured).</returns>
public ResponseCalendar GetCalendarSync(Guid clientId) {
  ResponseCalendar calendar = null;
  try {
    Logger.Debug("STARTED: Syncing Calendars");
    using (Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      calendar = facade.Obj.GetCalendar(clientId);
      Logger.Debug("ENDED: Syncing Calendars");
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
    // A failure after the call returned (e.g. while releasing the service) still yields null.
    calendar = null;
  }
  return calendar;
}
368
/// <summary>
/// Reports a calendar state for the given client to the server synchronously.
/// </summary>
/// <param name="clientId">Id passed to the server's SetCalendarStatus operation.</param>
/// <param name="state">The calendar state to set.</param>
/// <returns>The server's response, or null when an exception occurred (it is reported through ExceptionOccured).</returns>
public Response SetCalendarStatus(Guid clientId, CalendarState state) {
  Response answer = null;
  try {
    Logger.Debug("STARTED: Setting Calendar status to: " + state);
    using (Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      answer = facade.Obj.SetCalendarStatus(clientId, state);
      Logger.Debug("ENDED: Setting Calendar status to: " + state);
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
    // A failure after the call returned (e.g. while releasing the service) still yields null.
    answer = null;
  }
  return answer;
}
383
/// <summary>
/// Adds a child job below the given parent job on the server.
/// </summary>
/// <param name="parentJobId">Id of the parent job.</param>
/// <param name="serializedJob">The child job to add.</param>
/// <returns>The server's response, or null when an exception occurred (it is reported through ExceptionOccured).</returns>
public ResponseObject<JobDto> AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
  ResponseObject<JobDto> answer = null;
  try {
    Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId);
    using (Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      answer = facade.Obj.AddChildJob(parentJobId, serializedJob);
      Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId);
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
    // A failure after the call returned (e.g. while releasing the service) still yields null.
    answer = null;
  }
  return answer;
}
398
/// <summary>
/// Sends a job to the server's PauseJob operation.
/// </summary>
/// <param name="serializedJob">The job to pause; its JobInfo.Id is written to the debug log.</param>
/// <returns>The server's response, or null when an exception occurred (it is reported through ExceptionOccured).</returns>
public ResponseObject<JobDto> PauseJob(SerializedJob serializedJob) {
  ResponseObject<JobDto> answer = null;
  try {
    Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id);
    using (Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      answer = facade.Obj.PauseJob(serializedJob);
      Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id);
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
    // A failure after the call returned (e.g. while releasing the service) still yields null.
    answer = null;
  }
  return answer;
}
413
/// <summary>
/// Collects the last serialized result of every child job of the given parent.
/// The child job results are queried first; then one server call per child fetches its
/// last serialized result.
/// </summary>
/// <param name="parentJob">Id of the parent job.</param>
/// <returns>
/// A response object whose Obj holds the collected jobs, or null when an exception occurred
/// (it is reported through ExceptionOccured).
/// </returns>
public ResponseObject<SerializedJobList> GetChildJobs(Guid parentJob) {
  try {
    Logger.Debug("STARTED: GetChildJobs job: " + parentJob);
    using (Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      SerializedJobList collected = new SerializedJobList();
      foreach (JobResult child in facade.Obj.GetChildJobResults((Guid?)parentJob, false, false)) {
        collected.Add(facade.Obj.GetLastSerializedResult(child.Id));
      }

      Logger.Debug("ENDED: GetChildJobs job: " + parentJob);
      ResponseObject<SerializedJobList> wrapper = new ResponseObject<SerializedJobList>();
      wrapper.Obj = collected;
      return wrapper;
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
    return null;
  }
}
435
/// <summary>
/// Asks the server to delete the child jobs of the given job. Exceptions are not thrown to
/// the caller but reported through ExceptionOccured.
/// </summary>
/// <param name="jobId">Id passed to the server's DeleteChildJobs operation.</param>
public void DeleteChildJobs(Guid jobId) {
  try {
    Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
    using (facade) {
      facade.Obj.DeleteChildJobs(jobId);
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
  }
}
446
/// <summary>
/// Fetches the configuration file from the server.
/// </summary>
/// <returns>
/// The Obj of the server's response, or null when an exception occurred (it is reported
/// through ExceptionOccured).
/// </returns>
public HivePluginFile GetConfigurationFile() {
  HivePluginFile configuration = null;
  try {
    using (Disposable<SlaveFacade.ISlaveFacade> facade = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
      configuration = facade.Obj.GetConfigurationFile().Obj;
    }
  }
  catch (Exception failure) {
    OnExceptionOccured(failure);
    // A failure after the call returned (e.g. while releasing the service) still yields null.
    configuration = null;
  }
  return configuration;
}
459
/// <summary>
/// Raised through OnExceptionOccured with the exception as event argument value.
/// </summary>
public event EventHandler<EventArgs<Exception>> ExceptionOccured;

/// <summary>
/// Writes the exception to the error log and then raises ExceptionOccured when there is
/// at least one subscriber.
/// </summary>
/// <param name="e">The exception to report.</param>
private void OnExceptionOccured(Exception e) {
  Logger.Error("Error: " + e.ToString());
  // Copy the delegate first so a concurrent unsubscribe cannot null it between check and call.
  EventHandler<EventArgs<Exception>> listeners = ExceptionOccured;
  if (listeners == null) {
    return;
  }
  listeners(this, new EventArgs<Exception>(e));
}
466  }
467}
Note: See TracBrowser for help on using the repository browser.