Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Hive.ExperimentManager/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs @ 4792

Last change on this file since 4792 was 4772, checked in by cneumuel, 14 years ago

#1260

  • added LogServiceReader to display log for slave without writing to local files
  • aborted jobs with child jobs now go back to state WaitForChildJob (instead of Offline)
  • lifecyclemanager now knows about available plugins (does not yet work perfectly)
File size: 18.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2010 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.IO;
25using System.Runtime.Serialization.Formatters.Binary;
26using System.ServiceModel;
27using HeuristicLab.Common;
28using HeuristicLab.Hive.Contracts;
29using HeuristicLab.Hive.Contracts.BusinessObjects;
30using HeuristicLab.Hive.Contracts.ResponseObjects;
31using HeuristicLab.Hive.Slave.Common;
32using HeuristicLab.Hive.Slave.Communication.SlaveFacade;
33using HeuristicLab.PluginInfrastructure;
34using HeuristicLab.Hive.Tracing;
35
36namespace HeuristicLab.Hive.Slave.Communication {
37
38  /// <summary>
39  /// WcfService class is implemented as a Singleton and works as a communication Layer with the Server
40  /// </summary>
41  public class WcfService {
42    private static WcfService instance;
43    /// <summary>
44    /// Getter for the Instance of the WcfService
45    /// </summary>
46    /// <returns>the Instance of the WcfService class</returns>
47    public static WcfService Instance {
48      get {
49        if (instance == null) {
50          Logger.Debug("New WcfService Instance created");
51          instance = new WcfService();
52        }
53        return instance;
54      }
55    }
56
    /// <summary>Local time stamp (DateTime.Now) recorded by Connect when it marks the connection as connected.</summary>
    public DateTime ConnectedSince { get; private set; }
    /// <summary>Current connection state; this class sets it to Disconnected, Connected or Failed.</summary>
    public NetworkEnum.WcfConnState ConnState { get; private set; }
59
60    private string serverIp;
61    public string ServerIp {
62      get { return serverIp; }
63      set {
64        if (serverIp != value) {
65          serverIp = value;
66        }
67      }
68    }
69
70    public event EventHandler Connected;
71    public void OnConnected() {
72      var handler = Connected;
73      if (handler != null) handler(this, EventArgs.Empty);
74    }
75
76    /// <summary>
77    /// Constructor
78    /// </summary>
79    private WcfService() {
80      ConnState = NetworkEnum.WcfConnState.Disconnected;
81    }
82
83    /// <summary>
84    /// Connects with the Server, registers the events and fires the Connected (and quiet possibly the ConnectionRestored) Event.
85    /// </summary>
86    public void Connect(SlaveDto slaveInfo) {
87      ServiceLocator.Instance.HostAddress = ServerIp;
88      RegisterServiceEvents();
89      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
90        try {
91          Logger.Debug("Starting the Connection Process");
92          if (String.Empty.Equals(ServerIp)) {
93            Logger.Info("No Server IP set!");
94            return;
95          }
96          ConnState = NetworkEnum.WcfConnState.Connected;
97          ConnectedSince = DateTime.Now;
98          service.Obj.Login(slaveInfo);
99          OnConnected();
100        }
101        catch (Exception ex) {
102          HandleNetworkError(ex);
103        }
104      }
105    }
106
107    private void RegisterServiceEvents() {
108      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
109      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
110    }
111
112    private void DeregisterServiceEvents() {
113      ServiceLocator.Instance.SlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
114      ServiceLocator.Instance.StreamedSlaveFacadePool.ExceptionOccured -= new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(ClientFacadePool_ExceptionOccured);
115    }
116
117    void ClientFacadePool_ExceptionOccured(object sender, EventArgs<Exception> e) {
118      HandleNetworkError(e.Value);
119      Logger.Error("An exception occured in the WCF-Communication: " + e.Value.ToString());
120    }
121
122    ///// <summary>
123    ///// Disconnects the Slave from the Server
124    ///// </summary>
125    public void Disconnect() {
126      ConnState = NetworkEnum.WcfConnState.Disconnected;
127      DeregisterServiceEvents();
128    }
129
130    /// <summary>
131    /// Network communication Error Handler - Every network error gets logged and the connection switches to faulted state
132    /// </summary>
133    /// <param name="e">The Exception</param>
134    private void HandleNetworkError(Exception e) {
135      ConnState = NetworkEnum.WcfConnState.Failed;
136      DeregisterServiceEvents();
137      Logger.Error("Network exception occurred: " + e);
138    }
139
140    /// <summary>
141    /// Methods for the Server Login
142    /// </summary>
143    //public void Login(SlaveDto slaveInfo) {
144    //  try {
145    //    using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.SlaveFacadePool.GetService()) {
146    //      if (ConnState == NetworkEnum.WcfConnState.Connected) {
147    //        Logger.Debug("STARTED: Login Sync");
148    //        Response res = service.Obj.Login(slaveInfo);
149    //        if (res.StatusMessage != ResponseStatus.Ok) {
150    //          Logger.Error("FAILED: Login Failed! " + res.StatusMessage);
151    //          throw new Exception(res.StatusMessage.ToString());
152    //        } else {
153    //          Logger.Info("ENDED: Login succeeded" + res.StatusMessage);
154    //        }
155    //      }
156    //    }
157    //  }
158    //  catch (Exception e) {
159    //    OnExceptionOccured(e);
160    //  }
161    //}
162
163    /// <summary>
164    /// Pull a Job from the Server
165    /// </summary>
166    #region PullJob
167    public event System.EventHandler<GetJobCompletedEventArgs> GetJobCompleted;
168    public void GetJobAsync(Guid guid) {
169      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
170      Logger.Debug("STARTED: Fetching of Jobs from Server for Slave");
171      service.Obj.BeginGetStreamedJob(guid, (ar => {
172        Stream stream = null;
173        MemoryStream memStream = null;
174        try {
175          Logger.Debug("ENDED: Fetching of Jobs from Server for Slave");
176          stream = service.Obj.EndGetStreamedJob(ar);
177
178          //first deserialize the response
179          BinaryFormatter formatter = new BinaryFormatter();
180          ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream);
181
182          //second deserialize the BLOB
183          memStream = new MemoryStream();
184
185          byte[] buffer = new byte[3024];
186          int read = 0;
187          while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
188            memStream.Write(buffer, 0, read);
189          }
190
191          memStream.Close();
192
193          GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, null, !ar.IsCompleted, ar.AsyncState);
194          GetJobCompleted(this, completedEventArgs);
195        }
196        catch (Exception e) {
197          OnExceptionOccured(e);
198        }
199        finally {
200          if (stream != null)
201            stream.Dispose();
202
203          if (memStream != null)
204            memStream.Dispose();
205
206          try { service.Dispose(); }
207          catch (Exception e) { OnExceptionOccured(e); }
208        }
209      }), null);
210    }
211
212    #endregion
213
214    /// <summary>
215    /// Send back finished Job Results
216    /// </summary>
217    #region SendJobResults
218    public event System.EventHandler<StoreFinishedJobResultCompletedEventArgs> GetFinishedJobResultCompleted;
219    public void GetFinishedJobResultAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
220      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
221      Logger.Debug("STARTED: Sending back the finished job results");
222      Logger.Debug("Building stream");
223      Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
224      Logger.Debug("Builded stream");
225      Logger.Debug("Making the call");
226
227      service.Obj.BeginStoreFinishedJobResultStreamed(stream, (ar => {
228        try {
229          Logger.Debug("Finished storing the job");
230          if (stream != null)
231            stream.Dispose();
232
233          var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
234          StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null);
235          Logger.Debug("calling the Finished Job Event");
236          GetFinishedJobResultCompleted(this, args);
237          Logger.Debug("ENDED: Sending back the finished job results");
238        }
239        catch (Exception e) {
240          OnExceptionOccured(e);
241        }
242        finally {
243          try { service.Dispose(); }
244          catch (Exception e) { OnExceptionOccured(e); }
245        }
246      }), null);
247    }
248
249    #endregion
250
251    #region Processsnapshots
252    public event System.EventHandler<ProcessSnapshotCompletedEventArgs> ProcessSnapshotCompleted;
253    public void ProcessSnapshotAsync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
254      Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService();
255
256      Stream stream = GetStreamedJobResult(clientId, jobId, result, executionTime, exception);
257      service.Obj.BeginProcessSnapshotStreamed(stream, (ar => {
258        try {
259          if (stream != null)
260            stream.Dispose();
261
262          var res = service.Obj.EndStoreFinishedJobResultStreamed(ar);
263          ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null);
264          ProcessSnapshotCompleted(this, args);
265        }
266        catch (Exception e) {
267          OnExceptionOccured(e);
268        }
269        finally {
270          try { service.Dispose(); }
271          catch (Exception e) { OnExceptionOccured(e); }
272        }
273      }), null);
274    }
275
276    #endregion
277
278    /// <summary>
279    /// Methods for sending the periodically Heartbeat
280    /// </summary>
281    #region Heartbeat
282
283    public event EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted;
284    public void ProcessHeartBeatSync(HeartBeatData hbd) {
285      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
286        Logger.Debug("STARTING: sending heartbeat");
287        var res = service.Obj.ProcessHeartBeat(hbd);
288
289        if (res.StatusMessage == ResponseStatus.Ok) {
290          ProcessHeartBeatCompleted(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null));
291          Logger.Debug("ENDED: sending heartbeats");
292        } else {
293          Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString());
294        }
295      }
296    }
297
298    #endregion
299
300    /// <summary>
301    /// Send back finished and Stored Job Results
302    /// </summary>
303    private Stream GetStreamedJobResult(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
304      JobResult jobResult = new JobResult();
305      jobResult.SlaveId = clientId;
306      jobResult.Id = jobId;
307      jobResult.ExecutionTime = executionTime;
308      jobResult.Exception = exception;
309
310      MultiStream stream = new MultiStream();
311
312      //first send result
313      stream.AddStream(new StreamedObject<JobResult>(jobResult));
314
315      //second stream the job binary data
316      MemoryStream memStream = new MemoryStream(result, false);
317      stream.AddStream(memStream);
318
319      return stream;
320    }
321
322    public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception, bool finished) {
323      using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
324        ResponseResultReceived res = service.Obj.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
325        return res;
326      }
327    }
328
329    public Response IsJobStillNeeded(Guid jobId) {
330      try {
331        Logger.Debug("STARTING: Sync call: IsJobStillNeeded");
332        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
333          Response res = service.Obj.IsJobStillNeeded(jobId);
334          Logger.Debug("ENDED: Sync call: IsJobStillNeeded");
335          return res;
336        }
337      }
338      catch (Exception e) {
339        OnExceptionOccured(e);
340        return null;
341      }
342    }
343
344    public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, TimeSpan executionTime, string exception) {
345      try {
346        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
347          return service.Obj.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, executionTime, exception));
348        }
349      }
350      catch (Exception e) {
351        OnExceptionOccured(e);
352        return null;
353      }
354    }
355
356    public IEnumerable<CachedHivePluginInfoDto> RequestPlugins(List<HivePluginInfoDto> requestedPlugins) {
357      try {
358        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
359          Logger.Debug("STARTED: Requesting Plugins for Job");
360          Logger.Debug("STARTED: Getting the stream");
361          Stream stream = service.Obj.GetStreamedPlugins(requestedPlugins.ToArray());
362          Logger.Debug("ENDED: Getting the stream");
363          BinaryFormatter formatter = new BinaryFormatter();
364          Logger.Debug("STARTED: Deserializing the stream");
365          ResponseList<CachedHivePluginInfoDto> response = (ResponseList<CachedHivePluginInfoDto>)formatter.Deserialize(stream);
366          Logger.Debug("ENDED: Deserializing the stream");
367          if (stream != null)
368            stream.Dispose();
369          return response.List;
370        }
371      }
372      catch (Exception e) {
373        OnExceptionOccured(e);
374        return null;
375      }
376    }
377
378    public void Logout(Guid guid) {
379      try {
380        Logger.Debug("STARTED: Logout");
381        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
382          service.Obj.Logout(guid);
383        }
384        Logger.Debug("ENDED: Logout");
385      }
386      catch (Exception e) {
387        OnExceptionOccured(e);
388      }
389    }
390
391    public ResponseCalendar GetCalendarSync(Guid clientId) {
392      try {
393        Logger.Debug("STARTED: Syncing Calendars");
394        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
395          ResponseCalendar cal = service.Obj.GetCalendar(clientId);
396          Logger.Debug("ENDED: Syncing Calendars");
397          return cal;
398        }
399      }
400      catch (Exception e) {
401        OnExceptionOccured(e);
402        return null;
403      }
404    }
405
406    public Response SetCalendarStatus(Guid clientId, CalendarState state) {
407      try {
408        Logger.Debug("STARTED: Setting Calendar status to: " + state);
409        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
410          Response resp = service.Obj.SetCalendarStatus(clientId, state);
411          Logger.Debug("ENDED: Setting Calendar status to: " + state);
412          return resp;
413        }
414      }
415      catch (Exception e) {
416        OnExceptionOccured(e);
417        return null;
418      }
419    }
420
421    public ResponseObject<JobDto> AddChildJob(Guid parentJobId, SerializedJob serializedJob) {
422      try {
423        Logger.Debug("STARTED: Add Child Job for parent: " + parentJobId);
424        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
425          ResponseObject<JobDto> response = service.Obj.AddChildJob(parentJobId, serializedJob);
426          Logger.Debug("ENDED: Add Child Job for parent: " + parentJobId);
427          return response;
428        }
429      }
430      catch (Exception e) {
431        OnExceptionOccured(e);
432        return null;
433      }
434    }
435
436    public ResponseObject<JobDto> PauseJob(SerializedJob serializedJob) {
437      try {
438        Logger.Debug("STARTED: Pausing job: " + serializedJob.JobInfo.Id);
439        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
440          ResponseObject<JobDto> response = service.Obj.PauseJob(serializedJob);
441          Logger.Debug("ENDED: Pausing job: " + serializedJob.JobInfo.Id);
442          return response;
443        }
444      }
445      catch (Exception e) {
446        OnExceptionOccured(e);
447        return null;
448      }
449    }
450
451    public ResponseObject<SerializedJobList> GetChildJobs(Guid parentJob) {
452      try {
453        Logger.Debug("STARTED: GetChildJobs job: " + parentJob);
454        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
455          SerializedJobList serializedJobs = new SerializedJobList();
456          JobResult[] results = service.Obj.GetChildJobResults(new Guid?(parentJob), false, false);
457          foreach (JobResult result in results) {
458            serializedJobs.Add(service.Obj.GetLastSerializedResult(result.Id));
459          }
460
461          Logger.Debug("ENDED: GetChildJobs job: " + parentJob);
462          return new ResponseObject<SerializedJobList>() {
463            Obj = serializedJobs
464          };
465        }
466      }
467      catch (Exception e) {
468        OnExceptionOccured(e);
469        return null;
470      }
471    }
472
473    public void DeleteChildJobs(Guid jobId) {
474      try {
475        using (Disposable<SlaveFacade.ISlaveFacade> service = ServiceLocator.Instance.StreamedSlaveFacadePool.GetService()) {
476          service.Obj.DeleteChildJobs(jobId);
477        }
478      }
479      catch (Exception e) {
480        OnExceptionOccured(e);
481      }
482    }
483
484    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
485    private void OnExceptionOccured(Exception e) {
486      Logger.Error("Error: " + e.ToString());
487      var handler = ExceptionOccured;
488      if (handler != null) handler(this, new EventArgs<Exception>(e));
489    }
490  }
491}
Note: See TracBrowser for help on using the repository browser.