Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 5093

Last change on this file since 5093 was 5093, checked in by cneumuel, 13 years ago

#1260

  • moved all state-information into lifecycleManager
  • changed isolation level for transactions to ReadCommited
  • made currentlyFetching-status on slave more rubust
  • made LogServiceReader more rubust
File size: 18.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.Serialization.Formatters.Binary;
26using System.ServiceModel;
27using HeuristicLab.Common;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.ResponseObjects;
31using HeuristicLab.Hive.Slave.Common;
32using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Hive.Tracing;
35using HeuristicLab.Clients.Common;
36
37namespace HeuristicLab.Hive.Slave.Communication {
38
39  /// <summary>
40  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
41  /// </summary>
42  public class WcfService {
43    private static WcfService instance;
44    /// <summary>
45    /// Getter for the Instance of the WcfService
46    /// </summary>
47    /// <returns>the Instance of the WcfService class</returns>
48    public static WcfService Instance {
49      get {
50        if (instance == null) {
51          Logger.Debug("New WcfService Instance created");
52          instance = new WcfService();
53        }
54        return instance;
55      }
56    }
57
58    public DateTime ConnectedSince { get; private set; }
59    public NetworkEnum.WcfConnState ConnState { get; private set; }
60
61    public event EventHandler Connected;
62    public void OnConnected() {
63      var handler = Connected;
64      if (handler != null) handler(this, EventArgs.Empty);
65    }
66
67    /// <summary>
68    /// Constructor
69    /// </summary>
70    private WcfService() {
71      ConnState = NetworkEnum.WcfConnState.Disconnected;
72    }
73
74    /// <summary>
75    /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
76    /// </summary>
77    public void Connect(SlaveDto slaveInfo) {
78      RegisterServiceEvents();
79      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
80        try {
81          Logger.Debug("Starting the Connection Process");
82          ConnState = NetworkEnum.WcfConnState.Connected;
83          ConnectedSince = DateTime.Now;
84          service.Obj.Login(slaveInfo);
85          OnConnected();
86        }
87        catch (Exception ex) {
88          HandleNetworkError(ex);
89        }
90      }
91    }
92
93    private void RegisterServiceEvents() {
94      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
95      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
96    }
97
98    private void DeregisterServiceEvents() {
99      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
100      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
101    }
102
103    void ClientFacadePool_ExceptionOccured(object sender, EventArgs<Exception> e) {
104      HandleNetworkError(e.Value);
105      Logger.Error("An exception occured in the WCF-Communication: " + e.Value.ToString());
106    }
107
108    ///// <summary>
109    ///// Disconnects the Slave from the Server
110    ///// </summary>
111    public void Disconnect() {
112      ConnState = NetworkEnum.WcfConnState.Disconnected;
113      DeregisterServiceEvents();
114    }
115
116    /// <summary>
117    /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
118    /// </summary>
119    /// <param name="e">The Exception</param>
120    private void HandleNetworkError(Exception e) {
121      ConnState = NetworkEnum.WcfConnState.Failed;
122      DeregisterServiceEvents();
123      Logger.Error("Network exception occurred: " + e);
124    }
125
126    /// <summary>
127    /// Pull a Job from the Server
128    /// </summary>
129    #region PullJob
130    public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;
131    public event System.EventHandler<EventArgs<Exception>> GetJobFailed;
132    public void GetJobAsync(Guid guid) {
133      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
134      Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
135      service.Obj.BeginGetStreamedJob(guid, (ar => {
136        Stream stream = null;
137        MemoryStream memStream = null;
138        try {
139          Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
140          stream = service.Obj.EndGetStreamedJob(ar);
141
142          //first deserialize the response
143          BinaryFormatter formatter = new BinaryFormatter();
144          ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);
145
146          //second deserialize the BLOB
147          memStream = new MemoryStream();
148
149          byte[] buffer = new byte[3024];
150          int read = 0;
151          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
152            memStream.Write(buffer, 0, read);
153          }
154
155          memStream.Close();
156
157          GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, null, !ar.IsCompleted, ar.AsyncState);
158          GetJobCompleted(this, completedEventArgs);
159        }
160        catch (Exception e) {
161          OnExceptionOccured(e);
162          if (GetJobFailed != null)
163            GetJobFailed(this, new EventArgs<Exception>(e));
164        }
165        finally {
166          if (stream != null)
167            stream.Dispose();
168
169          if (memStream != null)
170            memStream.Dispose();
171
172          try { service.Dispose(); }
173          catch (Exception e) { OnExceptionOccured(e); }
174        }
175      }), null);
176    }
177
178    #endregion
179
180    /// <summary>
181    /// Send back finished Job Results
182    /// </summary>
183    #region SendJobResults
184    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;
185    public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
186      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
187      Logger.Debug("STARTED: Sending back the finished job results");
188      Logger.Debug("Building stream");
189      Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
190      Logger.Debug("Builded stream");
191      Logger.Debug("Making the call");
192
193      service.Obj.BeginStoreFinishedJobResultStreamed(stream, (ar => {
194        try {
195          Logger.Debug("Finished storing the job");
196          if (stream != null)
197            stream.Dispose();
198
199          var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
200          StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
201          Logger.Debug("calling the Finished Job Event");
202          GetFinishedJobResultCompleted(this, args);
203          Logger.Debug("ENDED: Sending back the finished job results");
204        }
205        catch (Exception e) {
206          OnExceptionOccured(e);
207        }
208        finally {
209          try { service.Dispose(); }
210          catch (Exception e) { OnExceptionOccured(e); }
211        }
212      }), null);
213    }
214
215    #endregion
216
217    #region Processsnapshots
218    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
219    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
220      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
221
222      Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
223      service.Obj.BeginProcessSnapshotStreamed(stream, (ar => {
224        try {
225          if (stream != null)
226            stream.Dispose();
227
228          var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
229          ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
230          ProcessSnapshotCompleted(this, args);
231        }
232        catch (Exception e) {
233          OnExceptionOccured(e);
234        }
235        finally {
236          try { service.Dispose(); }
237          catch (Exception e) { OnExceptionOccured(e); }
238        }
239      }), null);
240    }
241
242    #endregion
243
244    /// <summary>
245    /// Methods for sending the periodically Heartbeat
246    /// </summary>
247    #region Heartbeat
248
249    public event EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;
250    public void ProcessHeartBeatSync(HeartBeatData hbd) {
251      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
252        Logger.Debug("STARTING: sending heartbeat");
253        var res = service.Obj.ProcessHeartBeat(hbd);
254
255        if (res.StatusMessage == ResponseStatus.Ok) {
256          ProcessHeartBeatCompleted(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
257          Logger.Debug("ENDED: sending heartbeats");
258        } else {
259          Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
260        }
261      }
262    }
263
264    #endregion
265
266    /// <summary>
267    /// Send back finished and Stored Job Results
268    /// </summary>
269    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
270      JobResult jobResult = new JobResult();
271      jobResult.SlaveId = clientId;
272      jobResult.Id = jobId;
273      jobResult.ExecutionTime = executionTime;
274      jobResult.Exception = exception;
275
276      MultiStream stream = new MultiStream();
277
278      //first send result
279      stream.AddStream(new StreamedObject<JobResult>(jobResult));
280
281      //second stream the job binary data
282      MemoryStream memStream = new MemoryStream(result, false);
283      stream.AddStream(memStream);
284
285      return stream;
286    }
287
288    public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
289      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
290        ResponseResultReceived res = service.Obj.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
291        return res;
292      }
293    }
294
295    public Response IsJobStillNeeded(Guid jobId) {
296      try {
297        Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
298        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
299          Response res = service.Obj.IsJobStillNeeded(jobId);
300          Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
301          return res;
302        }
303      }
304      catch (Exception e) {
305        OnExceptionOccured(e);
306        return null;
307      }
308    }
309
310    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
311      try {
312        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
313          return service.Obj.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
314        }
315      }
316      catch (Exception e) {
317        OnExceptionOccured(e);
318        return null;
319      }
320    }
321
322    public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
323      try {
324        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
325          Logger.Debug("STARTED: Requesting Plugins for Job");
326          Logger.Debug("STARTED: Getting the stream");
327          Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray());
328          Logger.Debug("ENDED: Getting the stream");
329          BinaryFormatter formatter = new BinaryFormatter();
330          Logger.Debug("STARTED: Deserializing the stream");
331          ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
332          Logger.Debug("ENDED: Deserializing the stream");
333          if (stream != null)
334            stream.Dispose();
335          return response.List;
336        }
337      }
338      catch (Exception e) {
339        OnExceptionOccured(e);
340        return null;
341      }
342    }
343
344    public void Logout(Guid guid) {
345      try {
346        Logger.Debug("STARTED: Logout");
347        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
348          service.Obj.Logout(guid);
349        }
350        Logger.Debug("ENDED: Logout");
351      }
352      catch (Exception e) {
353        OnExceptionOccured(e);
354      }
355    }
356
357    public ResponseCalendar GetCalendarSync(Guid clientId) {
358      try {
359        Logger.Debug("STARTED: Syncing Calendars");
360        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
361          ResponseCalendar cal = service.Obj.GetCalendar(clientId);
362          Logger.Debug("ENDED: Syncing Calendars");
363          return cal;
364        }
365      }
366      catch (Exception e) {
367        OnExceptionOccured(e);
368        return null;
369      }
370    }
371
372    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
373      try {
374        Logger.Debug("STARTED: Setting Calendar status to: " + state);
375        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
376          Response resp = service.Obj.SetCalendarStatus(clientId, state);
377          Logger.Debug("ENDED: Setting Calendar status to: " + state);
378          return resp;
379        }
380      }
381      catch (Exception e) {
382        OnExceptionOccured(e);
383        return null;
384      }
385    }
386
387    public ResponseObject<JobDto> AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
388      try {
389        Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId);
390        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
391          ResponseObject<JobDto> response = service.Obj.AddChildJob(parentJobId, serializedJob);
392          Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId);
393          return response;
394        }
395      }
396      catch (Exception e) {
397        OnExceptionOccured(e);
398        return null;
399      }
400    }
401
402    public ResponseObject<JobDto> PauseJob(SerializedJob serializedJob) {
403      try {
404        Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id);
405        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
406          ResponseObject<JobDto> response = service.Obj.PauseJob(serializedJob);
407          Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id);
408          return response;
409        }
410      }
411      catch (Exception e) {
412        OnExceptionOccured(e);
413        return null;
414      }
415    }
416
417    public ResponseObject<SerializedJobList> GetChildJobs(Guid parentJob) {
418      try {
419        Logger.Debug("STARTED: GetChildJobs job: " + parentJob);
420        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
421          SerializedJobList serializedJobs = new SerializedJobList();
422          JobResult[] results = service.Obj.GetChildJobResults(new Guid?(parentJob), false, false);
423          foreach (JobResult result in results) {
424            serializedJobs.Add(service.Obj.GetLastSerializedResult(result.Id));
425          }
426
427          Logger.Debug("ENDED: GetChildJobs job: " + parentJob);
428          return new ResponseObject<SerializedJobList>() {
429            Obj = serializedJobs
430          };
431        }
432      }
433      catch (Exception e) {
434        OnExceptionOccured(e);
435        return null;
436      }
437    }
438
439    public void DeleteChildJobs(Guid jobId) {
440      try {
441        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
442          service.Obj.DeleteChildJobs(jobId);
443        }
444      }
445      catch (Exception e) {
446        OnExceptionOccured(e);
447      }
448    }
449
450    public HivePluginFile GetConfigurationFile() {
451      try {
452        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
453          var response = service.Obj.GetConfigurationFile();
454          return response.Obj;
455        }
456      }
457      catch (Exception e) {
458        OnExceptionOccured(e);
459        return null;
460      }
461    }
462
463    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
464    private void OnExceptionOccured(Exception e) {
465      Logger.Error("Error: " + e.ToString());
466      var handler = ExceptionOccured;
467      if (handler != null) handler(this, new EventArgs<Exception>(e));
468    }
469  }
470}
Note: See TracBrowser for help on using the repository browser.