Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 5000

Last change on this file since 5000 was 5000, checked in by cneumuel, 13 years ago

#1260

  • custom configuration file is now deployed on slaves
  • made plugin distribution process more robust
  • fixed bug: slave assemblies are required in each sandbox appdomain
  • cleaned up some configuration elements in slave console client
  • fixed wrong path for locally persisted jobs
File size: 18.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.Serialization.Formatters.Binary;
26using System.ServiceModel;
27using HeuristicLab.Common;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.ResponseObjects;
31using HeuristicLab.Hive.Slave.Common;
32using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Hive.Tracing;
35using HeuristicLab.Clients.Common;
36
namespace HeuristicLab.Hive.Slave.Communication {

  /// <summary>
  /// Singleton that acts as the communication layer between a Hive slave and the server.
  /// Each call obtains a service proxy wrapper from one of the pools exposed by
  /// <c>ServiceLocator</c> and disposes that wrapper when the call has finished.
  /// </summary>
  public class WcfService {
    private static WcfService instance;

    /// <summary>
    /// Gets the single instance of the <see cref="WcfService"/> class, creating it on first access.
    /// </summary>
    /// <returns>the Instance of the WcfService class</returns>
    public static WcfService Instance {
      get {
        // NOTE(review): the lazy creation is not synchronized; confirm that the first access
        // always happens on a single thread.
        if (instance == null) {
          Logger.Debug("New WcfService Instance created");
          instance = new WcfService();
        }
        return instance;
      }
    }

    /// <summary>Time at which the last connection attempt marked the service as connected.</summary>
    public DateTime ConnectedSince { get; private set; }

    /// <summary>Current connection state as tracked by this class.</summary>
    public NetworkEnum.WcfConnState ConnState { get; private set; }

    /// <summary>Raised by <see cref="Connect"/> after the login call returned without an exception.</summary>
    public event EventHandler Connected;

    /// <summary>Raises the <see cref="Connected"/> event.</summary>
    public void OnConnected() {
      var handler = Connected;
      if (handler != null) handler(this, EventArgs.Empty);
    }

    /// <summary>
    /// Creates the instance in the disconnected state.
    /// </summary>
    private WcfService() {
      ConnState = NetworkEnum.WcfConnState.Disconnected;
    }

    /// <summary>
    /// Connects with the server: subscribes to the pool exception events, logs in and fires the
    /// <see cref="Connected"/> event. Any exception is routed to the network error handler, which
    /// switches <see cref="ConnState"/> to <c>Failed</c>.
    /// </summary>
    /// <param name="slaveInfo">Description of this slave that is passed to the server's login call.</param>
    public void Connect(SlaveDto slaveInfo) {
      // Remove a subscription that may be left over from an earlier Connect first. Removing a
      // handler that is not subscribed is a no-op, so this keeps repeated connects from stacking
      // handlers and reporting every pool error several times.
      DeregisterServiceEvents();
      RegisterServiceEvents();
      try {
        // Acquire the proxy inside the try block so that a failure to obtain it is handled
        // like every other network error instead of leaving the state untouched.
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          Logger.Debug("Starting the Connection Process");
          ConnState = NetworkEnum.WcfConnState.Connected;
          ConnectedSince = DateTime.Now;
          // NOTE(review): the value returned by Login (if any) is not inspected - confirm
          // whether a non-OK response status should be treated as a failed connection.
          service.Obj.Login(slaveInfo);
          OnConnected();
        }
      }
      catch (Exception ex) {
        HandleNetworkError(ex);
      }
    }

    /// <summary>Subscribes to the exception events of both service pools.</summary>
    private void RegisterServiceEvents() {
      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += ClientFacadePool_ExceptionOccured;
      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += ClientFacadePool_ExceptionOccured;
    }

    /// <summary>Unsubscribes from the exception events of both service pools.</summary>
    private void DeregisterServiceEvents() {
      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= ClientFacadePool_ExceptionOccured;
      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= ClientFacadePool_ExceptionOccured;
    }

    /// <summary>Handles an exception reported by one of the service pools.</summary>
    void ClientFacadePool_ExceptionOccured(object sender, EventArgs<Exception> e) {
      HandleNetworkError(e.Value);
      // Plain concatenation instead of e.Value.ToString() so that a missing exception object
      // cannot cause a NullReferenceException inside the error handler itself.
      Logger.Error("An exception occured in the WCF-Communication: " + e.Value);
    }

    /// <summary>
    /// Disconnects the Slave from the Server
    /// </summary>
    public void Disconnect() {
      ConnState = NetworkEnum.WcfConnState.Disconnected;
      DeregisterServiceEvents();
    }

