1 | using System;
|
---|
2 | using System.Collections.Generic;
|
---|
3 | using System.IO;
|
---|
4 | using System.Threading;
|
---|
5 | using HeuristicLab.Clients.Hive;
|
---|
6 | using HeuristicLab.Clients.Hive.Jobs;
|
---|
7 | using HeuristicLab.Common;
|
---|
8 | using HeuristicLab.Core;
|
---|
9 |
|
---|
namespace HiveDrain {
  /// <summary>
  /// Downloads all finished tasks of a single Hive job and queues each of them
  /// for serialization into <see cref="RootLocation"/>.
  /// </summary>
  /// <remarks>
  /// The underlying <c>ConcurrentTaskDownloader</c> is shared by all instances,
  /// while the bookkeeping of outstanding tasks is kept per instance so that
  /// several jobs can be drained at the same time without influencing each
  /// other's completion signal.
  /// </remarks>
  class JobTaskDownloader {
    /// <summary>
    /// Directory in which the task files ("{task id}.hl") of this job are
    /// looked up and saved. It is created on demand.
    /// </summary>
    public String RootLocation { get; set; }

    /// <summary>
    /// Job whose tasks are downloaded.
    /// </summary>
    public Job ParentJob { get; set; }

    private readonly ILog log;

    // One downloader for the whole process; both constructor arguments are
    // taken from the application-wide MaxParallelDownloads setting.
    private static readonly ConcurrentTaskDownloader<ItemTask> downloader =
      new ConcurrentTaskDownloader<ItemTask>(HeuristicLabHiveDrainApplication.MaxParallelDownloads, HeuristicLabHiveDrainApplication.MaxParallelDownloads);

    // Outstanding work items of THIS instance: one per task handed to the
    // downloader plus one placeholder held by Start() while it is still
    // enumerating. Whoever brings the counter to zero sets allJobsFinished.
    // (Deliberately not static: the event below is per instance as well.)
    private int pendingWorkItems = 0;

    private readonly ManualResetEvent allJobsFinished = new ManualResetEvent(false);

    // Supplied by the caller; one slot is held from the moment a task is
    // queued for download until it has been saved (or has failed).
    private readonly Semaphore limitSemaphore;

    static JobTaskDownloader() {
      downloader.ExceptionOccured += new EventHandler<HeuristicLab.Common.EventArgs<Exception>>(downloader_ExceptionOccured);
    }

    /// <summary>
    /// Writes exceptions reported by the shared downloader to the main window log.
    /// </summary>
    /// <param name="sender">event source (unused)</param>
    /// <param name="e">event arguments carrying the exception</param>
    static void downloader_ExceptionOccured(object sender, HeuristicLab.Common.EventArgs<Exception> e) {
      HiveDrainMainWindow.Log.LogMessage(DateTime.Now.ToShortTimeString() + " ### Exception occured: " + e.Value.ToString());
    }

    /// <summary>
    /// Creates a downloader for the tasks of one job.
    /// </summary>
    /// <param name="path">root path for this job</param>
    /// <param name="parentJob">parent job</param>
    /// <param name="sem">semaphore that limits how many tasks are in flight at once</param>
    /// <param name="log">log that receives the progress messages</param>
    public JobTaskDownloader(string path, Job parentJob, Semaphore sem, ILog log) {
      RootLocation = path;
      ParentJob = parentJob;
      limitSemaphore = sem;
      this.log = log;
    }

