Free cookie consent management tool by TermsFeed Policy Generator

source: branches/OaaS/HeuristicLab.Clients.Hive/3.3/HiveClient.cs @ 13783

Last change on this file since 13783 was 9508, checked in by fschoepp, 12 years ago

#1888:
HL:

  • Web projects require different users to interact with Hive. The singleton HiveServiceLocator.Instance doesn't allow different users at the same time, which results in serialized access to the HiveClient methods.

The following changes have been introduced in favor of a parallel use of the HL libs:

  • HiveClient, TaskDownloader and ConcurrentTaskDownloader may now use a different IHiveServiceLocator than HiveServiceLocator.Instance (all methods have appropriate overloads now).
  • The default instance is still HiveServiceLocator.Instance.

Automated Scaling of Instances:

  • Added Scaler project to solution which represents a WorkerRole that scales the slave instances based on the global cpu utilization of all slaves.
  • Scaler is based on WASABi, rules can be adjusted in rulesstore.xml. Basic rule is: if < 45% global cpu utilization => remove an instance; if > 65% cpu => add an instance. Minimum boundary is 1 and maximum boundary is 8 slave instances.
  • Adjusted Slave project to automatically register itself to a SlaveGroup during WebRole startup (can be adjusted in service configuration).

Web-Frontend:

  • Added basic error messages to the dialogs when an ajax call fails.
  • Removed Styling.js from scripts.
