Changeset 4337 for branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs
- Timestamp:
- 08/27/10 11:46:46 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Slave.Communication/3.3/WcfService.cs
r4333 r4337 61 61 public NetworkEnum.WcfConnState ConnState { get; private set; } 62 62 public bool LoggedIn { get; set; } 63 public string ServerIP { get; private set; } 63 64 private string serverIp; 65 public string ServerIp { 66 get { return serverIp; } 67 set { 68 if (serverIp != value) { 69 serverIp = value; 70 if (ServerChanged != null) 71 ServerChanged(this, new EventArgs()); 72 } 73 } 74 } 64 75 65 76 public event EventHandler ConnectionRestored; … … 67 78 public event EventHandler Connected; 68 79 69 public SlaveFacadeClient proxy = null;70 71 80 /// <summary> 72 81 /// Constructor … … 81 90 /// </summary> 82 91 public void Connect() { 92 SlaveService.ISlaveFacade client = null; 83 93 try { 84 94 Logger.Debug("Starting the Connection Process"); 85 if (String.Empty.Equals(ServerI P)) {95 if (String.Empty.Equals(ServerIp)) { 86 96 Logger.Info("No Server IP set!"); 87 97 return; … … 89 99 90 100 Logger.Debug("Creating the new connection proxy"); 91 proxy = ServiceLocator.CreateStreamedSlaveFacade(ServerIP);101 client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 92 102 Logger.Debug("Created the new connection proxy"); 93 103 94 Logger.Debug("Registring new Events");95 proxy.GetStreamedJobCompleted += new EventHandler<GetStreamedJobCompletedEventArgs>(proxy_GetStreamedJobCompleted);96 proxy.StoreFinishedJobResultStreamedCompleted += new EventHandler<StoreFinishedJobResultStreamedCompletedEventArgs>(proxy_StoreFinishedJobResultStreamedCompleted);97 proxy.ProcessSnapshotStreamedCompleted += new EventHandler<ProcessSnapshotStreamedCompletedEventArgs>(proxy_ProcessSnapshotStreamedCompleted);98 proxy.ProcessHeartBeatCompleted += new EventHandler<ProcessHeartBeatCompletedEventArgs>(proxy_ProcessHeartBeatCompleted);99 Logger.Debug("Registered new Events");100 Logger.Debug("Opening the Connection");101 proxy.ClientCredentials.UserName.UserName = Settings.Default.HiveUsername;102 proxy.ClientCredentials.UserName.Password = Settings.Default.HivePassword;103 proxy.Open();104 Logger.Debug("Opened the Connection");105 106 104 ConnState = NetworkEnum.WcfConnState.Connected; 107 105 ConnectedSince = DateTime.Now; 108 LoggedIn = false;109 106 110 107 if (Connected != null) { 111 Logger.Debug("Calling the connected Event");112 108 Connected(this, new EventArgs()); 113 //Todo: This won't be hit. EVER114 109 } 115 110 if (ConnState == NetworkEnum.WcfConnState.Failed) { 116 111 ConnectionRestored(this, new EventArgs()); 117 112 } 118 } catch (Exception ex) { 113 } 114 catch (Exception ex) { 119 115 HandleNetworkError(ex); 120 116 } 121 } 122 123 /// <summary> 124 /// Changes the Connectionsettings (serverIP) and reconnects 125 /// </summary> 126 /// <param name="serverIP">current Server IP</param> 127 public void Connect(String serverIP) { 128 Logger.Debug("Called Connected with " + serverIP); 129 String oldIp = this.ServerIP; 130 this.ServerIP = serverIP; 131 Connect(); 132 if (oldIp != serverIP) 133 if (ServerChanged != null) 134 ServerChanged(this, new EventArgs()); 135 } 136 137 public void SetIP(String serverIP) { 138 Logger.Debug("Called with " + serverIP); 139 this.ServerIP = serverIP; 140 } 141 142 /// <summary> 143 /// Disconnects the Slave from the Server 144 /// </summary> 117 finally { 118 ServiceLocator.DisposeSlaveClient(client); 119 } 120 } 121 122 ///// <summary> 123 ///// Disconnects the Slave from the Server 124 ///// </summary> 145 125 public void Disconnect() { 146 126 ConnState = NetworkEnum.WcfConnState.Disconnected; … … 161 141 /// Methods for the Server Login 162 142 /// </summary> 163 public void LoginSync(SlaveDto slaveInfo) { 143 public void Login(SlaveDto slaveInfo) { 144 SlaveService.ISlaveFacade client = null; 164 145 try { 165 146 if (ConnState == NetworkEnum.WcfConnState.Connected) { 166 147 Logger.Debug("STARTED: Login Sync"); 167 Response res = proxy.Login(slaveInfo); 148 client = ServiceLocator.CreateSlaveFacade(ServerIp); 149 Response res = client.Login(slaveInfo); 168 150 if (res.StatusMessage != ResponseStatus.Ok) { 169 151 Logger.Error("FAILED: Login Failed! " + res.StatusMessage); … … 174 156 } 175 157 } 176 } catch (Exception e) { 177 HandleNetworkError(e); 178 } 179 } 180 158 } 159 catch (Exception e) { 160 HandleNetworkError(e); 161 } 162 finally { 163 ServiceLocator.DisposeSlaveClient(client); 164 } 165 } 166 181 167 /// <summary> 182 168 /// Pull a Job from the Server … … 187 173 if (LoggedIn) { 188 174 Logger.Debug("STARTED: Fetching of Jobs from Server for Slave"); 189 proxy.GetStreamedJobAsync(guid); 190 } 191 } 192 193 void proxy_GetStreamedJobCompleted(object sender, GetStreamedJobCompletedEventArgs e) { 194 if (e.Error == null) { 195 Logger.Debug("ENDED: Fetching of Jobs from Server for Slave"); 196 Stream stream = null; 197 MemoryStream memStream = null; 198 199 try { 200 stream = (Stream)e.Result; 201 202 //first deserialize the response 203 BinaryFormatter formatter = new BinaryFormatter(); 204 ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream); 205 206 //second deserialize the BLOB 207 memStream = new MemoryStream(); 208 209 byte[] buffer = new byte[3024]; 210 int read = 0; 211 while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) { 212 memStream.Write(buffer, 0, read); 213 } 214 215 memStream.Close(); 216 217 GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, e.Error, e.Cancelled, e.UserState); 218 GetJobCompleted(sender, completedEventArgs); 219 } catch (Exception ex) { 220 Logger.Error(ex); 221 } finally { 222 if (stream != null) 223 stream.Dispose(); 224 225 if (memStream != null) 226 memStream.Dispose(); 227 } 228 } else 229 HandleNetworkError(e.Error); 175 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 176 //client.GetStreamedJobAsync(guid); 177 client.BeginGetStreamedJob(guid, (ar => { 178 if (ar.IsCompleted) { 179 Stream stream = null; 180 MemoryStream memStream = null; 181 try { 182 Logger.Debug("ENDED: Fetching of Jobs from Server for Slave"); 183 stream = client.EndGetStreamedJob(ar); 184 185 //first deserialize the response 186 BinaryFormatter formatter = new BinaryFormatter(); 187 ResponseObject<JobDto> response = (ResponseObject<JobDto>)formatter.Deserialize(stream); 188 189 //second deserialize the BLOB 190 memStream = new MemoryStream(); 191 192 byte[] buffer = new byte[3024]; 193 int read = 0; 194 while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) { 195 memStream.Write(buffer, 0, read); 196 } 197 198 memStream.Close(); 199 200 GetJobCompletedEventArgs completedEventArgs = new GetJobCompletedEventArgs(new object[] { response, memStream.GetBuffer() }, null, !ar.IsCompleted, ar.AsyncState); 201 GetJobCompleted(this, completedEventArgs); 202 } 203 catch (Exception ex) { 204 Logger.Error(ex); 205 } 206 finally { 207 if (stream != null) 208 stream.Dispose(); 209 210 if (memStream != null) 211 memStream.Dispose(); 212 } 213 } else 214 HandleNetworkError(new FaultException("GetJobAsync did not complete")); 215 216 ServiceLocator.DisposeSlaveClient(client); 217 }), null); 218 } 230 219 } 231 220 … … 244 233 Logger.Debug("Builded stream"); 245 234 Logger.Debug("Making the call"); 246 proxy.StoreFinishedJobResultStreamedAsync(stream, stream);247 }248 }249 250 private void proxy_StoreFinishedJobResultStreamedCompleted(object sender, StoreFinishedJobResultStreamedCompletedEventArgs e) {251 Logger.Debug("Finished storing the job");252 Stream stream = (Stream)e.UserState; 253 if (stream != null) {254 Logger.Debug("Stream not null, disposing it");255 stream.Dispose();256 }257 if (e.Error == null) {258 StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { e.Result }, e.Error, e.Cancelled, e.UserState);259 Logger.Debug("calling the Finished Job Event");260 GetFinishedJobResultCompleted(sender, args);261 Logger.Debug("ENDED: Sending back the finished job results");262 } else {263 HandleNetworkError(e.Error);235 //proxy.StoreFinishedJobResultStreamedAsync(stream, stream); 236 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 237 client.BeginStoreFinishedJobResultStreamed(stream, (ar => { 238 Logger.Debug("Finished storing the job"); 239 if (stream != null) 240 stream.Dispose(); 241 242 if (ar.IsCompleted) { 243 var res = client.EndStoreFinishedJobResultStreamed(ar); 244 StoreFinishedJobResultCompletedEventArgs args = new StoreFinishedJobResultCompletedEventArgs(new object[] { res }, null, false, null); 245 Logger.Debug("calling the Finished Job Event"); 246 GetFinishedJobResultCompleted(this, args); 247 Logger.Debug("ENDED: Sending back the finished job results"); 248 } else { 249 HandleNetworkError(new FaultException("GetFinishedJobResultAsync did not complete")); 250 } 251 ServiceLocator.DisposeSlaveClient(client); 252 }), null); 264 253 } 265 254 } … … 272 261 if (LoggedIn) { 273 262 Stream stream = GetStreamedJobResult(clientId, jobId, result, percentage, exception); 274 proxy.ProcessSnapshotStreamedAsync(stream, stream); 275 } 276 } 277 278 void proxy_ProcessSnapshotStreamedCompleted(object sender, ProcessSnapshotStreamedCompletedEventArgs e) { 279 Stream stream = 280 (Stream)e.UserState; 281 if (stream != null) 282 stream.Dispose(); 283 284 if (e.Error == null) { 285 ProcessSnapshotCompletedEventArgs args = 286 new ProcessSnapshotCompletedEventArgs( 287 new object[] { e.Result }, e.Error, e.Cancelled, e.UserState); 288 289 ProcessSnapshotCompleted(sender, args); 290 } else 291 HandleNetworkError(e.Error); 263 //proxy.ProcessSnapshotStreamedAsync(stream, stream); 264 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 265 client.BeginProcessSnapshotStreamed(stream, (ar => { 266 if (stream != null) 267 stream.Dispose(); 268 269 if (ar.IsCompleted) { 270 var res = client.EndStoreFinishedJobResultStreamed(ar); 271 ProcessSnapshotCompletedEventArgs args = new ProcessSnapshotCompletedEventArgs(new object[] { res }, null, false, null); 272 ProcessSnapshotCompleted(this, args); 273 } else { 274 HandleNetworkError(new FaultException("ProcessSnapshotAsync did not complete")); 275 } 276 ServiceLocator.DisposeSlaveClient(client); 277 }), null); 278 } 292 279 } 293 280 … … 301 288 public event System.EventHandler<ProcessHeartBeatCompletedEventArgs> ProcessHeartBeatCompleted; 302 289 public void ProcessHeartBeatAsync(HeartBeatData hbd) { 303 if (LoggedIn) 290 if (LoggedIn) { 304 291 Logger.Debug("STARTING: sending heartbeat"); 305 proxy.ProcessHeartBeatAsync(hbd); 306 } 307 308 private void proxy_ProcessHeartBeatCompleted(object sender, ProcessHeartBeatCompletedEventArgs e) { 309 if (e.Error == null && e.Result.StatusMessage == ResponseStatus.Ok) { 310 ProcessHeartBeatCompleted(sender, e); 311 Logger.Debug("ENDED: sending heartbeats"); 312 } else { 313 try { 314 Logger.Error("Error: " + e.Result.StatusMessage); 315 } catch (Exception ex) { 316 Logger.Error("Error: ", ex); 317 } 318 HandleNetworkError(e.Error); 292 //proxy.ProcessHeartBeatAsync(hbd); 293 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 294 client.BeginProcessHeartBeat(hbd, (ar => { 295 if (ar.IsCompleted) { 296 var res = client.EndProcessHeartBeat(ar); 297 if (res.StatusMessage == ResponseStatus.Ok) { 298 ProcessHeartBeatCompleted(this, new ProcessHeartBeatCompletedEventArgs(new object[] { res }, null, false, null)); 299 Logger.Debug("ENDED: sending heartbeats"); 300 } else { 301 Logger.Error("FAILED: sending heartbeats: " + res.StatusMessage.ToString()); 302 } 303 } else { 304 HandleNetworkError(new FaultException("ProcessHeartBeatAsync did not complete")); 305 } 306 ServiceLocator.DisposeSlaveClient(client); 307 }), null); 319 308 } 320 309 } … … 345 334 346 335 public ResponseResultReceived StoreFinishedJobResultsSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception, bool finished) { 347 return proxy.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception)); 336 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 337 ResponseResultReceived res = client.StoreFinishedJobResultStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception)); 338 ServiceLocator.DisposeSlaveClient(client); 339 return res; 348 340 } 349 341 … … 351 343 try { 352 344 Logger.Debug("STARTING: Sync call: IsJobStillNeeded"); 353 Response res = proxy.IsJobStillNeeded(jobId); 345 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 346 Response res = client.IsJobStillNeeded(jobId); 347 ServiceLocator.DisposeSlaveClient(client); 354 348 Logger.Debug("ENDED: Sync call: IsJobStillNeeded"); 355 349 return res; 356 } catch (Exception e) { 350 } 351 catch (Exception e) { 357 352 HandleNetworkError(e); 358 353 return null; … … 362 357 public ResponseResultReceived ProcessSnapshotSync(Guid clientId, Guid jobId, byte[] result, double percentage, Exception exception) { 363 358 try { 364 return proxy.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception)); 365 } catch (Exception e) { 359 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 360 var res = client.ProcessSnapshotStreamed(GetStreamedJobResult(clientId, jobId, result, percentage, exception)); 361 ServiceLocator.DisposeSlaveClient(client); 362 return res; 363 } 364 catch (Exception e) { 366 365 HandleNetworkError(e); 367 366 return null; … … 373 372 Logger.Debug("STARTED: Requesting Plugins for Job"); 374 373 Logger.Debug("STARTED: Getting the stream"); 375 Stream stream = proxy.GetStreamedPlugins(requestedPlugins.ToArray()); 374 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 375 Stream stream = client.GetStreamedPlugins(requestedPlugins.ToArray()); 376 ServiceLocator.DisposeSlaveClient(client); 376 377 Logger.Debug("ENDED: Getting the stream"); 377 378 BinaryFormatter formatter = new BinaryFormatter(); … … 382 383 stream.Dispose(); 383 384 return response.List; 384 } catch (Exception e) { 385 } 386 catch (Exception e) { 385 387 HandleNetworkError(e); 386 388 return null; … … 391 393 try { 392 394 Logger.Debug("STARTED: Logout"); 393 proxy.Logout(guid); 395 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 396 client.Logout(guid); 397 ServiceLocator.DisposeSlaveClient(client); 394 398 Logger.Debug("ENDED: Logout"); 395 } catch (Exception e) { 399 } 400 catch (Exception e) { 396 401 HandleNetworkError(e); 397 402 } … … 401 406 try { 402 407 Logger.Debug("STARTED: Syncing Calendars"); 403 ResponseCalendar cal = proxy.GetCalendar(clientId); 408 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 409 ResponseCalendar cal = client.GetCalendar(clientId); 410 ServiceLocator.DisposeSlaveClient(client); 404 411 Logger.Debug("ENDED: Syncing Calendars"); 405 412 return cal; 406 } catch (Exception e) { 413 } 414 catch (Exception e) { 407 415 HandleNetworkError(e); 408 416 return null; … … 413 421 try { 414 422 Logger.Debug("STARTED: Setting Calendar status to: " + state); 415 Response resp = proxy.SetCalendarStatus(clientId, state); 423 SlaveService.ISlaveFacade client = ServiceLocator.CreateStreamedSlaveFacade(ServerIp); 424 Response resp = client.SetCalendarStatus(clientId, state); 425 ServiceLocator.DisposeSlaveClient(client); 416 426 Logger.Debug("ENDED: Setting Calendar status to: " + state); 417 427 return resp; 418 } catch (Exception e) { 428 } 429 catch (Exception e) { 419 430 HandleNetworkError(e); 420 431 return null;
Note: See TracChangeset
for help on using the changeset viewer.