- Timestamp:
- 05/10/11 17:58:59 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs
r6111 r6178 7 7 using HeuristicLab.Common; 8 8 using HeuristicLab.Core; 9 using HeuristicLab.Hive; 9 10 using HeuristicLab.Persistence.Default.CompositeSerializers.Storable; 10 11 using HeuristicLab.PluginInfrastructure; … … 140 141 141 142 IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken); 142 143 //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken); 144 143 145 for (int i = 0; i < coll.Count; i++) { 144 146 if (coll[i] is IAtomicOperation) { … … 203 205 204 206 // testfunction: 205 //private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) { 206 // IScope[] scopes = new Scope[jobs.Length]; 207 // for (int i = 0; i < jobs.Length; i++) { 208 // var job = (EngineJob)jobs[i].Clone(); 209 // job.Start(); 210 // while (job.ExecutionState != ExecutionState.Stopped) { 211 // Thread.Sleep(100); 212 // } 213 // scopes[i] = ((IAtomicOperation)job.InitialOperation).Scope; 214 // } 215 // return scopes; 216 //} 207 private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) { 208 IScope[] scopes = new Scope[jobs.Length]; 209 for (int i = 0; i < jobs.Length; i++) { 210 var serialized = PersistenceUtil.Serialize(jobs[i]); 211 var deserialized = PersistenceUtil.Deserialize<IJob>(serialized); 212 deserialized.Start(); 213 while (deserialized.ExecutionState != ExecutionState.Stopped) { 214 Thread.Sleep(100); 215 } 216 var serialized2 = PersistenceUtil.Serialize(deserialized); 217 var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2); 218 var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope; 219 scopes[i] = newScope; 220 } 221 return scopes; 222 } 217 223 218 224 /// <summary> … … 230 236 try { 231 237 List<Guid> remainingJobIds = new List<Guid>(); 232 List<LightweightJob> lightweightJobs;233 234 int finishedCount = 0;235 int uploadCount = 0;236 238 237 239 // create hive experiment 238 240 hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count; 241 hiveExperiment.DateCreated = DateTime.Now; 239 242 hiveExperiment.UseLocalPlugins = this.UseLocalPlugins; 240 243 hiveExperiment.ResourceNames = this.ResourceNames; 241 hiveExperiment.Id = ServiceLocator.Instance.CallHiveService(s => s.AddHiveExperiment(hiveExperiment));242 244 var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment); 245 refreshableHiveExperiment.IsControllable = false; 243 246 hiveExperiments.Add(refreshableHiveExperiment); 244 247 … … 246 249 var uploadTasks = new List<Task<Job>>(); 247 250 for (int i = 0; i < jobs.Length; i++) { 248 var job = jobs[i];251 hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone)); 249 252 250 253 // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable) 251 IRandom random = FindRandomParameter(job .InitialOperation as IExecutionContext);254 IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext); 252 255 if (random != null) 253 256 random.Reset(random.Next()); 254 255 uploadTasks.Add(Task.Factory.StartNew<Job>((keyValuePairObj) => { 256 return UploadJob(keyValuePairObj, parentScopeClone, cancellationToken, GetResourceIds(), hiveExperiment.Id); 257 }, new KeyValuePair<int, EngineJob>(i, job), cancellationToken)); 258 } 259 260 Task processUploadedJobsTask = new Task(() => { 261 // process finished upload-tasks 262 int uploadTasksCount = uploadTasks.Count; 263 for (int i = 0; i < uploadTasksCount; i++) { 264 cancellationToken.ThrowIfCancellationRequested(); 265 266 var uploadTasksArray = uploadTasks.ToArray(); 267 var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)]; 268 if (task.Status == TaskStatus.Faulted) { 269 LogException(task.Exception); 270 throw task.Exception; 271 } 272 273 int key = ((KeyValuePair<int, EngineJob>)task.AsyncState).Key; 274 Job job = task.Result; 275 lock (locker) { 276 uploadCount++; 277 jobIndices.Add(job.Id, key); 278 remainingJobIds.Add(job.Id); 279 } 280 jobs[key] = null; // relax memory 281 LogMessage(string.Format("Uploaded job #{0}", key + 1, job.Id)); 282 uploadTasks.Remove(task); 283 } 284 }, cancellationToken, TaskCreationOptions.PreferFairness); 285 processUploadedJobsTask.Start(); 286 287 refreshableHiveExperiment.RefreshAutomatically = true; 288 257 } 258 ExperimentManagerClient.StartExperiment((e) => { throw e; }, refreshableHiveExperiment); 259 // do polling until experiment is finished and all jobs are downloaded 289 260 while (!refreshableHiveExperiment.AllJobsFinished()) { 290 Thread.Sleep(1000); 291 // update time 292 // handle cancellation 293 } 294 295 296 297 // poll job-statuses and create tasks for those which are finished 298 var downloadTasks = new List<Task<EngineJob>>(); 299 var executionTimes = new List<TimeSpan>(); 300 var executionTimeOnHiveBefore = executionTimeOnHive; 301 while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) { 302 cancellationToken.ThrowIfCancellationRequested(); 303 304 Thread.Sleep(10000); 305 try { 306 lightweightJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobs(remainingJobIds)); 307 308 var jobsFinished = lightweightJobs.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted); 309 finishedCount += jobsFinished.Count(); 310 if (jobsFinished.Count() > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length)); 311 ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + lightweightJobs.Select(x => x.ExecutionTime.HasValue ? x.ExecutionTime.Value : TimeSpan.Zero).Sum(); 312 313 foreach (var result in jobsFinished) { 314 if (result.State == JobState.Finished) { 315 downloadTasks.Add(Task.Factory.StartNew<EngineJob>((jobIdObj) => { 316 return DownloadJob(jobIndices, jobIdObj, cancellationToken); 317 }, result.Id, cancellationToken)); 318 } else if (result.State == JobState.Aborted) { 319 LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id)); 320 } else if (result.State == JobState.Failed) { 321 LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.CurrentStateLog != null ? result.CurrentStateLog.Exception : string.Empty)); 322 } 323 remainingJobIds.Remove(result.Id); 324 executionTimes.Add(result.ExecutionTime.HasValue ? result.ExecutionTime.Value : TimeSpan.Zero); 325 } 326 } 327 catch (Exception e) { 328 LogException(e); 329 } 330 } 331 332 // process finished download-tasks 333 int downloadTasksCount = downloadTasks.Count; 334 for (int i = 0; i < downloadTasksCount; i++) { 335 cancellationToken.ThrowIfCancellationRequested(); 336 337 var downloadTasksArray = downloadTasks.ToArray(); 338 var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)]; 339 var jobId = (Guid)task.AsyncState; 340 if (task.Status == TaskStatus.Faulted) { 341 LogException(task.Exception); 342 throw task.Exception; 343 } 344 scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.InitialOperation).Scope; 345 downloadTasks.Remove(task); 346 } 347 348 LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum())); 261 Thread.Sleep(500); 262 this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds)); 263 } 264 LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime)); 265 266 // get scopes 267 int j = 0; 268 foreach (var hiveJob in hiveExperiment.HiveJobs) { 269 var scope = ((IAtomicOperation) ((EngineJob)hiveJob.ItemJob).InitialOperation).Scope; 270 scopes[j++] = scope; 271 } 272 refreshableHiveExperiment.RefreshAutomatically = false; 349 273 DeleteHiveExperiment(hiveExperiment.Id); 350 351 274 return scopes; 352 275 } … … 505 428 } 506 429 507 public static class ScopeExtensions {508 public static void ClearParentScopes(this IScope scope) {509 scope.ClearParentScopes(null);510 }511 512 public static void ClearParentScopes(this IScope scope, IScope childScope) {513 if (childScope != null) {514 scope.SubScopes.Clear();515 scope.SubScopes.Add(childScope);516 }517 if (scope.Parent != null)518 scope.Parent.ClearParentScopes(scope);519 }520 }521 522 430 public static class EnumerableExtensions { 523 431 public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
Note: See TracChangeset
for help on using the changeset viewer.