- Timestamp:
- 12/03/11 01:01:47 (13 years ago)
- Location:
- trunk/sources/HeuristicLab.Clients.Hive/3.3
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.Clients.Hive/3.3/ConcurrentTaskDownloader.cs
r6976 r7115 42 42 } 43 43 44 public void DownloadTask(Task job, Action<Task, T> onFinishedAction) { 45 Task<T> task = Task<TaskData>.Factory.StartNew((x) => DownloadTask(x), job.Id) 46 .ContinueWith((x) => DeserializeTask(x.Result)); 47 task.ContinueWith((x) => OnTaskFinished(job, x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion); 48 task.ContinueWith((x) => OnTaskFailed(job, x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted); 44 public void DownloadTaskData(Task t, Action<Task, T> onFinishedAction) { 45 Task<Tuple<Task, T>> task = Task<Tuple<Task, TaskData>>.Factory.StartNew(DownloadTaskData, t) 46 .ContinueWith((y) => DeserializeTask(y.Result)); 47 48 task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion); 49 task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted); 49 50 } 50 51 51 private void OnTaskFinished(Task job, Task<T> task, Action<Task, T> onFinishedAction) { 52 onFinishedAction(job, task.Result); 52 public void DownloadTaskDataAndTask(Guid taskId, Action<Task, T> onFinishedAction) { 53 Task<Tuple<Task, T>> task = Task<Task>.Factory.StartNew(DownloadTask, taskId) 54 .ContinueWith((x) => DownloadTaskData(x.Result)) 55 .ContinueWith((y) => DeserializeTask(y.Result)); 56 57 task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion); 58 task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted); 53 59 } 54 private void OnTaskFailed(Task job, Task<T> task, Action<Task, T> onFinishedAction) { 60 61 private void OnTaskFinished(Task<Tuple<Task, T>> task, Action<Task, T> onFinishedAction) { 62 onFinishedAction(task.Result.Item1, task.Result.Item2); 63 } 64 private void OnTaskFailed(Task<Tuple<Task, T>> task, Action<Task, T> onFinishedAction) { 55 65 task.Exception.Flatten().Handle((e) => { return true; }); 56 66 OnExceptionOccured(task.Exception.Flatten()); 57 onFinishedAction( job, null);67 onFinishedAction(task.Result.Item1, null); 58 68 } 59 69 60 protected TaskData DownloadTask(object taskId) { 70 private Task DownloadTask(object taskId) { 71 return ServiceLocator.Instance.CallHiveService(s => s.GetTask((Guid)taskId)); 72 } 73 74 protected Tuple<Task, TaskData> DownloadTaskData(object taskId) { 75 return DownloadTaskData((Task)taskId); 76 } 77 78 protected Tuple<Task, TaskData> DownloadTaskData(Task task) { 61 79 downloadSemaphore.WaitOne(); 62 80 deserializeSemaphore.WaitOne(); … … 64 82 try { 65 83 if (abort) return null; 66 result = ServiceLocator.Instance.CallHiveService(s => s.GetTaskData((Guid)taskId)); 67 } 68 finally { 84 result = ServiceLocator.Instance.CallHiveService(s => s.GetTaskData(task.Id)); 85 } finally { 69 86 downloadSemaphore.Release(); 70 87 } 71 return result;88 return new Tuple<Task, TaskData>(task, result); 72 89 } 73 90 74 protected T DeserializeTask(TaskDatataskData) {91 protected Tuple<Task, T> DeserializeTask(Tuple<Task, TaskData> taskData) { 75 92 try { 76 if (abort || taskData == null) return null; 77 Task task = ServiceLocator.Instance.CallHiveService(s => s.GetTask(taskData.TaskId)); 78 if (task == null) return null; 79 var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Data); 80 taskData.Data = null; // reduce memory consumption. 81 return deserializedJob; 82 } 83 finally { 93 if (abort || taskData.Item2 == null || taskData.Item1 == null) return null; 94 var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Item2.Data); 95 taskData.Item2.Data = null; // reduce memory consumption. 96 return new Tuple<Task, T>(taskData.Item1, deserializedJob); 97 } finally { 84 98 deserializeSemaphore.Release(); 85 99 } -
trunk/sources/HeuristicLab.Clients.Hive/3.3/RefreshableJob.cs
r7059 r7115 277 277 log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id)); 278 278 hiveTask.IsDownloading = true; 279 jobDownloader.DownloadTask (hiveTask.Task, (localJob, itemJob) => {279 jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => { 280 280 log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id)); 281 281 HiveTask localHiveTask = GetHiveJobById(localJob.Id); -
trunk/sources/HeuristicLab.Clients.Hive/3.3/TaskDownloader.cs
r7020 r7115 25 25 using HeuristicLab.Clients.Hive.Jobs; 26 26 using HeuristicLab.Common; 27 using System.Threading; 27 28 28 29 namespace HeuristicLab.Clients.Hive { … … 33 34 private bool exceptionOccured = false; 34 35 private Exception currentException; 36 private ReaderWriterLockSlim resultsLock = new ReaderWriterLockSlim(); 35 37 36 38 public bool IsFinished { 37 39 get { 38 return results.Count == taskIds.Count(); 40 try { 41 resultsLock.EnterReadLock(); 42 return results.Count == taskIds.Count(); 43 } finally { resultsLock.ExitReadLock(); } 39 44 } 40 45 } … … 54 59 public int FinishedCount { 55 60 get { 56 return results.Count; 61 try { 62 resultsLock.EnterReadLock(); 63 return results.Count; 64 } finally { resultsLock.ExitReadLock(); } 57 65 } 58 66 } … … 60 68 public IDictionary<Guid, HiveTask> Results { 61 69 get { 62 return results; 70 try { 71 resultsLock.EnterReadLock(); 72 return results; 73 } finally { resultsLock.ExitReadLock(); } 63 74 } 64 75 } … … 73 84 public void StartAsync() { 74 85 foreach (Guid taskId in taskIds) { 75 Task task = ServiceLocator.Instance.CallHiveService(s => s.GetTask(taskId)); 76 77 taskDownloader.DownloadTask(task, 86 taskDownloader.DownloadTaskDataAndTask(taskId, 78 87 (localJob, itemJob) => { 79 88 if (localJob != null && itemJob != null) { … … 85 94 } 86 95 hiveTask.Task = localJob; 87 this.results.Add(localJob.Id, hiveTask); 96 try { 97 resultsLock.EnterWriteLock(); 98 this.results.Add(localJob.Id, hiveTask); 99 } finally { resultsLock.ExitWriteLock(); } 88 100 } 89 101 });
Note: See TracChangeset
for help on using the changeset viewer.