Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Tools/HeuristicLab.HiveDrain/HeuristicLab.HiveDrain/JobTaskDownloader.cs @ 9694

Last change on this file since 9694 was 9694, checked in by ascheibe, 11 years ago

#2017 added a HL application based on apetrei's Hive Drain to the tools branch

File size: 4.9 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.Threading;
5using HeuristicLab.Clients.Hive;
6using HeuristicLab.Clients.Hive.Jobs;
7using HeuristicLab.Common;
8using HeuristicLab.Core;
9
namespace HiveDrain {
  /// <summary>
  /// Downloads all finished tasks of a single Hive job and hands each one to
  /// <c>TaskSerializer</c> so it is written to <see cref="RootLocation"/>.
  /// </summary>
  /// <remarks>
  /// The underlying <c>ConcurrentTaskDownloader</c> is shared by all instances,
  /// while completion tracking (pending counter and wait handle) is kept per
  /// instance so that one job's downloads cannot signal or block another job.
  /// </remarks>
  class JobTaskDownloader {
    /// <summary>Directory into which the task files of this job are written.</summary>
    public String RootLocation { get; set; }

    /// <summary>The Hive job whose finished tasks are downloaded.</summary>
    public Job ParentJob { get; set; }

    private readonly ILog log;

    // Shared by every JobTaskDownloader; both constructor arguments are taken
    // from HeuristicLabHiveDrainApplication.MaxParallelDownloads.
    private static readonly ConcurrentTaskDownloader<ItemTask> downloader =
        new ConcurrentTaskDownloader<ItemTask>(HeuristicLabHiveDrainApplication.MaxParallelDownloads, HeuristicLabHiveDrainApplication.MaxParallelDownloads);

    // Number of outstanding "work units" for THIS instance: one unit per
    // scheduled download plus one unit held by Start() while it is still
    // enumerating tasks. Whoever decrements it to zero sets allJobsFinished.
    private int pendingCount = 0;
    private readonly ManualResetEvent allJobsFinished = new ManualResetEvent(false);

    // Limits the number of tasks that are in flight (downloading or being
    // saved); supplied by the caller.
    private readonly Semaphore limitSemaphore;

    static JobTaskDownloader() {
      downloader.ExceptionOccured += downloader_ExceptionOccured;
    }

    /// <summary>
    /// Logs exceptions reported by the shared downloader to the main window log.
    /// </summary>
    static void downloader_ExceptionOccured(object sender, HeuristicLab.Common.EventArgs<Exception> e) {
      HiveDrainMainWindow.Log.LogMessage(DateTime.Now.ToShortTimeString() + " ### Exception occured: " + e.Value.ToString());
    }

    /// <summary>
    /// Creates a downloader for the finished tasks of <paramref name="parentJob"/>.
    /// </summary>
    /// <param name="path">root path for this job</param>
    /// <param name="parentJob">parent job</param>
    /// <param name="sem">semaphore that is acquired per scheduled task and released once the task has been saved</param>
    /// <param name="log">log that receives progress messages</param>
    public JobTaskDownloader(string path, Job parentJob, Semaphore sem, ILog log) {
      RootLocation = path;
      ParentJob = parentJob;
      limitSemaphore = sem;
      this.log = log;
    }

