Changeset 5227 for branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
- Timestamp:
- 01/06/11 13:17:56 (14 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs
r5213 r5227 98 98 99 99 if (operation.Operator.Breakpoint) { 100 Log .LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName));100 LogMessage(string.Format("Breakpoint: {0}", operation.Operator.Name != string.Empty ? operation.Operator.Name : operation.Operator.ItemName)); 101 101 Pause(); 102 102 } … … 136 136 /// <param name="jobDict"></param> 137 137 private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict) { 138 Log.LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count)); 138 LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count)); 139 Semaphore concurrentUploads = new Semaphore(6, 6); 139 140 try { 140 141 IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>(); 142 List<Guid> remainingJobIds = new List<Guid>(); 143 IDictionary<Guid, int> jobNumbers = new Dictionary<Guid, int>(); // for better readability of log 141 144 JobResultList results; 142 143 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 144 List<JobDto> jobs = new List<JobDto>(); 145 foreach (var kvp in jobDict) { 146 // shuffle random variable to avoid the same random sequence in each operation 147 IRandom random = FindRandomParameter(kvp.Key as IExecutionContext); 148 if (random != null) 149 random.Reset(random.Next()); 150 145 var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList(); 146 147 List<JobDto> jobs = new List<JobDto>(); 148 foreach (var kvp in jobDict) { 149 150 // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable) 151 IRandom random = FindRandomParameter(kvp.Key as IExecutionContext); 152 if (random != null) 153 random.Reset(random.Next()); 154 155 Task.Factory.StartNew((operationJob) => { 151 156 var groups = ResourceIds.Split(';'); 152 157 SerializedJob serializedJob = new SerializedJob(); 153 serializedJob.SerializedJobData = SerializedJob.Serialize( kvp.Value);158 serializedJob.SerializedJobData = SerializedJob.Serialize(operationJob); 154 159 serializedJob.JobInfo = new JobDto(); 155 160 serializedJob.JobInfo.State = JobState.Offline; 156 161 serializedJob.JobInfo.CoresNeeded = 1; 157 serializedJob.JobInfo.PluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList(); 158 ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 162 serializedJob.JobInfo.PluginsNeeded = pluginsNeeded; 163 ResponseObject<JobDto> response; 164 concurrentUploads.WaitOne(); 165 using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { 166 response = service.Obj.AddJobWithGroupStrings(serializedJob, groups); 167 } 168 concurrentUploads.Release(); 159 169 jobs.Add(response.Obj); 160 170 jobIds.Add(response.Obj.Id, kvp.Key); 161 } 162 results = service.Obj.GetJobResults(jobIds.Keys).Obj; 163 } 164 165 while (!results.All( 166 x => x.State == JobState.Finished || 167 x.State == JobState.Failed || 168 x.State == JobState.Aborted)) { 171 lock (remainingJobIds) { 172 remainingJobIds.Add(response.Obj.Id); 173 jobNumbers.Add(response.Obj.Id, remainingJobIds.Count); 174 } 175 LogMessage(string.Format("Submitted job #{0} (id: {1})", jobNumbers[response.Obj.Id], response.Obj.Id)); 176 }, kvp.Value); 177 } 178 179 while (remainingJobIds.Count != jobDict.Count) { 180 Thread.Sleep(1000); 181 } 182 183 LogMessage("Waiting for results..."); 184 int jobsFinishedCount = 0; 185 Semaphore concurrentDownloads = new Semaphore(4, 4); 186 while (remainingJobIds.Count > 0) { 169 187 Thread.Sleep(5000); 170 188 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 171 results = service.Obj.GetJobResults(jobIds.Keys).Obj; 172 } 173 } 174 175 // all finished 176 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 177 foreach (Guid jobId in jobIds.Keys) { 178 SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 179 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 180 jobDict[jobIds[jobId]] = operationJob; 181 } 182 } 183 189 results = service.Obj.GetJobResults(remainingJobIds).Obj; 190 } 191 foreach (var result in results) { 192 if (result.State == JobState.Finished) { 193 Task.Factory.StartNew((jobIdObj) => { 194 Guid jobId = (Guid)jobIdObj; 195 SerializedJob serializedJob; 196 concurrentDownloads.WaitOne(); 197 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 198 serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 199 } 200 concurrentDownloads.Release(); 201 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 202 jobDict[jobIds[jobId]] = operationJob; 203 LogMessage(string.Format("Downloaded job #{0} (id: {1})", jobNumbers[jobId], jobId)); 204 jobsFinishedCount++; 205 }, result.Id); 206 207 remainingJobIds.Remove(result.Id); 208 } else if (result.State == JobState.Aborted) { 209 LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id)); 210 remainingJobIds.Remove(result.Id); 211 } else if (result.State == JobState.Failed) { 212 LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobNumbers[result.Id], result.Id, result.Exception)); 213 remainingJobIds.Remove(result.Id); 214 } 215 } 216 } 217 218 // wait for all tasks to finish downloading and deserializing 219 while (jobsFinishedCount != jobDict.Count) { 220 Thread.Sleep(1000); 221 } 222 223 LogMessage("All jobs finished. Deleting jobs on hive."); 184 224 // delete jobs 185 225 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { … … 189 229 } 190 230 191 Log .LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));231 LogMessage(string.Format("Operations on the hive finished.", jobDict.Count)); 192 232 } 193 233 catch (Exception e) { 194 Log .LogException(e);234 LogException(e); 195 235 throw e; 236 } 237 } 238 239 private void DownloadJob(Guid jobId, IDictionary<IOperation, OperationJob> jobDict, IDictionary<Guid, IOperation> jobIds) { 240 SerializedJob serializedJob; 241 using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) { 242 serializedJob = service.Obj.GetLastSerializedResult(jobId).Obj; 243 } 244 OperationJob operationJob = SerializedJob.Deserialize<OperationJob>(serializedJob.SerializedJobData); 245 jobDict[jobIds[jobId]] = operationJob; 246 LogMessage(string.Format("Downloaded job (id: {0})", jobId)); 247 } 248 249 /// <summary> 250 /// Threadsafe message logging 251 /// </summary> 252 private void LogMessage(string message) { 253 lock (Log) { 254 Log.LogMessage(message); 255 } 256 } 257 258 /// <summary> 259 /// Threadsafe exception logging 260 /// </summary> 261 private void LogException(Exception exception) { 262 lock (Log) { 263 Log.LogException(exception); 196 264 } 197 265 }
Note: See TracChangeset
for help on using the changeset viewer.