File size: 24.6 KB
RevLine 
[6976]1#region License Information
2/* HeuristicLab
[7259]3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
[6976]4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
[7582]32using HeuristicLab.MainForm;
[6976]33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35
36namespace HeuristicLab.Clients.Hive {
37  [Item("HiveClient", "Hive client.")]
38  public sealed class HiveClient : IContent {
// Lazy<T> with its default thread-safety mode runs the factory at most once, so concurrent
// first accesses cannot create two HiveClient objects (each constructor call subscribes a
// handler to a static event that is never removed).
private static readonly Lazy<HiveClient> instance = new Lazy<HiveClient>(() => new HiveClient());

/// <summary>
/// Gets the shared HiveClient instance; it is created on first access.
/// </summary>
public static HiveClient Instance {
  get { return instance.Value; }
}
46
47    #region Properties
private HiveItemCollection<RefreshableJob> jobs;

/// <summary>
/// Gets or sets the collection of jobs held by this client.
/// Assigning a different collection raises <see cref="HiveJobsChanged"/>.
/// </summary>
public HiveItemCollection<RefreshableJob> Jobs {
  get { return jobs; }
  set {
    // nothing to do (and no event) when the same collection is assigned again
    if (value == jobs) return;
    jobs = value;
    OnHiveJobsChanged();
  }
}
58
private List<Plugin> onlinePlugins;

/// <summary>
/// Gets or sets the plugin list that the upload code fills from the service's GetPlugins call.
/// </summary>
public List<Plugin> OnlinePlugins {
  get {
    return onlinePlugins;
  }
  set {
    onlinePlugins = value;
  }
}
64
private List<Plugin> alreadyUploadedPlugins;

/// <summary>
/// Gets or sets the list of plugins recorded as uploaded during the current job upload.
/// </summary>
public List<Plugin> AlreadyUploadedPlugins {
  get {
    return alreadyUploadedPlugins;
  }
  set {
    alreadyUploadedPlugins = value;
  }
}
70
private bool isAllowedPrivileged;

/// <summary>
/// Gets or sets the flag that <c>Refresh</c> reads from the service's IsAllowedPrivileged call
/// and copies onto every loaded job.
/// </summary>
public bool IsAllowedPrivileged {
  get {
    return isAllowedPrivileged;
  }
  set {
    isAllowedPrivileged = value;
  }
}
[9508]76
private IHiveServiceLocator serviceLocator;

/// <summary>
/// Gets or sets the service locator used by the instance methods of this client.
/// When no locator has been assigned, the getter falls back to (and remembers)
/// <c>HiveServiceLocator.Instance</c>.
/// </summary>
public IHiveServiceLocator ServiceLocator {
  get { return serviceLocator ?? (serviceLocator = HiveServiceLocator.Instance); }
  set { serviceLocator = value; }
}
87
[6976]88    #endregion
89
[9508]90
91
/// <summary>
/// Creates the client and hooks the handler for unobserved task exceptions.
/// </summary>
private HiveClient() {
  // the subscription is intentionally kept for the lifetime of the process (never removed)
  TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
}
[6976]96
/// <summary>
/// Handles task exceptions that nobody observed: marks them as observed and rethrows them
/// wrapped in a <c>HiveException</c>.
/// </summary>
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
  // avoid crash of process because task crashes. first exception found is handled in Results property
  e.SetObserved();

  // NOTE(review): this throws from inside the event handler after marking the exception
  // observed - confirm that letting the exception escape the handler is really intended.
  AggregateException unobserved = e.Exception;
  throw new HiveException("Unobserved Exception in ConcurrentTaskDownloader", unobserved);
}
101
/// <summary>
/// Drops all locally held jobs (without deleting them on the server), stops their automatic
/// refreshing, disposes them and clears the cached plugin lists.
/// Safe to call when no job collection is present.
/// </summary>
public void ClearHiveClient() {
  HiveItemCollection<RefreshableJob> current = jobs;
  if (current != null) {
    // Take a snapshot BEFORE clearing: iterating the collection after it has been cleared
    // would visit nothing, so no job would be stopped or disposed.
    var snapshot = new List<RefreshableJob>();
    foreach (var j in current) {
      snapshot.Add(j);
    }

    current.ClearWithoutHiveDeletion();

    foreach (var j in snapshot) {
      if (j.RefreshAutomatically) {
        j.RefreshAutomatically = false; // stop result polling
      }
      j.Dispose();
    }
  }
  Jobs = null;

  if (onlinePlugins != null)
    onlinePlugins.Clear();
  if (alreadyUploadedPlugins != null)
    alreadyUploadedPlugins.Clear();
}
117
[6976]118    #region Refresh
/// <summary>
/// Reloads the privilege flag and the job list from the Hive service.
/// <c>Refreshing</c> is raised before and <c>Refreshed</c> after the reload, the latter also
/// when the reload fails. On failure the job collection is reset to null and the exception
/// is rethrown.
/// </summary>
/// <remarks>
/// The backing field is assigned directly, so <c>HiveJobsChanged</c> is not raised here.
/// </remarks>
public void Refresh() {
  OnRefreshing();

  try {
    IsAllowedPrivileged = ServiceLocator.CallHiveService((s) => s.IsAllowedPrivileged());

    jobs = new HiveItemCollection<RefreshableJob>();
    IEnumerable<Job> loadedJobs = ServiceLocator.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());

    foreach (Job loaded in loadedJobs) {
      var refreshable = new RefreshableJob(loaded);
      refreshable.IsAllowedPrivileged = this.isAllowedPrivileged;
      jobs.Add(refreshable);
    }
  }
  catch {
    // leave no half-filled collection behind
    jobs = null;
    throw;
  }
  finally {
    OnRefreshed();
  }
}
[9363]140
/// <summary>
/// Runs <c>Refresh</c> asynchronously via delegate BeginInvoke.
/// </summary>
/// <param name="exceptionCallback">Invoked with the exception if the refresh failed; not invoked on success.</param>
public void RefreshAsync(Action<Exception> exceptionCallback) {
  // the worker converts a thrown exception into a return value so it can be handed to the callback
  Func<Exception> worker = () => {
    try {
      Refresh();
      return null;
    }
    catch (Exception ex) {
      return ex;
    }
  };

  worker.BeginInvoke(asyncResult => {
    Exception failure = worker.EndInvoke(asyncResult);
    if (failure != null) exceptionCallback(failure);
  }, null);
}
156    #endregion
157
158    #region Store
/// <summary>
/// Stores the item using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void Store(IHiveItem item, CancellationToken cancellationToken) {
  Store(item, cancellationToken, HiveServiceLocator.Instance);
}

/// <summary>
/// Stores the item on the Hive service reachable through <paramref name="locator"/>.
/// Items with an empty id are created (a <c>RefreshableJob</c> is uploaded, a
/// <c>JobPermission</c> is granted); items that already have an id are updated if they are a
/// <c>Job</c>.
/// </summary>
/// <exception cref="ArgumentException">The user named in a new permission was not found.</exception>
public static void Store(IHiveItem item, CancellationToken cancellationToken, IHiveServiceLocator locator) {
  // already persisted: only plain jobs can be updated
  if (item.Id != Guid.Empty) {
    var existingJob = item as Job;
    if (existingJob != null)
      locator.CallHiveService(s => s.UpdateJob(existingJob));
    return;
  }

  var newJob = item as RefreshableJob;
  if (newJob != null) {
    HiveClient.Instance.UploadJob(newJob, cancellationToken, locator);
  }

  var hep = item as JobPermission;
  if (hep != null) {
    // resolve the user name to an id before granting
    hep.GrantedUserId = locator.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
    if (hep.GrantedUserId == Guid.Empty) {
      throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
    }
    locator.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
  }
}
[9508]181
/// <summary>
/// Stores the item asynchronously using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
  StoreAsync(exceptionCallback, item, cancellationToken, HiveServiceLocator.Instance);
}

/// <summary>
/// Runs <c>Store</c> asynchronously via delegate BeginInvoke.
/// </summary>
/// <param name="exceptionCallback">Invoked with the exception if storing failed; not invoked on success.</param>
public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken, IHiveServiceLocator locator) {
  // the worker converts a thrown exception into a return value so it can be handed to the callback
  Func<Exception> worker = () => {
    try {
      Store(item, cancellationToken, locator);
      return null;
    }
    catch (Exception ex) {
      return ex;
    }
  };

  worker.BeginInvoke(asyncResult => {
    Exception failure = worker.EndInvoke(asyncResult);
    if (failure != null) exceptionCallback(failure);
  }, null);
}
201    #endregion
202
203    #region Delete
/// <summary>
/// Deletes the item using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void Delete(IHiveItem item) {
  Delete(item, HiveServiceLocator.Instance);
}

/// <summary>
/// Deletes the item on the Hive service and resets its id to <c>Guid.Empty</c>.
/// Items with an empty id are ignored unless they are exactly of type <c>JobPermission</c>.
/// </summary>
public static void Delete(IHiveItem item, IHiveServiceLocator locator) {
  bool neverStored = item.Id == Guid.Empty;
  if (neverStored && item.GetType() != typeof(JobPermission))
    return;

  if (item is Job)
    locator.CallHiveService(s => s.DeleteJob(item.Id));

  var refreshable = item as RefreshableJob;
  if (refreshable != null) {
    // stop polling before the job disappears on the server
    if (refreshable.RefreshAutomatically) {
      refreshable.StopResultPolling();
    }
    locator.CallHiveService(s => s.DeleteJob(item.Id));
  }

  var hep = item as JobPermission;
  if (hep != null) {
    locator.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
  }

  item.Id = Guid.Empty;
}
227    #endregion
228
229    #region Events
/// <summary>Raised at the start of <c>Refresh</c>.</summary>
public event EventHandler Refreshing;

private void OnRefreshing() {
  // copy to a local so the null check and the invocation see the same delegate
  EventHandler subscribers = Refreshing;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
/// <summary>Raised at the end of <c>Refresh</c>, whether it succeeded or failed.</summary>
public event EventHandler Refreshed;

private void OnRefreshed() {
  // copy to a local so the null check and the invocation see the same delegate
  EventHandler subscribers = Refreshed;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
/// <summary>Raised when the <c>Jobs</c> property is set to a different collection.</summary>
public event EventHandler HiveJobsChanged;

private void OnHiveJobsChanged() {
  // copy to a local so the null check and the invocation see the same delegate
  EventHandler subscribers = HiveJobsChanged;
  if (subscribers == null) return;
  subscribers(this, EventArgs.Empty);
}
245    #endregion
246
/// <summary>
/// Starts the job using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) {
  StartJob(exceptionCallback, refreshableJob, cancellationToken, HiveServiceLocator.Instance);
}

/// <summary>
/// Marks the job as started and stores it asynchronously on the Hive service.
/// If storing fails, the job is set back to <c>Prepared</c> and the failure is reported.
/// </summary>
/// <param name="exceptionCallback">Receives the exception when storing failed; may be null.</param>
/// <param name="refreshableJob">The job to start.</param>
/// <param name="cancellationToken">Token passed on to the store operation.</param>
/// <param name="locator">Locator of the Hive service to store the job on.</param>
public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken, IHiveServiceLocator locator) {
  // Set Started BEFORE launching the asynchronous store. Setting it afterwards races with
  // the failure callback: a fast failure would reset the state to Prepared and this thread
  // would then overwrite it with Started.
  refreshableJob.ExecutionState = ExecutionState.Started;

  HiveClient.StoreAsync(
    new Action<Exception>((Exception ex) => {
      refreshableJob.ExecutionState = ExecutionState.Prepared;
      // the callback runs on a worker thread; a null callback must not throw there
      if (exceptionCallback != null) exceptionCallback(ex);
    }), refreshableJob, cancellationToken, locator);
}
259
/// <summary>
/// Pauses the job using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void PauseJob(RefreshableJob refreshableJob) {
  PauseJob(refreshableJob, HiveServiceLocator.Instance);
}

/// <summary>
/// Requests a pause for every task of the job that is not finished, aborted or failed,
/// then sets the job's execution state to <c>Paused</c>.
/// </summary>
public static void PauseJob(RefreshableJob refreshableJob, IHiveServiceLocator locator) {
  locator.CallHiveService(service => {
    foreach (HiveTask hiveTask in refreshableJob.GetAllHiveTasks()) {
      TaskState state = hiveTask.Task.State;
      bool alreadyEnded = state == TaskState.Finished || state == TaskState.Aborted || state == TaskState.Failed;
      if (!alreadyEnded)
        service.PauseTask(hiveTask.Task.Id);
    }
  });
  refreshableJob.ExecutionState = ExecutionState.Paused;
}
273
/// <summary>
/// Stops the job using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void StopJob(RefreshableJob refreshableJob) {
  StopJob(refreshableJob, HiveServiceLocator.Instance);
}

/// <summary>
/// Requests a stop for every task of the job that is not finished, aborted or failed,
/// then sets the job's execution state to <c>Stopped</c>.
/// </summary>
public static void StopJob(RefreshableJob refreshableJob, IHiveServiceLocator locator) {
  locator.CallHiveService(service => {
    foreach (HiveTask hiveTask in refreshableJob.GetAllHiveTasks()) {
      TaskState state = hiveTask.Task.State;
      bool alreadyEnded = state == TaskState.Finished || state == TaskState.Aborted || state == TaskState.Failed;
      if (!alreadyEnded)
        service.StopTask(hiveTask.Task.Id);
    }
  });
  refreshableJob.ExecutionState = ExecutionState.Stopped;
}
287
/// <summary>
/// Resumes the job using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void ResumeJob(RefreshableJob refreshableJob) {
  ResumeJob(refreshableJob, HiveServiceLocator.Instance);
}

/// <summary>
/// Restarts every paused task of the job, then sets the job's execution state to <c>Started</c>.
/// </summary>
public static void ResumeJob(RefreshableJob refreshableJob, IHiveServiceLocator locator) {
  locator.CallHiveService(service => {
    foreach (HiveTask hiveTask in refreshableJob.GetAllHiveTasks()) {
      // tasks in any other state are left untouched
      if (hiveTask.Task.State != TaskState.Paused) continue;
      service.RestartTask(hiveTask.Task.Id);
    }
  });
  refreshableJob.ExecutionState = ExecutionState.Started;
}
302
[6976]303    #region Upload Job
// Limits the number of task uploads that run at the same time.
private readonly Semaphore taskUploadSemaphore = new Semaphore(Settings.Default.MaxParallelUploads, Settings.Default.MaxParallelUploads);
// Lock objects are readonly: reassigning one while another thread holds it would break mutual exclusion.
// Guards the shared upload counter and the progress update derived from it.
private static readonly object jobCountLocker = new object();
// Serializes the plugin-dependency upload step across upload threads.
private static readonly object pluginLocker = new object();
/// <summary>
/// Uploads the job using <c>HiveServiceLocator.Instance</c>.
/// </summary>
private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken) {
  UploadJob(refreshableJob, cancellationToken, HiveServiceLocator.Instance);
}

/// <summary>
/// Uploads a job: resolves its resource names to ids, creates the job on the server,
/// uploads the configuration file as plugin and then uploads all root tasks (each with its
/// children) in parallel, waiting until all of them are done.
/// </summary>
/// <remarks>
/// Regardless of success, the job's Modified flag and IsProgressing are reset and the progress
/// is finished when the method exits.
/// </remarks>
/// <exception cref="ResourceNotFoundException">A resource name could not be resolved.</exception>
private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken, IHiveServiceLocator locator) {
  try {
    refreshableJob.IsProgressing = true;
    refreshableJob.Progress = new Progress("Connecting to server...");

    // resolve every requested resource name to its id
    var resourceIds = new List<Guid>();
    foreach (string name in ToResourceNameList(refreshableJob.Job.ResourceNames)) {
      Guid resolvedId = locator.CallHiveService((s) => s.GetResourceId(name));
      if (resolvedId == Guid.Empty) {
        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", name));
      }
      resourceIds.Add(resolvedId);
    }

    foreach (OptimizerHiveTask optimizerTask in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>()) {
      optimizerTask.SetIndexInParentOptimizerList(null);
    }

    // upload Job
    refreshableJob.Progress.Status = "Uploading Job...";
    refreshableJob.Job.Id = locator.CallHiveService((s) => s.AddJob(refreshableJob.Job));
    // the job is re-read from the server (update owner and permissions); keep the local privileged flag
    bool privilegedFlag = refreshableJob.Job.IsPrivileged;
    refreshableJob.Job = locator.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id));
    refreshableJob.Job.IsPrivileged = privilegedFlag;
    cancellationToken.ThrowIfCancellationRequested();

    int taskTotal = refreshableJob.GetAllHiveTasks().Count();
    // a one-element array serves as a shared, mutable counter for the parallel upload tasks
    int[] uploadedCounter = new int[1];
    cancellationToken.ThrowIfCancellationRequested();

    // upload plugins
    refreshableJob.Progress.Status = "Uploading plugins...";
    this.OnlinePlugins = locator.CallHiveService((s) => s.GetPlugins());
    this.AlreadyUploadedPlugins = new List<Plugin>();
    Plugin configFilePlugin = locator.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
    this.alreadyUploadedPlugins.Add(configFilePlugin);
    cancellationToken.ThrowIfCancellationRequested();

    // upload tasks
    refreshableJob.Progress.Status = "Uploading tasks...";

    var uploads = new List<TS.Task>();
    foreach (HiveTask rootTask in refreshableJob.HiveTasks) {
      // the root task travels as task state instead of being captured by the lambda
      TS.Task upload = TS.Task.Factory.StartNew((state) => {
        UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)state, null, resourceIds, uploadedCounter, taskTotal, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken, locator);
      }, rootTask);
      upload.ContinueWith((faulted) => refreshableJob.Log.LogException(faulted.Exception), TaskContinuationOptions.OnlyOnFaulted);
      uploads.Add(upload);
    }
    TS.Task.WaitAll(uploads.ToArray());
  }
  finally {
    refreshableJob.Job.Modified = false;
    refreshableJob.IsProgressing = false;
    refreshableJob.Progress.Finish();
  }
}
369
/// <summary>
/// Uploads the local configuration file as plugin, unless a plugin with the same SHA1 hash
/// is already online.
/// </summary>
/// <param name="service">Service used to add the plugin when it is not online yet.</param>
/// <param name="onlinePlugins">Plugins already known to the server; searched by hash.</param>
/// <returns>The matching online plugin, or the newly added configuration plugin with its id set.</returns>
private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins) {
  string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, Settings.Default.HLBinaryName);
  // open the configuration once and derive both path and file name from it
  string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
  string configFileName = Path.GetFileName(configFilePath);

  byte[] data = File.ReadAllBytes(configFilePath);
  byte[] hash;
  using (SHA1 sha1 = SHA1.Create()) {
    hash = sha1.ComputeHash(data);
  }

  // single pass over the online plugins; entries without a hash can never match
  Plugin onlineConfig = onlinePlugins.FirstOrDefault(p => p.Hash != null && p.Hash.SequenceEqual(hash));
  if (onlineConfig != null) {
    return onlineConfig;
  }

  // not online yet: build the upload objects only now that they are needed
  Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
  PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
  configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
  return configPlugin;
}
396
/// <summary>
/// Uploads the given task and its children using <c>HiveServiceLocator.Instance</c>.
/// </summary>
private void UploadTaskWithChildren(Progress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {
  UploadTaskWithChildren(progress, hiveTask, parentHiveTask, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken, HiveServiceLocator.Instance);
}

/// <summary>
/// Uploads the given task and, recursively and in parallel, all of its child tasks.
/// Child tasks are added on the server below the id of their parent task.
/// </summary>
/// <param name="parentHiveTask">shall be null if its the root task</param>
/// <param name="taskCount">One-element array used as shared counter of tasks processed so far.</param>
private void UploadTaskWithChildren(Progress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, bool isPrivileged, CancellationToken cancellationToken, IHiveServiceLocator locator) {
  taskUploadSemaphore.WaitOne();
  bool slotReleased = false;
  try {
    cancellationToken.ThrowIfCancellationRequested();
    lock (jobCountLocker) {
      taskCount[0]++;
    }

    // experiments and batch runs that compute in parallel are uploaded as parent tasks
    bool actsAsParent = hiveTask.ItemTask.ComputeInParallel
      && (hiveTask.ItemTask.Item is Optimization.Experiment || hiveTask.ItemTask.Item is Optimization.BatchRun);
    hiveTask.Task.IsParentTask = actsAsParent;
    hiveTask.Task.FinishWhenChildJobsFinished = actsAsParent;

    List<IPluginDescription> plugins;
    TaskData taskData = hiveTask.GetAsTaskData(actsAsParent, out plugins);
    cancellationToken.ThrowIfCancellationRequested();

    TryAndRepeat(() => {
      if (cancellationToken.IsCancellationRequested) return;
      lock (pluginLocker) {
        locator.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
      }
    }, Settings.Default.MaxRepeatServiceCalls, "Failed to upload plugins");
    cancellationToken.ThrowIfCancellationRequested();

    hiveTask.Task.PluginsNeededIds.Add(configPluginId);
    hiveTask.Task.JobId = jobId;
    hiveTask.Task.IsPrivileged = isPrivileged;

    log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
    TryAndRepeat(() => {
      if (cancellationToken.IsCancellationRequested) return;
      if (parentHiveTask == null) {
        hiveTask.Task.Id = locator.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
      } else {
        hiveTask.Task.Id = locator.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
      }
    }, Settings.Default.MaxRepeatServiceCalls, "Failed to add task", log);
    cancellationToken.ThrowIfCancellationRequested();

    lock (jobCountLocker) {
      progress.ProgressValue = (double)taskCount[0] / totalJobCount;
      progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
    }

    var childUploads = new List<TS.Task>();
    foreach (HiveTask child in hiveTask.ChildHiveTasks) {
      // child and parent travel as task state instead of being captured by the lambda
      TS.Task childUpload = TS.Task.Factory.StartNew((state) => {
        var pair = (Tuple<HiveTask, HiveTask>)state;
        UploadTaskWithChildren(progress, pair.Item1, pair.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken, locator);
      }, Tuple.Create(child, hiveTask));
      childUpload.ContinueWith((faulted) => log.LogException(faulted.Exception), TaskContinuationOptions.OnlyOnFaulted);
      childUploads.Add(childUpload);
    }

    // the semaphore has to be release before waitall!
    taskUploadSemaphore.Release();
    slotReleased = true;
    TS.Task.WaitAll(childUploads.ToArray());
  }
  finally {
    // release the slot on every exit path that did not already release it above
    if (!slotReleased) taskUploadSemaphore.Release();
  }
}
473    #endregion
474
475    #region Download Experiment
/// <summary>
/// Loads the job's tasks using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static void LoadJob(RefreshableJob refreshableJob) {
  LoadJob(refreshableJob, HiveServiceLocator.Instance);
}

/// <summary>
/// Downloads all tasks of the job, rebuilds the parent/child task tree and assigns the root
/// tasks to the job. While downloading, the job's progress is updated every 500 ms.
/// </summary>
/// <remarks>
/// When the method exits (normally or with an exception) IsProgressing is reset, the progress
/// is finished and the downloader, if one was created, is disposed.
/// </remarks>
public static void LoadJob(RefreshableJob refreshableJob, IHiveServiceLocator locator) {
  var hiveExperiment = refreshableJob.Job;
  refreshableJob.IsProgressing = true;
  refreshableJob.Progress = new Progress();
  TaskDownloader downloader = null;

  try {
    refreshableJob.Progress.Status = "Connecting to Server...";
    // fetch all task objects to create the full tree of tree of HiveTask objects
    refreshableJob.Progress.Status = "Downloading list of tasks...";
    IEnumerable<LightweightTask> allTasks = locator.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(hiveExperiment.Id));
    int totalJobCount = allTasks.Count();

    refreshableJob.Progress.Status = "Downloading tasks...";
    downloader = new TaskDownloader(allTasks.Select(x => x.Id), locator);
    downloader.StartAsync();

    // poll the downloader until it reports completion or a failure
    while (!downloader.IsFinished) {
      refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
      refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
      Thread.Sleep(500);

      if (downloader.IsFaulted) {
        throw downloader.Exception;
      }
    }

    IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
    // root tasks are those without a parent id
    var rootTasks = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);

    refreshableJob.Progress.Status = "Downloading/deserializing complete. Displaying tasks...";
    // build child-task tree
    foreach (HiveTask root in rootTasks) {
      BuildHiveJobTree(root, allTasks, allHiveTasks);
    }

    refreshableJob.HiveTasks = new ItemCollection<HiveTask>(rootTasks);
    refreshableJob.ExecutionState = refreshableJob.IsFinished()
      ? Core.ExecutionState.Stopped
      : Core.ExecutionState.Started;
    refreshableJob.OnLoaded();
  }
  finally {
    refreshableJob.IsProgressing = false;
    refreshableJob.Progress.Finish();
    if (downloader != null) {
      downloader.Dispose();
    }
  }
}
536
/// <summary>
/// Recursively attaches to <paramref name="parentHiveTask"/> all tasks whose parent id equals
/// its task id, in ascending order of creation date. Each child's own subtree is built before
/// the child is attached.
/// </summary>
/// <param name="parentHiveTask">Task whose children are to be attached.</param>
/// <param name="allTasks">Lightweight descriptions of all tasks of the job.</param>
/// <param name="allHiveTasks">Downloaded tasks, looked up by task id.</param>
private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks) {
  var parentId = parentHiveTask.Task.Id;
  IEnumerable<LightweightTask> children = allTasks
    .Where(t => t.ParentTaskId.HasValue && t.ParentTaskId.Value == parentId)
    .OrderBy(t => t.DateCreated);

  foreach (LightweightTask child in children) {
    HiveTask childHiveTask = allHiveTasks[child.Id];
    BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
    parentHiveTask.AddChildHiveTask(childHiveTask);
  }
}
548    #endregion
549
/// <summary>
/// Splits a string of resource names separated by ';' into the individual names.
/// Empty entries are dropped; a null or empty string yields an empty list.
/// </summary>
private static IEnumerable<string> ToResourceNameList(string resourceNames) {
  if (string.IsNullOrEmpty(resourceNames)) {
    return new List<string>();
  }
  char[] separators = { ';' };
  return resourceNames.Split(separators, StringSplitOptions.RemoveEmptyEntries);
}
560
/// <summary>
/// Loads the item task using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static ItemTask LoadItemJob(Guid jobId) {
  return LoadItemJob(jobId, HiveServiceLocator.Instance);
}

/// <summary>
/// Fetches the task data for the given id and deserializes it.
/// </summary>
/// <returns>The deserialized item task, or null when deserialization throws.</returns>
public static ItemTask LoadItemJob(Guid jobId, IHiveServiceLocator locator) {
  TaskData taskData = locator.CallHiveService(s => s.GetTaskData(jobId));

  ItemTask loaded;
  try {
    loaded = PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
  }
  catch {
    // any failure while deserializing is reported to the caller as null
    loaded = null;
  }
  return loaded;
}
574
/// <summary>
/// Runs the action and, when it throws, runs it again until it succeeds or the retry budget
/// is used up. With <paramref name="repetitions"/> == 0 the first failure is final; a negative
/// value such as -1 does not hit the zero check and therefore keeps retrying.
/// </summary>
/// <param name="action">The operation to execute.</param>
/// <param name="repetitions">Number of retries allowed after the first attempt.</param>
/// <param name="errorMessage">Message of the HiveException thrown when the budget is exhausted; also used for log entries.</param>
/// <param name="log">Optional log that receives a message for every failed attempt that is retried.</param>
public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
  for (int remaining = repetitions; ; remaining--) {
    try {
      action();
      return;
    }
    catch (Exception e) {
      if (remaining == 0) throw new HiveException(errorMessage, e);
      if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
    }
  }
}
589
/// <summary>
/// Gets the job's permissions using <c>HiveServiceLocator.Instance</c>.
/// </summary>
public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId) {
  return GetJobPermissions(jobId, HiveServiceLocator.Instance);
}

/// <summary>
/// Fetches the permissions of the given job and fills in the user name of every permission
/// by looking it up from the granted user id.
/// </summary>
public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId, IHiveServiceLocator locator) {
  return locator.CallHiveService((service) => {
    IEnumerable<JobPermission> permissions = service.GetJobPermissions(jobId);
    foreach (JobPermission permission in permissions) {
      string userName = service.GetUsernameByUserId(permission.GrantedUserId);
      permission.UnmodifiedGrantedUserNameUpdate(userName);
    }
    return new HiveItemCollection<JobPermission>(permissions);
  });
}
603  }
604}
Note: See TracBrowser for help on using the repository browser.