Free cookie consent management tool by TermsFeed Policy Generator

source: branches/HeuristicLab.Tools/HeuristicLab.HiveDrain/HeuristicLab.HiveDrain/JobTaskOneFileDownloader.cs @ 12823

Last change on this file since 12823 was 12823, checked in by ascheibe, 9 years ago

#2017 added a new mode to hive drain for extracting runs and storing them in a single file

File size: 4.5 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Threading;
25using HeuristicLab.Clients.Hive;
26using HeuristicLab.Clients.Hive.Jobs;
27using HeuristicLab.Common;
28using HeuristicLab.Core;
29using HeuristicLab.Optimization;
30
namespace HeuristicLab.HiveDrain {
  /// <summary>
  /// Downloads every finished task of a Hive job, collects the runs of the
  /// contained optimizers into one <see cref="RunCollection"/> and saves that
  /// collection to a single file.
  /// </summary>
  public class JobTaskOneFileDownloader {
    /// <summary>Path of the file the collected runs are saved to.</summary>
    public String RootLocation { get; set; }

    /// <summary>The job whose tasks are downloaded.</summary>
    public Job ParentJob { get; set; }

    private ILog log;

    private RunCollection results = new RunCollection();

    // Dedicated lock object: "results" is re-assigned in Start(), so it must not be
    // used as the monitor itself.
    private readonly object resultsLock = new object();

    // The downloader is shared by all instances.
    private static ConcurrentTaskDownloader<ItemTask> downloader =
        new ConcurrentTaskDownloader<ItemTask>(HeuristicLabHiveDrainApplication.MaxParallelDownloads, HeuristicLabHiveDrainApplication.MaxParallelDownloads);

    // Number of downloads of THIS instance that were started but have not completed yet.
    // Must be per-instance because it is paired with the per-instance allJobsFinished event;
    // a shared counter would mix up the bookkeeping of different jobs.
    private int jobCount = 0;

    // Set once Start() has enqueued all tasks. Written by the thread running Start() and
    // read by the download callbacks, hence volatile. Per-instance for the same reason as jobCount.
    private volatile bool endReached = false;

    private ManualResetEvent allJobsFinished = new ManualResetEvent(false);

    private Semaphore limitSemaphore = null;

    static JobTaskOneFileDownloader() {
      downloader.ExceptionOccured += downloader_ExceptionOccured;
    }

    /// <summary>
    /// Logs exceptions reported by the shared downloader to the main window log.
    /// </summary>
    static void downloader_ExceptionOccured(object sender, HeuristicLab.Common.EventArgs<Exception> e) {
      HiveDrainMainWindow.Log.LogMessage(DateTime.Now.ToShortTimeString() + " ### Exception occured: " + e.Value);
    }

    /// <summary>
    /// Creates a downloader for the tasks of <paramref name="parentJob"/>.
    /// </summary>
    /// <param name="path">Target path; the extension ".hl" is appended.</param>
    /// <param name="parentJob">The job whose finished tasks are downloaded.</param>
    /// <param name="sem">Semaphore limiting the number of simultaneously running downloads.</param>
    /// <param name="log">Log that receives progress messages.</param>
    public JobTaskOneFileDownloader(string path, Job parentJob, Semaphore sem, ILog log) {
      RootLocation = path + ".hl";
      ParentJob = parentJob;
      limitSemaphore = sem;
      this.log = log;
    }

    /// <summary>
    /// Enqueues a download for every finished task of <see cref="ParentJob"/>, blocks until all
    /// of them have completed and then saves the collected runs to <see cref="RootLocation"/>.
    /// Tasks in any other state are logged and ignored.
    /// </summary>
    public void Start() {
      // Reset the per-run bookkeeping so that a stale state can never signal completion early.
      results = new RunCollection();
      jobCount = 0;
      endReached = false;
      allJobsFinished.Reset();

      IEnumerable<LightweightTask> allTasks;
      allTasks = HiveServiceLocator.Instance.CallHiveService(s =>
          s.GetLightweightJobTasksWithoutStateLog(ParentJob.Id));

      foreach (var lightTask in allTasks) {
        if (lightTask.State == TaskState.Finished) {
          AddDownloaderTask(lightTask.Id);
          log.LogMessage(String.Format("   Getting Id {0}: {1}", lightTask.Id, DateTime.Now.ToShortTimeString()));
        } else
          log.LogMessage(String.Format("   {0} => ignored ({1})", lightTask.Id, lightTask.State.ToString()));
      }

      endReached = true;
      // All callbacks may already have completed (or nothing was enqueued at all); in that case
      // no callback is left to signal the event, so it has to be done here.
      // CompareExchange(…, 0, 0) is used as an atomic read with a full memory barrier.
      if (Interlocked.CompareExchange(ref jobCount, 0, 0) == 0)
        allJobsFinished.Set();

      allJobsFinished.WaitOne();
      log.LogMessage("Saving data to file...");
      ContentManager.Save(results, RootLocation, true);

      // NOTE(review): explicit collection kept from the original; presumably intended to free
      // the memory of the downloaded tasks before the next job is processed.
      GC.Collect();
      log.LogMessage(String.Format("All tasks for job {0} finished", ParentJob.Name));
    }

    /// <summary>
    /// Adds a task with state finished to the downloader. Blocks until the semaphore
    /// grants a free download slot.
    /// </summary>
    /// <param name="taskId">Id of the task to download.</param>
    private void AddDownloaderTask(Guid taskId) {
      //wait for free slot
      limitSemaphore.WaitOne();

      Interlocked.Increment(ref jobCount);
      // NOTE(review): the slot and the counter are only given back inside the callback. If the
      // downloader reports a failure via ExceptionOccured WITHOUT invoking the callback, Start()
      // would wait forever - verify against ConcurrentTaskDownloader.
      downloader.DownloadTaskDataAndTask(taskId, (task, itemTask) => {
        try {
          log.LogMessage(String.Format("\"{0}\" - [{1}]: {2} finished", ParentJob.Name, task.Id, itemTask.Name));

          OptimizerTask optimizerTask = itemTask as OptimizerTask;
          if (optimizerTask != null) {
            IOptimizer opt = (IOptimizer)optimizerTask.Item;

            lock (resultsLock) {
              results.AddRange(opt.Runs);
            }

            log.LogMessage(String.Format("\"{0}\" - [{1}]: {2} added to result collection", ParentJob.Name, task.Id, itemTask.Name));
          } else {
            log.LogMessage(String.Format("Unsupported task type {0}", itemTask.GetType().Name));
          }
        }
        finally {
          // Always give the slot back and update the counter, even if processing the task
          // threw; otherwise Start() would block forever on allJobsFinished.
          limitSemaphore.Release();

          // Use the value returned by Decrement: re-reading the field afterwards would race
          // with other callbacks. If this was the last job, wake up Start().
          if (Interlocked.Decrement(ref jobCount) == 0 && endReached)
            allJobsFinished.Set();
        }
      });
    }
  }
}
Note: See TracBrowser for help on using the repository browser.