    /// <summary>
    /// Starts downloading all finished tasks of the parent job and blocks until
    /// the download callback of every scheduled task has run.
    /// </summary>
    /// <remarks>
    /// Tasks that are not in state <c>Finished</c> are ignored, tasks whose
    /// target file already exists are skipped. Saving is handed to
    /// <c>TaskSerializer</c>; this method does not wait for its <c>OnSaved</c>
    /// callbacks.
    /// </remarks>
    public void Start() {
      // The enumeration itself holds one pending unit. This guarantees that the
      // counter cannot reach zero (and the event cannot be set) before all
      // tasks have been looked at, without needing a separate "end reached" flag.
      allJobsFinished.Reset();
      Interlocked.Exchange(ref pendingCount, 1);

      IEnumerable<LightweightTask> allTasks = HiveServiceLocator.Instance.CallHiveService(s =>
          s.GetLightweightJobTasksWithoutStateLog(ParentJob.Id));

      foreach (var lightTask in allTasks) {
        if (lightTask.State != TaskState.Finished) {
          log.LogMessage(String.Format("   {0} => ignored ({1})", lightTask.Id, lightTask.State.ToString()));
          continue;
        }

        string taskPath;
        if (CheckIfTaskDownloaded(lightTask.Id, out taskPath)) {
          log.LogMessage(String.Format("   {0} => already downloaded", lightTask.Id));
          continue;
        }

        // Blocks until a download slot is free.
        addDownloaderTask(lightTask.Id, taskPath);
        log.LogMessage(String.Format("   Getting Id {0}: {1}", lightTask.Id, DateTime.Now.ToShortTimeString()));
      }

      // Give back the unit held by the enumeration; if no download is
      // outstanding anymore this sets the event immediately.
      SignalOneCompleted();

      allJobsFinished.WaitOne();

      // Kept from the original implementation.
      // NOTE(review): presumably meant to free the downloaded task data early - confirm it is still needed.
      GC.Collect();
      log.LogMessage(String.Format("All tasks for job {0} finished", ParentJob.Name));
    }

    /// <summary>
    /// Schedules the download of a finished task and queues it for
    /// serialization to <paramref name="taskPath"/> once it has arrived.
    /// Blocks until the limiting semaphore grants a slot.
    /// </summary>
    /// <param name="taskId">id of the task to download</param>
    /// <param name="taskPath">file the task is saved to</param>
    private void addDownloaderTask(Guid taskId, string taskPath) {
      //wait for free slot
      limitSemaphore.WaitOne();

      Interlocked.Increment(ref pendingCount);
      // NOTE(review): if the downloader never invokes the callback for a failed
      // download, the slot and the pending unit are still lost - verify against
      // ConcurrentTaskDownloader.
      downloader.DownloadTaskDataAndTask(taskId, (task, itemTask) => {
        // True while this callback is responsible for giving the slot back;
        // ownership moves to OnSaved right before the serializer is called.
        bool slotHeld = true;
        try {
          log.LogMessage(String.Format("\"{0}\" - [{1}]: {2} finished", ParentJob.Name, task.Id, itemTask.Name));

          OptimizerTask optimizerTask = itemTask as OptimizerTask;
          if (optimizerTask == null) {
            throw new InvalidOperationException(
                String.Format("Unsupported task type {0}", itemTask.GetType().Name));
          }

          //add task to serializer queue
          slotHeld = false;
          TaskSerializer.Serialize(new SerializerTask() {
            Content = optimizerTask.Item as IStorableContent,
            FilePath = taskPath,
            OnSaved = () => {
              log.LogMessage(String.Format("\"{0}\" - [{1}]: {2} saved", ParentJob.Name, task.Id, itemTask.Name));
              limitSemaphore.Release();
            }
          });
        }
        finally {
          // A task that failed before it reached the serializer must not keep
          // its slot, otherwise the slot would be lost for the rest of the run.
          if (slotHeld)
            limitSemaphore.Release();

          // Always account for this task, even on failure, so that Start()
          // cannot wait forever. The exception itself still propagates to the
          // downloader.
          SignalOneCompleted();
        }
      });
    }

    /// <summary>
    /// Marks one pending unit as done and signals <c>allJobsFinished</c> when it
    /// was the last one. The decision is based on the value returned by the
    /// atomic decrement, so exactly one caller observes zero.
    /// </summary>
    private void SignalOneCompleted() {
      if (Interlocked.Decrement(ref pendingCount) == 0)
        allJobsFinished.Set();
    }

    /// <summary>
    /// Checks whether the file for the given task already exists below
    /// <see cref="RootLocation"/>. The root directory is created if it is missing.
    /// </summary>
    /// <param name="id">id of the task</param>
    /// <param name="taskPath">receives the path of the task file (<c>&lt;id&gt;.hl</c> inside the root directory)</param>
    /// <returns>true if the task file exists, otherwise false</returns>
    private bool CheckIfTaskDownloaded(Guid id, out string taskPath) {
      // Does nothing when the directory already exists.
      Directory.CreateDirectory(RootLocation);

      taskPath = Path.Combine(RootLocation, id.ToString() + ".hl");
      return new FileInfo(taskPath).Exists;
    }
  }
}
Note: See TracBrowser for help on using the repository browser.