Changeset 6168 for branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/HiveJobDownloader.cs
- Timestamp:
- 05/09/11 14:12:10 (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.Clients.Hive/3.4/ExperimentManager/HiveJobDownloader.cs
r6033 r6168 1 using System; 1 #region License Information 2 /* HeuristicLab 3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL) 4 * 5 * This file is part of HeuristicLab. 6 * 7 * HeuristicLab is free software: you can redistribute it and/or modify 8 * it under the terms of the GNU General Public License as published by 9 * the Free Software Foundation, either version 3 of the License, or 10 * (at your option) any later version. 11 * 12 * HeuristicLab is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 * GNU General Public License for more details. 16 * 17 * You should have received a copy of the GNU General Public License 18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>. 19 */ 20 #endregion 21 22 using System; 2 23 using System.Collections.Generic; 3 24 using System.Linq; 4 using System.Threading;5 using System.Threading.Tasks;6 25 using HeuristicLab.Clients.Hive.ExperimentManager; 7 26 using HeuristicLab.Clients.Hive.Jobs; … … 11 30 public class HiveJobDownloader { 12 31 private IEnumerable<Guid> jobIds; 13 private List<Task<HiveJob>> tasks;14 private bool abort = false;32 private JobDownloader<ItemJob> jobDownloader; 33 private IDictionary<Guid, HiveJob> results; 15 34 16 35 public bool IsFinished { 17 36 get { 18 return tasks.TrueForAll(t => t.Status == TaskStatus.RanToCompletion || 19 t.Status == TaskStatus.Faulted || 20 t.Status == TaskStatus.Canceled); 37 //return tasks.TrueForAll(t => t.Status == TaskStatus.RanToCompletion || 38 // t.Status == TaskStatus.Faulted || 39 // t.Status == TaskStatus.Canceled); 40 return results.Count == jobIds.Count(); 21 41 } 22 42 } … … 24 44 public int FinishedCount { 25 45 get { 26 var faulted = tasks.Where(t => t.Status == TaskStatus.Faulted); 27 if (faulted.Count() > 0) { 28 abort = true; 29 throw faulted.First().Exception; 30 } 31 return tasks.Count(t => t.Status == TaskStatus.RanToCompletion || 32 t.Status == TaskStatus.Faulted || 33 t.Status == TaskStatus.Canceled); 46 //var faulted = tasks.Where(t => t.Status == TaskStatus.Faulted); 47 //if (faulted.Count() > 0) { 48 // abort = true; 49 // throw faulted.First().Exception; 50 //} 51 //return tasks.Count(t => t.Status == TaskStatus.RanToCompletion || 52 // t.Status == TaskStatus.Faulted || 53 // t.Status == TaskStatus.Canceled); 54 return results.Count; 34 55 } 35 56 } … … 37 58 public IDictionary<Guid, HiveJob> Results { 38 59 get { 39 var results = new Dictionary<Guid, HiveJob>();40 foreach (var t in tasks) {41 if (t.Status == TaskStatus.Faulted) {42 throw t.Exception;43 }44 if (t.Result != null)45 results.Add(t.Result.Job.Id, t.Result);46 }60 //var results = new Dictionary<Guid, HiveJob>(); 61 //foreach (var t in tasks) { 62 // if (t.Status == TaskStatus.Faulted) { 63 // throw t.Exception; 64 // } 65 // if (t.Result != null) 66 // results.Add(t.Result.Job.Id, t.Result); 67 //} 47 68 return results; 48 69 } … … 51 72 public HiveJobDownloader(IEnumerable<Guid> jobIds) { 52 73 this.jobIds = jobIds; 74 this.jobDownloader = new JobDownloader<ItemJob>(2, 2); 75 this.results = new Dictionary<Guid, HiveJob>(); 53 76 } 54 77 55 78 public void StartAsync() { 56 abort = false;57 tasks = new List<Task<HiveJob>>();58 TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);59 79 foreach (Guid jobId in jobIds) { 60 tasks.Add(Task<JobData>.Factory.StartNew( 61 (x) => DownloadJob(x), jobId) 62 .ContinueWith((x) => DeserializeJob(x.Result))); 80 jobDownloader.DownloadJob(jobId, 81 (id, itemJob, exception) => { 82 if (exception != null) { 83 throw new JobDownloaderException("Downloading job failed", exception); 84 } 85 Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(id)); 86 if (job != null && itemJob != null) { 87 HiveJob hiveJob; 88 if (itemJob is OptimizerJob) { 89 hiveJob = new OptimizerHiveJob((OptimizerJob)itemJob); 90 } else { 91 hiveJob = new HiveJob(itemJob, true); 92 } 93 hiveJob.Job = job; 94 this.results.Add(id, hiveJob); 95 } 96 }); 63 97 } 98 99 //tasks = new List<Task<HiveJob>>(); 100 //TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException); 101 //foreach (Guid jobId in jobIds) { 102 // tasks.Add(Task<JobData>.Factory.StartNew( 103 // (x) => DownloadJob(x), jobId) 104 // .ContinueWith((x) => DeserializeJob(x.Result))); 105 //} 64 106 } 65 107 66 private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {67 e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property68 }108 //private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) { 109 // e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property 110 //} 69 111 70 // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory71 private Semaphore downloadSemaphore = new Semaphore(2, 2);72 private Semaphore deserializeSemaphore = new Semaphore(2, 2);73 protected JobData DownloadJob(object jobId) {74 downloadSemaphore.WaitOne();75 deserializeSemaphore.WaitOne();76 JobData result;77 try {78 if (abort) return null;79 result = ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId));80 }81 finally {82 downloadSemaphore.Release();83 }84 return result;85 }112 //// use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory 113 //private Semaphore downloadSemaphore = new Semaphore(2, 2); 114 //private Semaphore deserializeSemaphore = new Semaphore(2, 2); 115 //protected JobData DownloadJob(object jobId) { 116 // downloadSemaphore.WaitOne(); 117 // deserializeSemaphore.WaitOne(); 118 // JobData result; 119 // try { 120 // if (abort) return null; 121 // result = ServiceLocator.Instance.CallHiveService(s => s.GetJobData((Guid)jobId)); 122 // } 123 // finally { 124 // downloadSemaphore.Release(); 125 // } 126 // return result; 127 //} 86 128 87 protected HiveJob DeserializeJob(JobData jobData) {88 try {89 Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId));90 if (abort || job == null || jobData == null) return null;129 //protected HiveJob DeserializeJob(JobData jobData) { 130 // try { 131 // Job job = ServiceLocator.Instance.CallHiveService(s => s.GetJob(jobData.JobId)); 132 // if (abort || job == null || jobData == null) return null; 91 133 92 HiveJob hiveJob;93 var itemJob = PersistenceUtil.Deserialize<ItemJob>(jobData.Data);94 if (itemJob is OptimizerJob) {95 hiveJob = new OptimizerHiveJob((OptimizerJob)itemJob);96 } else {97 hiveJob = new HiveJob(itemJob, true);98 }99 jobData.Data = null; // reduce memory consumption.100 hiveJob.Job = job;101 return hiveJob;102 }103 finally {104 deserializeSemaphore.Release();105 }106 }134 // HiveJob hiveJob; 135 // var itemJob = PersistenceUtil.Deserialize<ItemJob>(jobData.Data); 136 // if (itemJob is OptimizerJob) { 137 // hiveJob = new OptimizerHiveJob((OptimizerJob)itemJob); 138 // } else { 139 // hiveJob = new HiveJob(itemJob, true); 140 // } 141 // jobData.Data = null; // reduce memory consumption. 142 // hiveJob.Job = job; 143 // return hiveJob; 144 // } 145 // finally { 146 // deserializeSemaphore.Release(); 147 // } 148 //} 107 149 } 108 150 }
Note: See TracChangeset
for help on using the changeset viewer.