Changeset 5228
- Timestamp: 01/06/11 18:34:38 (14 years ago)
- Location: branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3
- Files: 1 added, 2 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HeuristicLab.HiveEngine-3.3.csproj
r5179 r5228 88 88 </ItemGroup> 89 89 <ItemGroup> 90 <Compile Include="HiveEngineException.cs" /> 90 91 <Compile Include="ScopeMergeException.cs" /> 91 92 <Compile Include="Views\HiveEngineView.cs"> -
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
r5227 r5228 23 23 [Item("Hive Engine", "Engine for parallel execution on the hive. You need enable `Parallel` for at least one operator in your operator graph to have all childoperations parallelized. Also those childoperations must not have sideeffects on a higher scope.")] 24 24 public class HiveEngine : Engine { 25 private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections 26 private Semaphore maxConcurrentSerializations = new Semaphore(1, 1); // allow ony ONE concurrent serialization, because the operations share the same ParentScopes and serializing the same objects concurrently causes problems 27 private Semaphore maxSerializedJobsInMemory = new Semaphore(2, 2); // avoid memory problems 25 28 private CancellationToken cancellationToken; 26 29 … … 76 79 ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation); 77 80 } else if (kvp.Key is OperationCollection) { 78 OperationCollection ocoll = (OperationCollection)kvp.Key; 79 for (int i = ocoll.Count - 1; i >= 0; i--) 80 if (ocoll[i] != null) executionStack.Push(ocoll[i]); 81 // todo ?? 
81 82 } 82 83 } … … 137 138 private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) { 138 139 LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count)); 139 Semaphore concurrentUploads = new Semaphore(6, 6); 140 141 object locker = new object(); 140 142 try { 141 143 IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>(); … … 144 146 JobResultList results; 145 147 var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList(); 146 147 List<JobDto> jobs = new List<JobDto>(); 148 int finishedCount = 0; 149 150 // create upload-tasks 151 var uploadTasks = new List<Task<JobDto>>(); 148 152 foreach (var kvp in jobDict) { 149 150 153 // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable) 151 154 IRandom random = FindRandomParameter(kvp.Key as IExecutionContext); … … 153 156 random.Reset(random.Next()); 154 157 155 Task.Factory.StartNew((operationJob) => { 156 var groups = ResourceIds.Split(';'); 157 SerializedJob serializedJob = new SerializedJob(); 158 serializedJob.SerializedJobData = SerializedJob.Serialize(operationJob); 159 serializedJob.JobInfo = new JobDto(); 160 serializedJob.JobInfo.State = JobState.Offline; 161 serializedJob.JobInfo.CoresNeeded = 1; 162 serializedJob.JobInfo.PluginsNeeded = pluginsNeeded; 163 ResponseObject<JobDto> response; 164 concurrentUploads.WaitOne(); 165 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 166 response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 158 uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => { 159 return UploadJob(pluginsNeeded, keyValuePairObj); 160 }, kvp)); 161 } 162 163 Task processUploadedJobsTask = Task.Factory.StartNew(() => { 164 // process finished upload-tasks 165 int uploadTasksCount = 
uploadTasks.Count; 166 for (int i = 0; i < uploadTasksCount; i++) { 167 var uploadTasksArray = uploadTasks.ToArray(); 168 var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)]; 169 if (task.Status == TaskStatus.Faulted) { 170 LogException(task.Exception); 171 throw task.Exception; 167 172 } 168 concurrentUploads.Release(); 169 jobs.Add(response.Obj); 170 jobIds.Add(response.Obj.Id, kvp.Key); 171 lock (remainingJobIds) { 172 remainingJobIds.Add(response.Obj.Id); 173 jobNumbers.Add(response.Obj.Id, remainingJobIds.Count); 174 } 175 LogMessage(string.Format("Submitted job #{0} (id: {1})", jobNumbers[response.Obj.Id], response.Obj.Id)); 176 }, kvp.Value); 177 } 178 179 while (remainingJobIds.Count != jobDict.Count) { 180 Thread.Sleep(1000); 181 } 182 183 LogMessage("Waiting for results..."); 184 int jobsFinishedCount = 0; 185 Semaphore concurrentDownloads = new Semaphore(4, 4); 186 while (remainingJobIds.Count > 0) { 187 Thread.Sleep(5000); 173 174 IOperation key = ((KeyValuePair<IOperation, OperationJob>)task.AsyncState).Key; 175 JobDto jobDto = task.Result; 176 177 jobIds.Add(jobDto.Id, key); 178 remainingJobIds.Add(jobDto.Id); 179 jobNumbers.Add(jobDto.Id, remainingJobIds.Count); 180 181 LogMessage(string.Format("Submitted job #{0}", jobNumbers[jobDto.Id], jobDto.Id)); 182 uploadTasks.Remove(task); 183 } 184 }); 185 186 // poll job-statuses and create tasks for those which are finished 187 var downloadTasks = new List<Task<OperationJob>>(); 188 var executionTimes = new List<TimeSpan>(); 189 while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) { 190 Thread.Sleep(10000); 188 191 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 189 192 results = service.Obj.GetJobResults(remainingJobIds).Obj; 190 193 } 191 foreach (var result in results) { 194 var jobsFinished = results.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == 
JobState.Aborted); 195 finishedCount += jobsFinished.Count(); 196 var totalExecutionTime = TimeSpan.FromMilliseconds(results.Select(j => j.ExecutionTime).Union(executionTimes).Select(e => e.TotalMilliseconds).Sum()); 197 LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobDict.Count, totalExecutionTime)); 198 foreach (var result in jobsFinished) { 192 199 if (result.State == JobState.Finished) { 193 Task.Factory.StartNew((jobIdObj) => { 194 Guid jobId = (Guid)jobIdObj; 195 SerializedJob serializedJob; 196 concurrentDownloads.WaitOne(); 197 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 198 serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 199 } 200 concurrentDownloads.Release(); 201 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 202 jobDict[jobIds[jobId]] = operationJob; 203 LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobNumbers[jobId], jobId)); 204 jobsFinishedCount++; 205 }, result.Id); 206 207 remainingJobIds.Remove(result.Id); 200 downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => { 201 return DownloadJob(jobNumbers, jobIdObj); 202 }, result.Id)); 208 203 } else if (result.State == JobState.Aborted) { 209 204 LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id)); 210 remainingJobIds.Remove(result.Id);211 205 } else if (result.State == JobState.Failed) { 212 206 LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobNumbers[result.Id], result.Id, result.Exception)); 213 remainingJobIds.Remove(result.Id);214 207 } 215 } 216 } 217 218 // wait for all tasks to finish downloading and deserializing 219 while (jobsFinishedCount != jobDict.Count) { 220 Thread.Sleep(1000); 221 } 222 223 LogMessage("All jobs finished. 
Deleting jobs on hive."); 208 remainingJobIds.Remove(result.Id); 209 executionTimes.Add(result.ExecutionTime); 210 } 211 } 212 213 // process finished download-tasks 214 int downloadTasksCount = downloadTasks.Count; 215 for (int i = 0; i < downloadTasksCount; i++) { 216 var downloadTasksArray = downloadTasks.ToArray(); 217 var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)]; 218 var jobId = (Guid)task.AsyncState; 219 if (task.Status == TaskStatus.Faulted) { 220 LogException(task.Exception); 221 throw task.Exception; 222 } 223 jobDict[jobIds[(Guid)task.AsyncState]] = task.Result; 224 downloadTasks.Remove(task); 225 } 226 227 LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}). Deleting jobs on hive.", TimeSpan.FromMilliseconds(executionTimes.Select(e => e.TotalMilliseconds).Sum()))); 224 228 // delete jobs 225 229 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { … … 237 241 } 238 242 239 private void DownloadJob(Guid jobId, IDictionary<IOperation, OperationJob> jobDict, IDictionary<Guid, IOperation> jobIds) { 243 private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj) { 244 var keyValuePair = (KeyValuePair<IOperation, OperationJob>)keyValuePairObj; 245 var groups = ResourceIds.Split(';'); 246 maxSerializedJobsInMemory.WaitOne(); 247 SerializedJob serializedJob = new SerializedJob(); 248 maxConcurrentSerializations.WaitOne(); 249 serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value); 250 maxConcurrentSerializations.Release(); 251 serializedJob.JobInfo = new JobDto(); 252 serializedJob.JobInfo.State = JobState.Offline; 253 serializedJob.JobInfo.CoresNeeded = 1; 254 serializedJob.JobInfo.PluginsNeeded = pluginsNeeded; 255 ResponseObject<JobDto> response; 256 maxConcurrentConnections.WaitOne(); 257 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 258 response = 
service.Obj.AddJobWithGroupStrings(serializedJob, groups); 259 serializedJob = null; 260 maxSerializedJobsInMemory.Release(); 261 } 262 maxConcurrentConnections.Release(); 263 return response.Obj; 264 } 265 266 private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj) { 267 Guid jobId = (Guid)jobIdObj; 240 268 SerializedJob serializedJob; 241 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 269 maxConcurrentConnections.WaitOne(); 270 maxSerializedJobsInMemory.WaitOne(); 271 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 242 272 serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 243 273 } 274 maxConcurrentConnections.Release(); 244 275 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 245 jobDict[jobIds[jobId]] = operationJob; 246 LogMessage(string.Format("Downloaded job (id: {0})", jobId)); 276 serializedJob = null; 277 maxSerializedJobsInMemory.Release(); 278 LogMessage(string.Format("Downloaded job #{0}", jobNumbers[jobId], jobId)); 279 return operationJob; 247 280 } 248 281
Note: See TracChangeset for help on using the changeset viewer.