using System; using System.Collections.Generic; using System.Linq; using System.Text; using HeuristicLab.Hive.ExperimentManager.Jobs; using System.Threading; using System.Threading.Tasks; using HeuristicLab.Hive.Contracts.BusinessObjects; using HeuristicLab.Hive.Contracts.Interfaces; using HeuristicLab.Clients.Common; namespace HeuristicLab.Hive.ExperimentManager { public class HiveJobDownloader { private IEnumerable jobIds; private List> tasks; private bool abort = false; public bool IsFinished { get { return tasks.TrueForAll(t => t.Status == TaskStatus.RanToCompletion || t.Status == TaskStatus.Faulted || t.Status == TaskStatus.Canceled); } } public int FinishedCount { get { var faulted = tasks.Where(t => t.Status == TaskStatus.Faulted); if (faulted.Count() > 0) { abort = true; throw faulted.First().Exception; } return tasks.Count(t => t.Status == TaskStatus.RanToCompletion || t.Status == TaskStatus.Faulted || t.Status == TaskStatus.Canceled); } } public IDictionary Results { get { var results = new Dictionary(); foreach (var t in tasks) { if (t.Status == TaskStatus.Faulted) { throw t.Exception; } if(t.Result != null) results.Add(t.Result.JobDto.Id, t.Result); } return results; } } public HiveJobDownloader(IEnumerable jobIds) { this.jobIds = jobIds; } public void StartAsync() { abort = false; tasks = new List>(); TaskScheduler.UnobservedTaskException += new EventHandler(TaskScheduler_UnobservedTaskException); foreach (Guid jobId in jobIds) { tasks.Add(Task.Factory.StartNew( (x) => DownloadJob(x), jobId) .ContinueWith((x) => DeserializeJob(x.Result))); } } private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) { e.SetObserved(); // evoid crash of process because task crashes. first exception found is handled in Results property } // use semaphore to ensure only few concurrenct connections and few SerializedJob objects in memory private Semaphore downloadSemaphore = new Semaphore(2, 2); private Semaphore deserializeSemaphore = new Semaphore(2, 2); protected SerializedJob DownloadJob(object jobId) { downloadSemaphore.WaitOne(); deserializeSemaphore.WaitOne(); SerializedJob result; try { if (abort) return null; using (Disposable service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) { result = service.Obj.GetLastSerializedResult((Guid)jobId).Obj; } } finally { downloadSemaphore.Release(); } return result; } protected HiveJob DeserializeJob(SerializedJob serializedJob) { try { if (abort || serializedJob == null) return null; HiveJob job = new HiveJob(serializedJob, false); serializedJob.SerializedJobData = null; // reduce memory consumption. job.Job.Prepare(); // reduce memory consumption. job.JobDto = serializedJob.JobInfo; return job; } finally { deserializeSemaphore.Release(); } } } }