    /// <summary>
    /// Network communication error handler - every network error gets logged and the connection
    /// switches to the failed state.
    /// </summary>
    /// <param name="e">The Exception</param>
    private void HandleNetworkError(Exception e) {
      ConnState = NetworkEnum.WcfConnState.Failed;
      DeregisterServiceEvents();
      Logger.Error("Network exception occurred: " + e);
    }

    /// <summary>
    /// Disposes a pooled service wrapper and reports (instead of throwing) any exception raised
    /// while doing so. Used on completion threads and cleanup paths where an escaping exception
    /// would either be unobserved or hide the original error.
    /// </summary>
    private void DisposeService(Disposable<SlaveFacade.ISlaveFacade> service) {
      try { service.Dispose(); }
      catch (Exception e) { OnExceptionOccured(e); }
    }

    #region PullJob
    /// <summary>
    /// Raised when a job has been fetched by <see cref="GetJobAsync"/>. The result array of the
    /// event args holds the deserialized response followed by the raw job data.
    /// </summary>
    public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;

    /// <summary>
    /// Pulls a job from the server asynchronously. Completion is signalled through
    /// <see cref="GetJobCompleted"/>, failures during completion through <see cref="ExceptionOccured"/>.
    /// </summary>
    /// <param name="guid">Identifier passed to the server's streamed job request.</param>
    public void GetJobAsync(Guid guid) {
      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
      try {
        Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
        service.Obj.BeginGetStreamedJob(guid, ar => CompleteGetJob(service, ar), null);
      }
      catch {
        // The completion callback will not run when the call could not be started,
        // so the service has to be given back here before the error is passed on.
        DisposeService(service);
        throw;
      }
    }

    /// <summary>Completion callback of <see cref="GetJobAsync"/>.</summary>
    private void CompleteGetJob(Disposable<SlaveFacade.ISlaveFacade> service, IAsyncResult ar) {
      try {
        Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");

        ResponseObject<JobDto> response;
        byte[] jobData;
        using (Stream stream = service.Obj.EndGetStreamedJob(ar)) {
          // First part of the stream: the serialized response object.
          // NOTE(review): BinaryFormatter is used on data received over the network; this is
          // only acceptable as long as the server is fully trusted.
          BinaryFormatter formatter = new BinaryFormatter();
          response = (ResponseObject<JobDto>)formatter.Deserialize(stream);

          // Remainder of the stream: the job BLOB.
          using (MemoryStream memStream = new MemoryStream()) {
            byte[] buffer = new byte[3024];
            int read;
            while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
              memStream.Write(buffer, 0, read);
            }
            // ToArray copies exactly the bytes written. GetBuffer would hand out the whole
            // internal buffer including its unused capacity.
            jobData = memStream.ToArray();
          }
        }

        var handler = GetJobCompleted;
        if (handler != null) {
          handler(this, new GetJobCompletedEventArgs(new object[] { response, jobData }, null, !ar.IsCompleted, ar.AsyncState));
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        DisposeService(service);
      }
    }

    #endregion

    #region SendJobResults
    /// <summary>
    /// Raised when the server has answered the call started by <see cref="GetFinishedJobResultAsync"/>.
    /// </summary>
    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;

