- Timestamp:
- 02/18/13 10:34:23 (11 years ago)
- Location:
- trunk/sources
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources
- Property svn:mergeinfo changed
/branches/UnloadJobs (added) merged: 9168-9170,9173-9174,9183-9184,9187-9188,9193,9201-9202
- Property svn:mergeinfo changed
-
trunk/sources/HeuristicLab.Clients.Hive/3.3/ConcurrentTaskDownloader.cs
r7259 r9219 30 30 /// Downloads and deserializes jobs. It avoids too many jobs beeing downloaded or deserialized at the same time to avoid memory problems 31 31 /// </summary> 32 public class ConcurrentTaskDownloader<T> where T : class, ITask {32 public class ConcurrentTaskDownloader<T> : IDisposable where T : class, ITask { 33 33 private bool abort = false; 34 34 // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory … … 39 39 downloadSemaphore = new Semaphore(concurrentDownloads, concurrentDownloads); 40 40 deserializeSemaphore = new Semaphore(concurrentDeserializations, concurrentDeserializations); 41 TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);42 41 } 43 42 … … 69 68 70 69 private Task DownloadTask(object taskId) { 71 return HiveServiceLocator.Instance.CallHiveService(s => s.GetTask((Guid)taskId)); 70 Task t = null; 71 HiveClient.TryAndRepeat(() => { 72 t = HiveServiceLocator.Instance.CallHiveService(s => s.GetTask((Guid)taskId)); 73 }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task."); 74 return t; 72 75 } 73 76 … … 78 81 protected Tuple<Task, TaskData> DownloadTaskData(Task task) { 79 82 downloadSemaphore.WaitOne(); 80 TaskData result ;83 TaskData result = null; 81 84 try { 82 85 if (abort) return null; 83 result = HiveServiceLocator.Instance.CallHiveService(s => s.GetTaskData(task.Id)); 84 } finally { 86 HiveClient.TryAndRepeat(() => { 87 result = HiveServiceLocator.Instance.CallHiveService(s => s.GetTaskData(task.Id)); 88 }, Settings.Default.MaxRepeatServiceCalls, "Failed to download task data."); 89 } 90 finally { 85 91 downloadSemaphore.Release(); 86 92 } … … 95 101 taskData.Item2.Data = null; // reduce memory consumption. 96 102 return new Tuple<Task, T>(taskData.Item1, deserializedJob); 97 } finally { 103 } 104 finally { 98 105 deserializeSemaphore.Release(); 99 106 } 100 }101 102 private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {103 e.SetObserved(); // avoid crash of process because task crashes. first exception found is handled in Results property104 OnExceptionOccured(new HiveException("Unobserved Exception in ConcurrentTaskDownloader", e.Exception));105 107 } 106 108 … … 110 112 if (handler != null) handler(this, new EventArgs<Exception>(exception)); 111 113 } 114 115 #region IDisposable Members 116 public void Dispose() { 117 deserializeSemaphore.Dispose(); 118 downloadSemaphore.Dispose(); 119 } 120 #endregion 112 121 } 113 122 }
Note: See TracChangeset
for help on using the changeset viewer.