Changeset 7115


Ignore:
Timestamp:
12/03/11 01:01:47 (10 years ago)
Author:
ascheibe
Message:

#1672

  • speed up download of tasks by avoiding unnecessary service calls
  • display download progress correctly
Location:
trunk/sources/HeuristicLab.Clients.Hive/3.3
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Clients.Hive/3.3/ConcurrentTaskDownloader.cs

    r6976 r7115  
    4242    }
    4343
    44     public void DownloadTask(Task job, Action<Task, T> onFinishedAction) {
    45       Task<T> task = Task<TaskData>.Factory.StartNew((x) => DownloadTask(x), job.Id)
    46                                      .ContinueWith((x) => DeserializeTask(x.Result));
    47       task.ContinueWith((x) => OnTaskFinished(job, x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
    48       task.ContinueWith((x) => OnTaskFailed(job, x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
     44    public void DownloadTaskData(Task t, Action<Task, T> onFinishedAction) {
     45      Task<Tuple<Task, T>> task = Task<Tuple<Task, TaskData>>.Factory.StartNew(DownloadTaskData, t)
     46                                     .ContinueWith((y) => DeserializeTask(y.Result));
     47
     48      task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
     49      task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
    4950    }
    5051
    51     private void OnTaskFinished(Task job, Task<T> task, Action<Task, T> onFinishedAction) {
    52       onFinishedAction(job, task.Result);
     52    public void DownloadTaskDataAndTask(Guid taskId, Action<Task, T> onFinishedAction) {
     53      Task<Tuple<Task, T>> task = Task<Task>.Factory.StartNew(DownloadTask, taskId)
     54                                     .ContinueWith((x) => DownloadTaskData(x.Result))
     55                                     .ContinueWith((y) => DeserializeTask(y.Result));
     56
     57      task.ContinueWith((x) => OnTaskFinished(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
     58      task.ContinueWith((x) => OnTaskFailed(x, onFinishedAction), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted);
    5359    }
    54     private void OnTaskFailed(Task job, Task<T> task, Action<Task, T> onFinishedAction) {
     60
     61    private void OnTaskFinished(Task<Tuple<Task, T>> task, Action<Task, T> onFinishedAction) {
     62      onFinishedAction(task.Result.Item1, task.Result.Item2);
     63    }
     64    private void OnTaskFailed(Task<Tuple<Task, T>> task, Action<Task, T> onFinishedAction) {
    5565      task.Exception.Flatten().Handle((e) => { return true; });
    5666      OnExceptionOccured(task.Exception.Flatten());
    57       onFinishedAction(job, null);
     67      onFinishedAction(task.Result.Item1, null);
    5868    }
    5969
    60     protected TaskData DownloadTask(object taskId) {
     70    private Task DownloadTask(object taskId) {
     71      return ServiceLocator.Instance.CallHiveService(s => s.GetTask((Guid)taskId));
     72    }
     73
     74    protected Tuple<Task, TaskData> DownloadTaskData(object taskId) {
     75      return DownloadTaskData((Task)taskId);
     76    }
     77
     78    protected Tuple<Task, TaskData> DownloadTaskData(Task task) {
    6179      downloadSemaphore.WaitOne();
    6280      deserializeSemaphore.WaitOne();
     
    6482      try {
    6583        if (abort) return null;
    66         result = ServiceLocator.Instance.CallHiveService(s => s.GetTaskData((Guid)taskId));
    67       }
    68       finally {
     84        result = ServiceLocator.Instance.CallHiveService(s => s.GetTaskData(task.Id));
     85      } finally {
    6986        downloadSemaphore.Release();
    7087      }
    71       return result;
     88      return new Tuple<Task, TaskData>(task, result);
    7289    }
    7390
    74     protected T DeserializeTask(TaskData taskData) {
     91    protected Tuple<Task, T> DeserializeTask(Tuple<Task, TaskData> taskData) {
    7592      try {
    76         if (abort || taskData == null) return null;
    77         Task task = ServiceLocator.Instance.CallHiveService(s => s.GetTask(taskData.TaskId));
    78         if (task == null) return null;
    79         var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Data);
    80         taskData.Data = null; // reduce memory consumption.
    81         return deserializedJob;
    82       }
    83       finally {
     93        if (abort || taskData.Item2 == null || taskData.Item1 == null) return null;
     94        var deserializedJob = PersistenceUtil.Deserialize<T>(taskData.Item2.Data);
     95        taskData.Item2.Data = null; // reduce memory consumption.
     96        return new Tuple<Task, T>(taskData.Item1, deserializedJob);
     97      } finally {
    8498        deserializeSemaphore.Release();
    8599      }
  • trunk/sources/HeuristicLab.Clients.Hive/3.3/RefreshableJob.cs

    r7059 r7115  
    277277            log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id));
    278278            hiveTask.IsDownloading = true;
    279             jobDownloader.DownloadTask(hiveTask.Task, (localJob, itemJob) => {
     279            jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => {
    280280              log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id));
    281281              HiveTask localHiveTask = GetHiveJobById(localJob.Id);
  • trunk/sources/HeuristicLab.Clients.Hive/3.3/TaskDownloader.cs

    r7020 r7115  
    2525using HeuristicLab.Clients.Hive.Jobs;
    2626using HeuristicLab.Common;
     27using System.Threading;
    2728
    2829namespace HeuristicLab.Clients.Hive {
     
    3334    private bool exceptionOccured = false;
    3435    private Exception currentException;
     36    private ReaderWriterLockSlim resultsLock = new ReaderWriterLockSlim();
    3537
    3638    public bool IsFinished {
    3739      get {
    38         return results.Count == taskIds.Count();
     40          try {       
     41              resultsLock.EnterReadLock();
     42              return results.Count == taskIds.Count();
     43          } finally { resultsLock.ExitReadLock(); }
    3944      }
    4045    }
     
    5459    public int FinishedCount {
    5560      get {
    56         return results.Count;
     61            try {
     62              resultsLock.EnterReadLock();
     63              return results.Count;
     64             } finally { resultsLock.ExitReadLock(); }
    5765      }
    5866    }
     
    6068    public IDictionary<Guid, HiveTask> Results {
    6169      get {
    62         return results;
     70            try {
     71              resultsLock.EnterReadLock();
     72              return results;
     73            } finally { resultsLock.ExitReadLock(); }
    6374      }
    6475    }
     
    7384    public void StartAsync() {
    7485      foreach (Guid taskId in taskIds) {
    75         Task task = ServiceLocator.Instance.CallHiveService(s => s.GetTask(taskId));
    76 
    77         taskDownloader.DownloadTask(task,
     86        taskDownloader.DownloadTaskDataAndTask(taskId,
    7887          (localJob, itemJob) => {
    7988            if (localJob != null && itemJob != null) {
     
    8594              }
    8695              hiveTask.Task = localJob;
    87               this.results.Add(localJob.Id, hiveTask);
     96              try {
     97                resultsLock.EnterWriteLock();
     98                this.results.Add(localJob.Id, hiveTask);
     99              } finally { resultsLock.ExitWriteLock(); }
    88100            }
    89101          });
Note: See TracChangeset for help on using the changeset viewer.