    /// <summary>
    /// Sends the results of a finished job back to the server asynchronously. Completion is
    /// signalled through <see cref="GetFinishedJobResultCompleted"/>.
    /// </summary>
    /// <param name="clientId">Id of the slave that calculated the job.</param>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="result">Serialized job data; <c>null</c> is sent as an empty payload.</param>
    /// <param name="executionTime">Execution time stored in the job result.</param>
    /// <param name="exception">Exception text stored in the job result.</param>
    /// <param name="finished">Not used by this method.</param>
    public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
      Stream stream = null;
      try {
        Logger.Debug("STARTED: Sending back the finished job results");
        Logger.Debug("Building stream");
        stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
        Logger.Debug("Builded stream");
        Logger.Debug("Making the call");

        Stream payload = stream;
        service.Obj.BeginStoreFinishedJobResultStreamed(payload, ar => CompleteStoreFinishedJobResult(service, payload, ar), null);
      }
      catch {
        // Nothing will complete this call any more: release the stream and the service here.
        if (stream != null) stream.Dispose();
        DisposeService(service);
        throw;
      }
    }

    /// <summary>Completion callback of <see cref="GetFinishedJobResultAsync"/>.</summary>
    private void CompleteStoreFinishedJobResult(Disposable<SlaveFacade.ISlaveFacade> service, Stream payload, IAsyncResult ar) {
      try {
        Logger.Debug("Finished storing the job");
        if (payload != null)
          payload.Dispose();

        var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
        StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
        Logger.Debug("calling the Finished Job Event");
        var handler = GetFinishedJobResultCompleted;
        if (handler != null) handler(this, args);
        Logger.Debug("ENDED: Sending back the finished job results");
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        DisposeService(service);
      }
    }

    #endregion

    #region Processsnapshots
    /// <summary>
    /// Raised when the server has answered the call started by <see cref="ProcessSnapshotAsync"/>.
    /// </summary>
    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;

    /// <summary>
    /// Sends a snapshot of a job to the server asynchronously. Completion is signalled through
    /// <see cref="ProcessSnapshotCompleted"/>.
    /// </summary>
    /// <param name="clientId">Id of the slave that calculates the job.</param>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="result">Serialized job data; <c>null</c> is sent as an empty payload.</param>
    /// <param name="executionTime">Execution time stored in the job result.</param>
    /// <param name="exception">Exception text stored in the job result.</param>
    /// <param name="finished">Not used by this method.</param>
    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
      Stream stream = null;
      try {
        stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);

        Stream payload = stream;
        service.Obj.BeginProcessSnapshotStreamed(payload, ar => CompleteProcessSnapshot(service, payload, ar), null);
      }
      catch {
        // Nothing will complete this call any more: release the stream and the service here.
        if (stream != null) stream.Dispose();
        DisposeService(service);
        throw;
      }
    }

    /// <summary>Completion callback of <see cref="ProcessSnapshotAsync"/>.</summary>
    private void CompleteProcessSnapshot(Disposable<SlaveFacade.ISlaveFacade> service, Stream payload, IAsyncResult ar) {
      try {
        if (payload != null)
          payload.Dispose();

        // The call was started with BeginProcessSnapshotStreamed, so it has to be completed
        // with the matching End method (not with EndStoreFinishedJobResultStreamed).
        var res = service.Obj.EndProcessSnapshotStreamed(ar);
        ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
        var handler = ProcessSnapshotCompleted;
        if (handler != null) handler(this, args);
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
      finally {
        DisposeService(service);
      }
    }

    #endregion

    #region Heartbeat

    /// <summary>
    /// Raised by <see cref="ProcessHeartBeatSync"/> when the server answered a heartbeat with status OK.
    /// </summary>
    public event EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;

    /// <summary>
    /// Sends the periodic heartbeat to the server synchronously. Exceptions raised by the call are
    /// not caught here and reach the caller.
    /// </summary>
    /// <param name="hbd">Heartbeat data to send.</param>
    public void ProcessHeartBeatSync(HeartBeatData hbd) {
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
        Logger.Debug("STARTING: sending heartbeat");
        var res = service.Obj.ProcessHeartBeat(hbd);

        if (res.StatusMessage == ResponseStatus.Ok) {
          var handler = ProcessHeartBeatCompleted;
          if (handler != null) {
            handler(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
          }
          Logger.Debug("ENDED: sending heartbeats");
        } else {
          Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
        }
      }
    }

    #endregion

    /// <summary>
    /// Builds the stream that is sent to the server for job results and snapshots: the job result
    /// header first, followed by the binary job data.
    /// </summary>
    /// <param name="clientId">Id of the slave, stored as <c>SlaveId</c>.</param>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="result">Serialized job data; <c>null</c> is treated as an empty payload.</param>
    /// <param name="executionTime">Execution time of the job.</param>
    /// <param name="exception">Exception text of the job.</param>
    /// <returns>The combined stream; the caller is responsible for disposing it.</returns>
    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
      JobResult jobResult = new JobResult();
      jobResult.SlaveId = clientId;
      jobResult.Id = jobId;
      jobResult.ExecutionTime = executionTime;
      jobResult.Exception = exception;

      MultiStream stream = new MultiStream();

      //first send result
      stream.AddStream(new StreamedObject<JobResult>(jobResult));

      //second stream the job binary data
      // MemoryStream rejects a null array; a job without data is sent as a zero-length payload
      // so that the header (including the exception text) still reaches the server.
      MemoryStream memStream = new MemoryStream(result ?? new byte[0], false);
      stream.AddStream(memStream);

      return stream;
    }

    /// <summary>
    /// Sends the results of a finished job to the server synchronously. Exceptions raised by the
    /// call are not caught here and reach the caller.
    /// </summary>
    /// <param name="finished">Not used by this method.</param>
    /// <returns>The server's response.</returns>
    public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService())
      using (Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception)) {
        return service.Obj.StoreFinishedJobResultStreamed(stream);
      }
    }

    /// <summary>
    /// Asks the server whether the given job is still needed.
    /// </summary>
    /// <returns>The server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public Response IsJobStillNeeded(Guid jobId) {
      try {
        Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          Response res = service.Obj.IsJobStillNeeded(jobId);
          Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
          return res;
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Sends a snapshot of a job to the server synchronously.
    /// </summary>
    /// <returns>The server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
      try {
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService())
        using (Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception)) {
          return service.Obj.ProcessSnapshotStreamed(stream);
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Requests the given plugins from the server.
    /// </summary>
    /// <param name="requestedPlugins">Descriptions of the plugins to fetch.</param>
    /// <returns>The plugins contained in the server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
      try {
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          Logger.Debug("STARTED: Requesting Plugins for Job");
          Logger.Debug("STARTED: Getting the stream");
          // The using block releases the stream even when deserialization fails.
          using (Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray())) {
            Logger.Debug("ENDED: Getting the stream");
            // NOTE(review): BinaryFormatter is used on data received over the network; this is
            // only acceptable as long as the server is fully trusted.
            BinaryFormatter formatter = new BinaryFormatter();
            Logger.Debug("STARTED: Deserializing the stream");
            ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
            Logger.Debug("ENDED: Deserializing the stream");
            return response.List;
          }
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Logs the slave out at the server. Failures are reported through <see cref="ExceptionOccured"/>.
    /// </summary>
    public void Logout(Guid guid) {
      try {
        Logger.Debug("STARTED: Logout");
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          service.Obj.Logout(guid);
        }
        Logger.Debug("ENDED: Logout");
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Fetches the calendar for the given client from the server.
    /// </summary>
    /// <returns>The server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public ResponseCalendar GetCalendarSync(Guid clientId) {
      try {
        Logger.Debug("STARTED: Syncing Calendars");
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          ResponseCalendar cal = service.Obj.GetCalendar(clientId);
          Logger.Debug("ENDED: Syncing Calendars");
          return cal;
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Sets the calendar status of the given client at the server.
    /// </summary>
    /// <returns>The server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
      try {
        Logger.Debug("STARTED: Setting Calendar status to: " + state);
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          Response resp = service.Obj.SetCalendarStatus(clientId, state);
          Logger.Debug("ENDED: Setting Calendar status to: " + state);
          return resp;
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Adds a child job below the given parent job.
    /// </summary>
    /// <returns>The server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public ResponseObject<JobDto> AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
      try {
        Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId);
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          ResponseObject<JobDto> response = service.Obj.AddChildJob(parentJobId, serializedJob);
          Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId);
          return response;
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Pauses the given job at the server.
    /// </summary>
    /// <returns>The server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public ResponseObject<JobDto> PauseJob(SerializedJob serializedJob) {
      try {
        Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id);
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          ResponseObject<JobDto> response = service.Obj.PauseJob(serializedJob);
          Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id);
          return response;
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Fetches the last serialized result of every child job of the given parent job.
    /// </summary>
    /// <returns>A response wrapping the collected jobs, or <c>null</c> if a call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public ResponseObject<SerializedJobList> GetChildJobs(Guid parentJob) {
      try {
        Logger.Debug("STARTED: GetChildJobs job: " + parentJob);
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          SerializedJobList serializedJobs = new SerializedJobList();
          JobResult[] results = service.Obj.GetChildJobResults(new Guid?(parentJob), false, false);
          // One additional server call per child job.
          foreach (JobResult result in results) {
            serializedJobs.Add(service.Obj.GetLastSerializedResult(result.Id));
          }

          Logger.Debug("ENDED: GetChildJobs job: " + parentJob);
          return new ResponseObject<SerializedJobList>() {
            Obj = serializedJobs
          };
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }

    /// <summary>
    /// Deletes the child jobs of the given job at the server. Failures are reported through
    /// <see cref="ExceptionOccured"/>.
    /// </summary>
    public void DeleteChildJobs(Guid jobId) {
      try {
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          service.Obj.DeleteChildJobs(jobId);
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
      }
    }

    /// <summary>
    /// Raised whenever one of the methods of this class caught an exception instead of throwing it.
    /// </summary>
    public event EventHandler<EventArgs<Exception>> ExceptionOccured;

    /// <summary>Logs the exception and raises <see cref="ExceptionOccured"/>.</summary>
    private void OnExceptionOccured(Exception e) {
      Logger.Error("Error: " + e.ToString());
      var handler = ExceptionOccured;
      if (handler != null) handler(this, new EventArgs<Exception>(e));
    }

    /// <summary>
    /// Fetches the configuration file from the server.
    /// </summary>
    /// <returns>The file contained in the server's response, or <c>null</c> if the call failed (the exception is reported through <see cref="ExceptionOccured"/>).</returns>
    public HivePluginFile GetConfigurationFile() {
      try {
        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
          var response = service.Obj.GetConfigurationFile();
          return response.Obj;
        }
      }
      catch (Exception e) {
        OnExceptionOccured(e);
        return null;
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.