- Timestamp:
- 05/16/11 00:18:48 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs
r6198 r6200 7 7 using HeuristicLab.Common; 8 8 using HeuristicLab.Core; 9 using HeuristicLab.Hive;10 9 using HeuristicLab.Persistence.Default.CompositeSerializers.Storable; 11 using HeuristicLab.PluginInfrastructure;12 10 13 11 namespace HeuristicLab.HiveEngine { … … 18 16 [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")] 19 17 public class HiveEngine : Engine { 20 private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections 21 private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems 18 private static object logLocker = new object(); 22 19 private CancellationToken cancellationToken; 23 20 24 21 [Storable] 25 22 private IOperator currentOperator; … … 54 51 } 55 52 56 [Storable]57 private ItemCollection<RefreshableHiveExperiment> hiveExperiments ;53 // [Storable] -> HiveExperiment can't be storable, so RefreshableHiveExperiment can't be stored 54 private ItemCollection<RefreshableHiveExperiment> hiveExperiments = new ItemCollection<RefreshableHiveExperiment>(); 58 55 public ItemCollection<RefreshableHiveExperiment> HiveExperiments { 59 56 get { return hiveExperiments; } … … 76 73 public HiveEngine() { 77 74 ResourceNames = "HEAL"; 78 HiveExperiments = new ItemCollection<RefreshableHiveExperiment>();79 75 Priority = 0; 80 76 } … … 89 85 this.executionTimeOnHive = original.executionTimeOnHive; 90 86 this.useLocalPlugins = original.useLocalPlugins; 87 this.hiveExperiments = cloner.Clone(original.hiveExperiments); 91 88 } 92 89 public override IDeepCloneable Clone(Cloner cloner) { … … 202 199 target.SubScopes.AddRange(source.SubScopes); 203 200 // TODO: validate if parent scopes match - otherwise source is invalid 204 }205 206 // testfunction:207 private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {208 IScope[] scopes = new Scope[jobs.Length];209 for (int i = 0; i < jobs.Length; i++) {210 var serialized = PersistenceUtil.Serialize(jobs[i]);211 var deserialized = PersistenceUtil.Deserialize<IJob>(serialized);212 deserialized.Start();213 while (deserialized.ExecutionState != ExecutionState.Stopped) {214 Thread.Sleep(100);215 }216 var serialized2 = PersistenceUtil.Serialize(deserialized);217 var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2);218 var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope;219 scopes[i] = newScope;220 }221 return scopes;222 201 } 223 202 … … 256 235 random.Reset(random.Next()); 257 236 } 258 ExperimentManagerClient.StartExperiment((e) => { throw e; }, refreshableHiveExperiment); 237 ExperimentManagerClient.StartExperiment((e) => { 238 LogException(e); 239 }, refreshableHiveExperiment); 240 259 241 // do polling until experiment is finished and all jobs are downloaded 260 242 while (!refreshableHiveExperiment.AllJobsFinished()) { … … 273 255 refreshableHiveExperiment.RefreshAutomatically = false; 274 256 DeleteHiveExperiment(hiveExperiment.Id); 257 ClearData(refreshableHiveExperiment); 275 258 return scopes; 276 259 } … … 290 273 } 291 274 275 private void ClearData(RefreshableHiveExperiment refreshableHiveExperiment) { 276 var jobs = refreshableHiveExperiment.HiveExperiment.GetAllHiveJobs(); 277 foreach (var job in jobs) { 278 job.ClearData(); 279 } 280 } 281 292 282 private void DeleteHiveExperiment(Guid hiveExperimentId) { 293 TryAndRepeat(() => {283 ExperimentManagerClient.TryAndRepeat(() => { 294 284 ServiceLocator.Instance.CallHiveService(s => s.DeleteHiveExperiment(hiveExperimentId)); 295 285 }, 5, string.Format("Could not delete jobs")); 296 286 } 297 298 private static object locker = new object(); 299 private Job UploadJob(object keyValuePairObj, IScope parentScopeClone, CancellationToken cancellationToken, List<Guid> resourceIds, Guid hiveExperimentId) { 300 var keyValuePair = (KeyValuePair<int, EngineJob>)keyValuePairObj; 301 Job job = new Job(); 302 303 try { 304 maxSerializedJobsInMemory.WaitOne(); 305 JobData jobData = new JobData(); 306 IEnumerable<Type> usedTypes; 307 308 // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems 309 lock (locker) { 310 ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.Parent = parentScopeClone; 311 keyValuePair.Value.InitialOperation = (IOperation)keyValuePair.Value.InitialOperation.Clone(); 312 if (keyValuePair.Value.InitialOperation is IAtomicOperation) 313 ((IAtomicOperation)keyValuePair.Value.InitialOperation).Scope.ClearParentScopes(); 314 jobData.Data = PersistenceUtil.Serialize(keyValuePair.Value, out usedTypes); 315 } 316 var neededPlugins = new List<IPluginDescription>(); 317 318 bool useAllLocalPlugins = true; 319 if (useAllLocalPlugins) { 320 // use all plugins 321 neededPlugins.AddRange(ApplicationManager.Manager.Plugins); 322 } else { 323 // use only 324 PluginUtil.CollectDeclaringPlugins(neededPlugins, usedTypes); 325 } 326 327 job.CoresNeeded = 1; 328 job.PluginsNeededIds = ServiceLocator.Instance.CallHiveService(s => PluginUtil.GetPluginDependencies(s, this.OnlinePlugins, this.AlreadyUploadedPlugins, neededPlugins, useLocalPlugins)); 329 job.Priority = priority; 330 job.HiveExperimentId = hiveExperimentId; 331 332 try { 333 maxConcurrentConnections.WaitOne(); 334 while (job.Id == Guid.Empty) { // repeat until success 335 cancellationToken.ThrowIfCancellationRequested(); 336 try { 337 job.Id = ServiceLocator.Instance.CallHiveService(s => s.AddJob(job, jobData, resourceIds)); 338 } 339 catch (Exception e) { 340 LogException(e); 341 LogMessage("Repeating upload"); 342 } 343 } 344 } 345 finally { 346 maxConcurrentConnections.Release(); 347 } 348 } 349 finally { 350 maxSerializedJobsInMemory.Release(); 351 } 352 return job; 353 } 354 287 355 288 private List<Guid> GetResourceIds() { 356 289 return ServiceLocator.Instance.CallHiveService(service => { … … 368 301 } 369 302 370 private EngineJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {371 Guid jobId = (Guid)jobIdObj;372 JobData jobData = null;373 EngineJob engineJob = null;374 try {375 maxSerializedJobsInMemory.WaitOne();376 maxConcurrentConnections.WaitOne();377 while (jobData == null) { // repeat until success378 cancellationToken.ThrowIfCancellationRequested();379 try {380 jobData = ServiceLocator.Instance.CallHiveService(s => s.GetJobData(jobId));381 }382 catch (Exception e) {383 LogException(e);384 LogMessage("Repeating download");385 }386 }387 engineJob = PersistenceUtil.Deserialize<EngineJob>(jobData.Data);388 jobData = null;389 LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));390 }391 finally {392 maxConcurrentConnections.Release();393 maxSerializedJobsInMemory.Release();394 }395 return engineJob;396 }397 398 303 /// <summary> 399 304 /// Threadsafe message logging 400 305 /// </summary> 401 306 private void LogMessage(string message) { 402 lock ( Log) {307 lock (logLocker) { 403 308 Log.LogMessage(message); 404 309 } … … 409 314 /// </summary> 410 315 private void LogException(Exception exception) { 411 lock ( Log) {316 lock (logLocker) { 412 317 Log.LogException(exception); 413 318 } 414 319 } 415 320 416 /// <summary> 417 /// Executes the action. If it throws an exception it is repeated until repetition-count is reached. 418 /// If repetitions is -1, it is repeated infinitely. 419 /// </summary> 420 private static void TryAndRepeat(Action action, int repetitions, string errorMessage) { 421 try { action(); } 422 catch (Exception e) { 423 repetitions--; 424 if (repetitions <= 0) 425 throw new HiveEngineException(errorMessage, e); 426 TryAndRepeat(action, repetitions, errorMessage); 427 } 428 } 429 } 430 431 public static class EnumerableExtensions { 432 public static TimeSpan Sum(this IEnumerable<TimeSpan> times) { 433 return TimeSpan.FromMilliseconds(times.Select(e => e.TotalMilliseconds).Sum()); 434 } 321 // testfunction: 322 //private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) { 323 // IScope[] scopes = new Scope[jobs.Length]; 324 // for (int i = 0; i < jobs.Length; i++) { 325 // var serialized = PersistenceUtil.Serialize(jobs[i]); 326 // var deserialized = PersistenceUtil.Deserialize<IJob>(serialized); 327 // deserialized.Start(); 328 // while (deserialized.ExecutionState != ExecutionState.Stopped) { 329 // Thread.Sleep(100); 330 // } 331 // var serialized2 = PersistenceUtil.Serialize(deserialized); 332 // var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2); 333 // var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope; 334 // scopes[i] = newScope; 335 // } 336 // return scopes; 337 //} 435 338 } 436 339 }
Note: See TracChangeset
for help on using the changeset viewer.