Timestamp: 01/07/11 17:08:36 (14 years ago)
File: 1 edited
Legend: lines prefixed with + were added, lines prefixed with - were removed; all other lines are unchanged context.

branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
r5228 → r5232

       }

-      ExecuteOnHive(jobs);
+      ExecuteOnHive(jobs, cancellationToken);

       foreach (var kvp in jobs) {
...
     /// <summary>
     /// This method blocks until all jobs are finished
+    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
     /// </summary>
     /// <param name="jobDict"></param>
-    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) {
+    private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict, CancellationToken cancellationToken) {
       LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
...
       var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
       int finishedCount = 0;
+      int uploadCount = 0;

       // create upload-tasks
...
         uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
-          return UploadJob(pluginsNeeded, keyValuePairObj);
-        }, kvp));
+          return UploadJob(pluginsNeeded, keyValuePairObj, cancellationToken);
+        }, kvp, cancellationToken));
       }

-      Task processUploadedJobsTask = Task.Factory.StartNew(() => {
+      Task processUploadedJobsTask = new Task(() => {
         // process finished upload-tasks
         int uploadTasksCount = uploadTasks.Count;
         for (int i = 0; i < uploadTasksCount; i++) {
+          cancellationToken.ThrowIfCancellationRequested();
+
           var uploadTasksArray = uploadTasks.ToArray();
           var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
...
           IOperation key = ((KeyValuePair<IOperation, OperationJob>)task.AsyncState).Key;
           JobDto jobDto = task.Result;
-
-          jobIds.Add(jobDto.Id, key);
-          remainingJobIds.Add(jobDto.Id);
-          jobNumbers.Add(jobDto.Id, remainingJobIds.Count);
-
+          lock (locker) {
+            uploadCount++;
+            jobIds.Add(jobDto.Id, key);
+            remainingJobIds.Add(jobDto.Id);
+            jobNumbers.Add(jobDto.Id, uploadCount);
+          }
           LogMessage(string.Format("Submitted job #{0}", jobNumbers[jobDto.Id], jobDto.Id));
           uploadTasks.Remove(task);
         }
-      });
+      }, cancellationToken, TaskCreationOptions.PreferFairness);
+      processUploadedJobsTask.Start();

       // poll job-statuses and create tasks for those which are finished
...
       var executionTimes = new List<TimeSpan>();
       while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
+        cancellationToken.ThrowIfCancellationRequested();
+
         Thread.Sleep(10000);
         using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
...
             if (result.State == JobState.Finished) {
               downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
-                return DownloadJob(jobNumbers, jobIdObj);
-              }, result.Id));
+                return DownloadJob(jobNumbers, jobIdObj, cancellationToken);
+              }, result.Id, cancellationToken));
             } else if (result.State == JobState.Aborted) {
               LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id));
...
       int downloadTasksCount = downloadTasks.Count;
       for (int i = 0; i < downloadTasksCount; i++) {
+        cancellationToken.ThrowIfCancellationRequested();
+
         var downloadTasksArray = downloadTasks.ToArray();
         var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
...
     }

-    private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj) {
+    private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, CancellationToken cancellationToken) {
       var keyValuePair = (KeyValuePair<IOperation, OperationJob>)keyValuePairObj;
       var groups = ResourceIds.Split(';');
       maxSerializedJobsInMemory.WaitOne();
-      SerializedJob serializedJob = new SerializedJob();
+      SerializedJob serializedJob = null;
+      while (serializedJob == null) { // repeat until success; rare race-conditions occur at serializations (enumeration was changed-exceptions); maybe this is because all the parent-scopes and execution-contexts at some point contain the hiveengine and the Log in here
+        cancellationToken.ThrowIfCancellationRequested();
+        try {
+          lock (Log) {
+            serializedJob = new SerializedJob();
+          }
+        }
+        catch (Exception e) {
+          LogException(e);
+        }
+      }
       maxConcurrentSerializations.WaitOne();
       serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
...
       serializedJob.JobInfo.CoresNeeded = 1;
       serializedJob.JobInfo.PluginsNeeded = pluginsNeeded;
-      ResponseObject<JobDto> response;
+      ResponseObject<JobDto> response = null;
       maxConcurrentConnections.WaitOne();
-      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
-        response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
-        serializedJob = null;
-        maxSerializedJobsInMemory.Release();
+      while (response == null) { // repeat until success
+        cancellationToken.ThrowIfCancellationRequested();
+        try {
+          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
+            response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
+            serializedJob = null;
+            maxSerializedJobsInMemory.Release();
+          }
+        }
+        catch (Exception e) {
+          LogException(e);
+        }
       }
       maxConcurrentConnections.Release();
...
     }

-    private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj) {
+    private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj, CancellationToken cancellationToken) {
       Guid jobId = (Guid)jobIdObj;
-      SerializedJob serializedJob;
+      SerializedJob serializedJob = null;
+      maxSerializedJobsInMemory.WaitOne();
       maxConcurrentConnections.WaitOne();
-      maxSerializedJobsInMemory.WaitOne();
-      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
-        serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
+      while (serializedJob == null) { // repeat until success
+        cancellationToken.ThrowIfCancellationRequested();
+        try {
+          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
+            serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj;
+          }
+        }
+        catch (Exception e) {
+          LogException(e);
+        }
       }
       maxConcurrentConnections.Release();
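
The TODO added to ExecuteOnHive notes that cancelled tasks still sit in Semaphore.WaitOne. A minimal sketch of a cancellation-aware acquire, assuming the throttling fields (maxSerializedJobsInMemory, maxConcurrentSerializations, maxConcurrentConnections) are System.Threading.Semaphore instances; the AcquireOrThrow helper is hypothetical and not part of this changeset:

using System;
using System.Threading;

// Sketch only: waits on either the semaphore or the cancellation token,
// whichever is signaled first, instead of blocking unconditionally.
public static class SemaphoreExtensions {
  public static void AcquireOrThrow(this Semaphore semaphore, CancellationToken cancellationToken) {
    int signaled = WaitHandle.WaitAny(new WaitHandle[] { semaphore, cancellationToken.WaitHandle });
    if (signaled == 1) {
      // The token was signaled: nothing was acquired, so just surface the cancellation.
      throw new OperationCanceledException(cancellationToken);
    }
  }
}

A call such as maxSerializedJobsInMemory.WaitOne() would then become maxSerializedJobsInMemory.AcquireOrThrow(cancellationToken); if the fields were SemaphoreSlim instead, the built-in Wait(CancellationToken) overload already provides the same behavior.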
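
UploadJob and DownloadJob now share the same shape: a "repeat until success" loop that checks the token before each attempt, retries the service call, and logs any exception. A sketch of how that pattern could be factored into one generic helper; RetryUntilSuccess is a hypothetical name and not part of this changeset:

using System;
using System.Threading;

public static class RetryHelper {
  // Invokes 'attempt' repeatedly until it returns a non-null result.
  // The cancellation token is checked before every attempt; failures are logged and retried.
  public static T RetryUntilSuccess<T>(Func<T> attempt, Action<Exception> logException,
                                       CancellationToken cancellationToken) where T : class {
    T result = null;
    while (result == null) {
      cancellationToken.ThrowIfCancellationRequested();
      try {
        result = attempt();
      }
      catch (Exception e) {
        logException(e);
      }
    }
    return result;
  }
}

DownloadJob, for example, could then reduce to a single call that wraps the GetLastSerializedResult request in a lambda and passes LogException and the cancellationToken as the remaining arguments.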