    /// <summary>
    /// Starts downloading all finished tasks of the parent job that are not yet
    /// present in <see cref="RootLocation"/> and blocks until every started
    /// download has invoked its completion callback.
    /// </summary>
    /// <remarks>
    /// Returning from this method means the tasks were handed to the
    /// serializer, not necessarily that the files have already been written.
    /// </remarks>
    public void Start() {
      // Hold one placeholder count during enumeration so that a fast download
      // cannot bring the counter to zero before all tasks have been queued.
      Interlocked.Exchange(ref pendingWorkItems, 1);
      allJobsFinished.Reset();

      IEnumerable<LightweightTask> allTasks = HiveServiceLocator.Instance.CallHiveService(s =>
        s.GetLightweightJobTasksWithoutStateLog(ParentJob.Id));

      foreach (var lightTask in allTasks) {
        if (lightTask.State != TaskState.Finished) {
          log.LogMessage(String.Format(" {0} => ignored ({1})", lightTask.Id, lightTask.State.ToString()));
          continue;
        }

        string taskPath;
        if (CheckIfTaskDownloaded(lightTask.Id, out taskPath)) {
          log.LogMessage(String.Format(" {0} => already downloaded", lightTask.Id));
        } else {
          addDownloaderTask(lightTask.Id, taskPath);
          log.LogMessage(String.Format(" Getting Id {0}: {1}", lightTask.Id, DateTime.Now.ToShortTimeString()));
        }
      }

      // Enumeration is complete: give back the placeholder. If no download is
      // outstanding any more this sets the event right away.
      SignalWorkItemDone();

      // NOTE(review): a download that fails inside the downloader without
      // invoking the callback is only reported through ExceptionOccured, which
      // carries no task id; in that case this wait never returns. Confirm the
      // downloader's failure behavior.
      allJobsFinished.WaitOne();

      // NOTE(review): forced collection kept from the original, presumably to
      // free the downloaded task data early - confirm it is still needed.
      GC.Collect();
      log.LogMessage(String.Format("All tasks for job {0} finished", ParentJob.Name));
    }

    /// <summary>
    /// Marks one work item as done and signals <c>allJobsFinished</c> when it
    /// was the last one. The return value of the interlocked decrement is used
    /// so that exactly one caller observes the transition to zero.
    /// </summary>
    private void SignalWorkItemDone() {
      if (Interlocked.Decrement(ref pendingWorkItems) == 0) {
        allJobsFinished.Set();
      }
    }

    /// <summary>
    /// Adds a task with state finished to the downloader. Blocks until the
    /// semaphore grants a free slot.
    /// </summary>
    /// <param name="taskId">id of the task to download</param>
    /// <param name="taskPath">file the downloaded task is saved to</param>
    private void addDownloaderTask(Guid taskId, string taskPath) {
      //wait for free slot
      limitSemaphore.WaitOne();

      Interlocked.Increment(ref pendingWorkItems);
      downloader.DownloadTaskDataAndTask(taskId, (task, itemTask) => {
        // Becomes true once the serializer has accepted the task; from then on
        // its OnSaved callback is responsible for releasing the slot.
        bool queuedForSaving = false;
        try {
          log.LogMessage(String.Format("\"{0}\" - [{1}]: {2} finished", ParentJob.Name, task.Id, itemTask.Name));

          OptimizerTask optimizerTask = itemTask as OptimizerTask;
          if (optimizerTask == null) {
            throw new InvalidOperationException(
              String.Format("Unsupported task type {0}", itemTask.GetType().Name));
          }

          //add task to serializer queue
          TaskSerializer.Serialize(new SerializerTask() {
            Content = optimizerTask.Item as IStorableContent,
            FilePath = taskPath,
            OnSaved = () => {
              try {
                log.LogMessage(String.Format("\"{0}\" - [{1}]: {2} saved", ParentJob.Name, task.Id, itemTask.Name));
              } finally {
                // free the slot even if logging fails
                limitSemaphore.Release();
              }
            }
          });
          queuedForSaving = true;
        } finally {
          // On any failure before the hand-over nobody else will free the
          // slot, so do it here; otherwise later downloads would starve.
          if (!queuedForSaving) {
            limitSemaphore.Release();
          }

          //this task has finished downloading (successfully or not)
          SignalWorkItemDone();
        }
      });
    }

    /// <summary>
    /// Checks whether the file of a task already exists in
    /// <see cref="RootLocation"/>; the directory is created if it is missing.
    /// </summary>
    /// <param name="id">id of the task</param>
    /// <param name="taskPath">receives the path of the task file ("{id}.hl" below the root location)</param>
    /// <returns>true if the task file exists, false otherwise</returns>
    private bool CheckIfTaskDownloaded(Guid id, out string taskPath) {
      // does nothing when the directory already exists
      Directory.CreateDirectory(RootLocation);

      taskPath = Path.Combine(RootLocation, id.ToString() + ".hl");
      return File.Exists(taskPath);
    }
  }
}
|
---|