Changeset 5263
Timestamp: 01/10/11 02:29:22 (14 years ago)
File: 1 edited
Legend: unchanged context lines are unmarked; lines added in r5263 are prefixed with +, lines removed with -.
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
--- r5232
+++ r5263

   public class HiveEngine : Engine {
     private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
-    private Semaphore maxConcurrentSerializations = new Semaphore(1, 1); // allow only ONE concurrent serialization, because the operations share the same ParentScopes and serializing the same objects concurrently causes problems
-    private Semaphore maxSerializedJobsInMemory = new Semaphore(2, 2); // avoid memory problems
+    private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
     private CancellationToken cancellationToken;
…
         coll = (OperationCollection)next;
         if (coll.Parallel) {
-          IDictionary<IOperation, OperationJob> jobs = new Dictionary<IOperation, OperationJob>();
-          foreach (IOperation op in coll) {
-            jobs.Add(op, new OperationJob(op));
-          }
-
-          ExecuteOnHive(jobs, cancellationToken);
-
-          foreach (var kvp in jobs) {
-            if (kvp.Key is IAtomicOperation) {
-              ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation);
-            } else if (kvp.Key is OperationCollection) {
+          OperationJob[] jobs = new OperationJob[coll.Count];
+          for (int i = 0; i < coll.Count; i++) {
+            jobs[i] = new OperationJob(coll[i]);
+          }
+
+          IScope[] scopes = ExecuteOnHive(jobs, cancellationToken);
+
+          for (int i = 0; i < coll.Count; i++) {
+            if (coll[i] is IAtomicOperation) {
+              ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
+            } else if (coll[i] is OperationCollection) {
               // todo ??
             }
…
     /// TODO: Cancellation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancellation
     /// </summary>
-    /// <param name="jobDict"></param>
-    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict, CancellationToken cancellationToken) {
-      LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
+    /// <param name="jobs"></param>
+    private IScope[] ExecuteOnHive(OperationJob[] jobs, CancellationToken cancellationToken) {
+      LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
+      IScope[] scopes = new Scope[jobs.Length];
       object locker = new object();
+
       try {
-        IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>();
+        IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();
         List<Guid> remainingJobIds = new List<Guid>();
-        IDictionary<Guid, int> jobNumbers = new Dictionary<Guid, int>(); // for better readability of log
         JobResultList results;
         var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
…
         // create upload-tasks
         var uploadTasks = new List<Task<JobDto>>();
-        foreach (var kvp in jobDict) {
+        for (int i = 0; i < jobs.Length; i++) {
+          var job = jobs[i];
+
           // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
-          IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
+          IRandom random = FindRandomParameter(job.Operation as IExecutionContext);
           if (random != null)
             random.Reset(random.Next());
…
           uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
             return UploadJob(pluginsNeeded, keyValuePairObj, cancellationToken);
-          }, kvp, cancellationToken));
+          }, new KeyValuePair<int, OperationJob>(i, job), cancellationToken));
         }
…
           }

-          IOperation key = ((KeyValuePair<IOperation, OperationJob>)task.AsyncState).Key;
+          int key = ((KeyValuePair<int, OperationJob>)task.AsyncState).Key;
           JobDto jobDto = task.Result;
           lock (locker) {
             uploadCount++;
-            jobIds.Add(jobDto.Id, key);
+            jobIndices.Add(jobDto.Id, key);
             remainingJobIds.Add(jobDto.Id);
-            jobNumbers.Add(jobDto.Id, uploadCount);
           }
-          LogMessage(string.Format("Submitted job #{0}", jobNumbers[jobDto.Id], jobDto.Id));
+          jobs[key] = null; // relax memory
+          LogMessage(string.Format("Submitted job #{0}", key + 1, jobDto.Id));
           uploadTasks.Remove(task);
         }
…
           finishedCount += jobsFinished.Count();
           var totalExecutionTime = TimeSpan.FromMilliseconds(results.Select(j => j.ExecutionTime).Union(executionTimes).Select(e => e.TotalMilliseconds).Sum());
-          LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobDict.Count, totalExecutionTime));
+          LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobs.Length, totalExecutionTime));
           foreach (var result in jobsFinished) {
             if (result.State == JobState.Finished) {
               downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
-                return DownloadJob(jobNumbers, jobIdObj, cancellationToken);
+                return DownloadJob(jobIndices, jobIdObj, cancellationToken);
               }, result.Id, cancellationToken));
             } else if (result.State == JobState.Aborted) {
-              LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id));
+              LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
             } else if (result.State == JobState.Failed) {
-              LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobNumbers[result.Id], result.Id, result.Exception));
+              LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.Exception));
             }
             remainingJobIds.Remove(result.Id);
…
             throw task.Exception;
           }
-          jobDict[jobIds[(Guid)task.AsyncState]] = task.Result;
+          scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.Operation).Scope;
           downloadTasks.Remove(task);
         }
…
         // delete jobs
         using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
-          foreach (Guid jobId in jobIds.Keys) {
+          foreach (Guid jobId in jobIndices.Keys) {
             service.Obj.DeleteJob(jobId);
           }
         }

-        LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));
+        LogMessage(string.Format("Operations on the hive finished.", jobs.Length));
+        return scopes;
       }
       catch (Exception e) {
…
     }

+    private static object locker = new object();
     private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, CancellationToken cancellationToken) {
-      var keyValuePair = (KeyValuePair<IOperation, OperationJob>)keyValuePairObj;
+      var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj;
       var groups = ResourceIds.Split(';');
       maxSerializedJobsInMemory.WaitOne();
…
         }
       }
-      maxConcurrentSerializations.WaitOne();
-      serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
-      maxConcurrentSerializations.Release();
+      // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
+      lock (locker) {
+        keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone();
+        if (keyValuePair.Value.Operation is IAtomicOperation)
+          ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes();
+        serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
+      }
       serializedJob.JobInfo = new JobDto();
       serializedJob.JobInfo.State = JobState.Offline;
…
     }

-    private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj, CancellationToken cancellationToken) {
+    private OperationJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
       Guid jobId = (Guid)jobIdObj;
       SerializedJob serializedJob = null;
…
       serializedJob = null;
       maxSerializedJobsInMemory.Release();
-      LogMessage(string.Format("Downloaded job #{0}", jobNumbers[jobId], jobId));
+      LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
       return operationJob;
     }
…
     }
   }
+
+  public static class ScopeExtensions {
+    public static void ClearParentScopes(this IScope scope) {
+      scope.ClearParentScopes(null);
+    }
+
+    public static void ClearParentScopes(this IScope scope, IScope childScope) {
+      if (childScope != null) {
+        scope.SubScopes.Clear();
+        scope.SubScopes.Add(childScope);
+      }
+      if (scope.Parent != null)
+        scope.Parent.ClearParentScopes(scope);
+    }
+  }
 }
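The central change in this revision is the serialization strategy in UploadJob: the maxConcurrentSerializations semaphore, which permitted only one serialization at a time because sibling operations share the same parent scopes, is replaced by cloning the operation under a shared lock, pruning the clone's parent-scope chain with the new ClearParentScopes extension, and serializing only the pruned clone. The following self-contained sketch uses toy Scope types (illustrative stand-ins, not the actual HeuristicLab classes) to show what the pruning accomplishes: after ClearParentScopes, the chain from the root down to one leaf no longer references the leaf's siblings, so each serialized job carries only its own scope path.

using System;
using System.Collections.Generic;
using System.Linq;

// Toy stand-ins for HeuristicLab's scope types; names and members are illustrative only.
class Scope {
  public string Name;
  public Scope Parent;
  public List<Scope> SubScopes = new List<Scope>();

  public Scope(string name, Scope parent) {
    Name = name;
    Parent = parent;
    if (parent != null) parent.SubScopes.Add(this);
  }

  // Mirrors the ClearParentScopes extension added in this changeset: walk up the
  // parent chain and keep, at each level, only the child on the current path.
  public void ClearParentScopes(Scope childScope) {
    if (childScope != null) {
      SubScopes.Clear();
      SubScopes.Add(childScope);
    }
    if (Parent != null) Parent.ClearParentScopes(this);
  }

  public int CountReachable() {
    return 1 + SubScopes.Sum(s => s.CountReachable());
  }
}

class ClearParentScopesDemo {
  static void Main() {
    var root = new Scope("root", null);
    var leaves = Enumerable.Range(0, 100).Select(i => new Scope("leaf" + i, root)).ToList();

    // Before pruning, leaf0's parent chain reaches root and all 100 siblings,
    // so a serializer following Parent references would drag the whole tree along.
    Console.WriteLine(root.CountReachable());  // 101

    leaves[0].ClearParentScopes(null);

    // After pruning, the path root -> leaf0 contains exactly those two scopes.
    Console.WriteLine(root.CountReachable());  // 2
  }
}

In HiveEngine itself the prune runs on a clone and inside lock (locker), so the engine's live scope tree stays intact and no two upload tasks traverse the shared parent scopes concurrently; this is what makes the removed maxConcurrentSerializations semaphore unnecessary and lets maxSerializedJobsInMemory grow from 2 to 4.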