source: branches/UnloadJobs/HeuristicLab.Clients.Hive/3.3/HiveClient.cs @ 9170

Last change on this file since 9170 was 9170, checked in by ascheibe, 9 years ago

#2005 fixed handling of unobservable exceptions

File size: 22.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.MainForm;
33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35
namespace HeuristicLab.Clients.Hive {
  /// <summary>
  /// Client-side entry point to the Hive service. Holds the current user's list of jobs and
  /// provides operations to refresh, upload (store), download (load), start, pause, stop,
  /// resume and delete jobs, tasks and job permissions.
  /// </summary>
  [Item("HiveClient", "Hive client.")]
  public sealed class HiveClient : IContent {
    private static readonly object instanceLocker = new object();
    private static HiveClient instance;

    /// <summary>
    /// Gets the process-wide instance, creating it on first access.
    /// </summary>
    /// <remarks>
    /// Creation is guarded by a lock because this property is also reached from worker threads
    /// (e.g. <see cref="Store"/> running inside <see cref="StoreAsync"/>). Without the lock two
    /// instances could be created, each registering its own unobserved-exception handler and
    /// owning its own upload semaphore.
    /// </remarks>
    public static HiveClient Instance {
      get {
        lock (instanceLocker) {
          if (instance == null) instance = new HiveClient();
          return instance;
        }
      }
    }

    #region Properties
    private ItemCollection<RefreshableJob> jobs;
    /// <summary>
    /// Gets or sets the jobs known to this client. Assigning a different collection raises
    /// <see cref="HiveJobsChanged"/>. May be null, e.g. after <see cref="ClearHiveClient"/>
    /// or after a failed <see cref="Refresh"/>.
    /// </summary>
    public ItemCollection<RefreshableJob> Jobs {
      get { return jobs; }
      set {
        if (value != jobs) {
          jobs = value;
          OnHiveJobsChanged();
        }
      }
    }

    private List<Plugin> onlinePlugins;
    /// <summary>
    /// Gets or sets the plugins available on the server, as fetched at the start of the most
    /// recent job upload.
    /// </summary>
    public List<Plugin> OnlinePlugins {
      get { return onlinePlugins; }
      set { onlinePlugins = value; }
    }

    private List<Plugin> alreadyUploadedPlugins;
    /// <summary>
    /// Gets or sets the plugins already handled during the current job upload. It starts with
    /// the configuration-file plugin and is passed to PluginUtil.GetPluginDependencies for every
    /// task (presumably so each plugin is uploaded only once — confirm in PluginUtil).
    /// </summary>
    public List<Plugin> AlreadyUploadedPlugins {
      get { return alreadyUploadedPlugins; }
      set { alreadyUploadedPlugins = value; }
    }

    private bool isAllowedPrivileged;
    /// <summary>
    /// Gets or sets whether the current user may create privileged jobs, as reported by the
    /// server during the last <see cref="Refresh"/>.
    /// </summary>
    public bool IsAllowedPrivileged {
      get { return isAllowedPrivileged; }
      set { isAllowedPrivileged = value; }
    }
    #endregion

    private HiveClient() {
      // The singleton lives for the whole process, so this handler is intentionally never
      // deregistered.
      TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
    }

    /// <summary>
    /// Marks unobserved task exceptions as observed so that they do not terminate the process.
    /// </summary>
    /// <remarks>
    /// This event is raised when a faulted task's exception is garbage-collected, i.e. from the
    /// finalizer thread. Throwing from here (as an earlier version did) escapes the finalizer and
    /// terminates the process, which defeats <c>SetObserved</c>. It also prevents later
    /// subscribers from running. The exception is therefore only traced. Per the original note,
    /// the first exception found is handled in the downloader's Results property.
    /// </remarks>
    private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved(); // avoid crash of process because task crashes
      System.Diagnostics.Trace.TraceError("Unobserved Exception in ConcurrentTaskDownloader: {0}", e.Exception);
    }

    /// <summary>
    /// Resets the client. It stops result polling on (and disposes) every job that was polling,
    /// sets <see cref="Jobs"/> to null and clears the cached plugin lists. Safe to call when
    /// <see cref="Jobs"/> is already null.
    /// </summary>
    public void ClearHiveClient() {
      if (jobs != null) {
        foreach (var j in jobs) {
          if (j.RefreshAutomatically) {
            j.RefreshAutomatically = false; // stop result polling
            j.Dispose();
          }
        }
      }
      Jobs = null;
      if (onlinePlugins != null)
        onlinePlugins.Clear();
      if (alreadyUploadedPlugins != null)
        alreadyUploadedPlugins.Clear();
    }

    #region Refresh
    /// <summary>
    /// Reloads the job list from the server.
    /// <list type="bullet">
    ///   <item>Known jobs reuse their existing <see cref="RefreshableJob"/> instance, updated with the server data.</item>
    ///   <item>Unknown jobs get a new instance.</item>
    ///   <item>Jobs that were never uploaded (empty Id) are kept.</item>
    ///   <item>Jobs that no longer exist on the server are dropped, and their result polling is stopped.</item>
    /// </list>
    /// </summary>
    /// <remarks>
    /// <see cref="Refreshing"/> is raised before, and <see cref="Refreshed"/> after, the reload.
    /// <see cref="Refreshed"/> is raised even when the reload fails. On failure the jobs field is
    /// reset to null and the exception is rethrown.
    /// </remarks>
    public void Refresh() {
      OnRefreshing();

      try {
        IsAllowedPrivileged = HiveServiceLocator.Instance.CallHiveService((s) => s.IsAllowedPrivileged());

        var oldJobs = jobs ?? new ItemCollection<RefreshableJob>();
        jobs = new HiveItemCollection<RefreshableJob>();
        var jobsLoaded = HiveServiceLocator.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());
        var loadedJobIds = new HashSet<Guid>();

        foreach (var j in jobsLoaded) {
          loadedJobIds.Add(j.Id);
          var job = oldJobs.SingleOrDefault(x => x.Id == j.Id);
          if (job == null) {
            // new
            jobs.Add(new RefreshableJob(j) { IsAllowedPrivileged = this.isAllowedPrivileged });
          } else {
            // update
            job.Job = j;
            job.IsAllowedPrivileged = this.isAllowedPrivileged;
            jobs.Add(job);
          }
        }
        // Remove old jobs that were not in the list of loaded jobs. Jobs that were just re-added
        // above must keep their polling state, so only the vanished ones are stopped.
        foreach (var job in oldJobs) {
          if (job.Id == Guid.Empty) {
            // experiment not uploaded... keep
            jobs.Add(job);
          } else if (!loadedJobIds.Contains(job.Id)) {
            job.RefreshAutomatically = false; // stop results polling
          }
        }
      }
      catch {
        jobs = null;
        throw;
      }
      finally {
        OnRefreshed();
      }
    }

    /// <summary>
    /// Runs <see cref="Refresh"/> on a thread-pool thread. Any exception it throws is passed to
    /// <paramref name="exceptionCallback"/> on that thread.
    /// </summary>
    public void RefreshAsync(Action<Exception> exceptionCallback) {
      RunInBackground(Refresh, exceptionCallback);
    }

    /// <summary>
    /// Invokes <paramref name="action"/> asynchronously via a delegate's BeginInvoke. If it
    /// throws, the exception is handed to <paramref name="exceptionCallback"/> from the
    /// completion callback (a thread-pool thread). The callback is not called on success.
    /// </summary>
    private static void RunInBackground(Action action, Action<Exception> exceptionCallback) {
      var call = new Func<Exception>(delegate() {
        try {
          action();
        }
        catch (Exception ex) {
          return ex;
        }
        return null;
      });
      call.BeginInvoke(delegate(IAsyncResult result) {
        Exception ex = call.EndInvoke(result);
        if (ex != null) exceptionCallback(ex);
      }, null);
    }
    #endregion

    #region Store
    /// <summary>
    /// Persists <paramref name="item"/> on the server. Items without an Id are created:
    /// <list type="bullet">
    ///   <item>a <see cref="RefreshableJob"/> is uploaded completely;</item>
    ///   <item>a <see cref="JobPermission"/> is granted after resolving its user name.</item>
    /// </list>
    /// An existing <see cref="Job"/> is updated.
    /// </summary>
    /// <exception cref="ArgumentException">The granted user of a permission does not exist.</exception>
    public static void Store(IHiveItem item, CancellationToken cancellationToken) {
      if (item.Id == Guid.Empty) {
        if (item is RefreshableJob) {
          HiveClient.Instance.UploadJob((RefreshableJob)item, cancellationToken);
        }
        if (item is JobPermission) {
          var hep = (JobPermission)item;
          hep.GrantedUserId = HiveServiceLocator.Instance.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
          if (hep.GrantedUserId == Guid.Empty) {
            throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
          }
          HiveServiceLocator.Instance.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
        }
      } else {
        if (item is Job)
          HiveServiceLocator.Instance.CallHiveService(s => s.UpdateJob((Job)item));
      }
    }

    /// <summary>
    /// Runs <see cref="Store"/> on a thread-pool thread. Any exception it throws is passed to
    /// <paramref name="exceptionCallback"/> on that thread.
    /// </summary>
    public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken) {
      RunInBackground(() => Store(item, cancellationToken), exceptionCallback);
    }
    #endregion

    #region Delete
    /// <summary>
    /// Deletes <paramref name="item"/> on the server and resets its Id to <see cref="Guid.Empty"/>.
    /// </summary>
    /// <remarks>
    /// <list type="bullet">
    ///   <item>Items that were never stored are ignored, except permissions.</item>
    ///   <item>Jobs are deleted; a refreshable job first has its result polling stopped.</item>
    ///   <item>Permissions are revoked.</item>
    /// </list>
    /// </remarks>
    public static void Delete(IHiveItem item) {
      if (item.Id == Guid.Empty && item.GetType() != typeof(JobPermission))
        return;

      if (item is Job)
        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(item.Id));
      if (item is RefreshableJob) {
        RefreshableJob job = (RefreshableJob)item;
        if (job.RefreshAutomatically) {
          job.StopResultPolling();
        }
        HiveServiceLocator.Instance.CallHiveService(s => s.DeleteJob(item.Id));
      }
      if (item is JobPermission) {
        var hep = (JobPermission)item;
        HiveServiceLocator.Instance.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
      }
      item.Id = Guid.Empty;
    }
    #endregion

    #region Events
    /// <summary>Raised at the start of <see cref="Refresh"/>.</summary>
    public event EventHandler Refreshing;
    private void OnRefreshing() {
      EventHandler handler = Refreshing;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    /// <summary>Raised when <see cref="Refresh"/> ends, whether it succeeded or failed.</summary>
    public event EventHandler Refreshed;
    private void OnRefreshed() {
      var handler = Refreshed;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    /// <summary>Raised when a different collection is assigned to <see cref="Jobs"/>.</summary>
    public event EventHandler HiveJobsChanged;
    private void OnHiveJobsChanged() {
      var handler = HiveJobsChanged;
      if (handler != null) handler(this, EventArgs.Empty);
    }
    #endregion

    /// <summary>
    /// Marks the job as started and uploads it asynchronously. If the upload fails, the state
    /// is reset to <see cref="ExecutionState.Prepared"/> and <paramref name="exceptionCallback"/>
    /// is called.
    /// </summary>
    public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken) {
      // Set the state before the upload is dispatched. Otherwise a fast failure could reset it
      // to Prepared, and this thread would then overwrite it with Started.
      refreshableJob.ExecutionState = ExecutionState.Started;
      HiveClient.StoreAsync(
        new Action<Exception>((Exception ex) => {
          refreshableJob.ExecutionState = ExecutionState.Prepared;
          exceptionCallback(ex);
        }), refreshableJob, cancellationToken);
    }

    /// <summary>
    /// Asks the server to pause every task of the job that has not reached a terminal state,
    /// then marks the job as paused.
    /// </summary>
    public static void PauseJob(RefreshableJob refreshableJob) {
      HiveServiceLocator.Instance.CallHiveService(service => {
        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
          if (IsNotTerminated(task))
            service.PauseTask(task.Task.Id);
        }
      });
      refreshableJob.ExecutionState = ExecutionState.Paused;
    }

    /// <summary>
    /// Asks the server to stop every task of the job that has not reached a terminal state,
    /// then marks the job as stopped.
    /// </summary>
    public static void StopJob(RefreshableJob refreshableJob) {
      HiveServiceLocator.Instance.CallHiveService(service => {
        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
          if (IsNotTerminated(task))
            service.StopTask(task.Task.Id);
        }
      });
      refreshableJob.ExecutionState = ExecutionState.Stopped;
    }

    /// <summary>
    /// Asks the server to restart every paused task of the job, then marks the job as started.
    /// </summary>
    public static void ResumeJob(RefreshableJob refreshableJob) {
      HiveServiceLocator.Instance.CallHiveService(service => {
        foreach (HiveTask task in refreshableJob.GetAllHiveTasks()) {
          if (task.Task.State == TaskState.Paused) {
            service.RestartTask(task.Task.Id);
          }
        }
      });
      refreshableJob.ExecutionState = ExecutionState.Started;
    }

    /// <summary>
    /// True when the task is not Finished, Aborted or Failed.
    /// </summary>
    private static bool IsNotTerminated(HiveTask task) {
      TaskState state = task.Task.State;
      return state != TaskState.Finished && state != TaskState.Aborted && state != TaskState.Failed;
    }

    #region Upload Job
    // limits the number of concurrently running task uploads
    private readonly Semaphore taskUploadSemaphore = new Semaphore(Settings.Default.MaxParallelUploads, Settings.Default.MaxParallelUploads);
    private static readonly object jobCountLocker = new object();
    private static readonly object pluginLocker = new object();

    /// <summary>
    /// Uploads a new job and its task tree. Progress is reported through
    /// <c>refreshableJob.Progress</c>. The steps are:
    /// <list type="number">
    ///   <item>resolve resource names to ids;</item>
    ///   <item>add the job and re-read it from the server;</item>
    ///   <item>upload the configuration file as a plugin;</item>
    ///   <item>upload all root tasks (and, recursively, their children) in parallel.</item>
    /// </list>
    /// </summary>
    /// <exception cref="ResourceNotFoundException">A resource name is unknown to the server.</exception>
    /// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
    /// <exception cref="AggregateException">At least one task upload failed.</exception>
    private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken) {
      try {
        refreshableJob.IsProgressing = true;
        refreshableJob.Progress = new Progress("Connecting to server...");
        IEnumerable<string> resourceNames = ToResourceNameList(refreshableJob.Job.ResourceNames);
        var resourceIds = new List<Guid>();
        foreach (var resourceName in resourceNames) {
          Guid resourceId = HiveServiceLocator.Instance.CallHiveService((s) => s.GetResourceId(resourceName));
          if (resourceId == Guid.Empty) {
            throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
          }
          resourceIds.Add(resourceId);
        }

        foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>()) {
          hiveJob.SetIndexInParentOptimizerList(null);
        }

        // upload Job
        refreshableJob.Progress.Status = "Uploading Job...";
        refreshableJob.Job.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddJob(refreshableJob.Job));
        bool isPrivileged = refreshableJob.Job.IsPrivileged;
        refreshableJob.Job = HiveServiceLocator.Instance.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
        refreshableJob.Job.IsPrivileged = isPrivileged; // the re-read job does not carry the local flag over
        cancellationToken.ThrowIfCancellationRequested();

        int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
        int[] jobCount = new int[1]; // a reference type (int-array) lets the counter be shared by the parallel upload tasks
        cancellationToken.ThrowIfCancellationRequested();

        // upload plugins
        refreshableJob.Progress.Status = "Uploading plugins...";
        this.OnlinePlugins = HiveServiceLocator.Instance.CallHiveService((s) => s.GetPlugins());
        this.AlreadyUploadedPlugins = new List<Plugin>();
        Plugin configFilePlugin = HiveServiceLocator.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
        this.alreadyUploadedPlugins.Add(configFilePlugin);
        cancellationToken.ThrowIfCancellationRequested();

        // upload tasks
        refreshableJob.Progress.Status = "Uploading tasks...";

        var tasks = new List<TS.Task>();
        foreach (HiveTask hiveTask in refreshableJob.HiveTasks) {
          var task = TS.Task.Factory.StartNew((hj) => {
            UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, refreshableJob.Job.IsPrivileged, cancellationToken);
          }, hiveTask);
          task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
          tasks.Add(task);
        }
        TS.Task.WaitAll(tasks.ToArray());
      }
      finally {
        refreshableJob.Job.Modified = false;
        refreshableJob.IsProgressing = false;
        refreshableJob.Progress.Finish();
      }
    }

    /// <summary>
    /// Uploads the local application configuration file as a plugin named "Configuration".
    /// </summary>
    /// <remarks>
    /// If the server already has a plugin with the same SHA1 hash, that plugin is returned and
    /// nothing is uploaded. Plugins without a hash are skipped during this comparison.
    /// </remarks>
    /// <returns>The existing or the newly added configuration plugin.</returns>
    private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins) {
      string exeFilePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, Settings.Default.HLBinaryName);
      string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
      string configFileName = Path.GetFileName(configFilePath);

      byte[] data = File.ReadAllBytes(configFilePath);
      byte[] hash;
      using (SHA1 sha1 = SHA1.Create()) {
        hash = sha1.ComputeHash(data);
      }

      Plugin onlineConfig = onlinePlugins.FirstOrDefault(p => p.Hash != null && p.Hash.SequenceEqual(hash));
      if (onlineConfig != null) {
        return onlineConfig;
      }

      Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
      PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
      configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
      return configPlugin;
    }

    /// <summary>
    /// Uploads the given task and all its child tasks, setting the proper parent task for the
    /// children. Children are uploaded in parallel.
    /// </summary>
    /// <remarks>
    /// Concurrent uploads are throttled by <c>taskUploadSemaphore</c>. The semaphore is
    /// released before waiting for the children. The children acquire the same semaphore, so
    /// holding it while waiting could deadlock once the tree is deeper than the number of
    /// permits.
    /// </remarks>
    /// <param name="parentHiveTask">Null if <paramref name="hiveTask"/> is a root task.</param>
    /// <param name="groups">Resource ids; only used for root tasks.</param>
    /// <param name="taskCount">Shared single-element counter of started uploads (guarded by jobCountLocker).</param>
    private void UploadTaskWithChildren(Progress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, bool isPrivileged, CancellationToken cancellationToken) {
      taskUploadSemaphore.WaitOne();
      bool semaphoreReleased = false;
      try {
        cancellationToken.ThrowIfCancellationRequested();
        lock (jobCountLocker) {
          taskCount[0]++;
        }
        TaskData taskData;
        List<IPluginDescription> plugins;

        // experiments/batch runs computed in parallel become parent tasks whose children run on Hive
        if (hiveTask.ItemTask.ComputeInParallel && (hiveTask.ItemTask.Item is Optimization.Experiment || hiveTask.ItemTask.Item is Optimization.BatchRun)) {
          hiveTask.Task.IsParentTask = true;
          hiveTask.Task.FinishWhenChildJobsFinished = true;
          taskData = hiveTask.GetAsTaskData(true, out plugins);
        } else {
          hiveTask.Task.IsParentTask = false;
          hiveTask.Task.FinishWhenChildJobsFinished = false;
          taskData = hiveTask.GetAsTaskData(false, out plugins);
        }
        cancellationToken.ThrowIfCancellationRequested();

        TryAndRepeat(() => {
          if (!cancellationToken.IsCancellationRequested) {
            // the shared plugin lists are mutated while resolving dependencies
            lock (pluginLocker) {
              HiveServiceLocator.Instance.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
            }
          }
        }, Settings.Default.MaxRepeatServiceCalls, "Failed to upload plugins");
        cancellationToken.ThrowIfCancellationRequested();
        hiveTask.Task.PluginsNeededIds.Add(configPluginId);
        hiveTask.Task.JobId = jobId;
        hiveTask.Task.IsPrivileged = isPrivileged;

        log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
        TryAndRepeat(() => {
          if (!cancellationToken.IsCancellationRequested) {
            if (parentHiveTask != null) {
              hiveTask.Task.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
            } else {
              hiveTask.Task.Id = HiveServiceLocator.Instance.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
            }
          }
        }, Settings.Default.MaxRepeatServiceCalls, "Failed to add task", log);
        cancellationToken.ThrowIfCancellationRequested();

        lock (jobCountLocker) {
          progress.ProgressValue = (double)taskCount[0] / totalJobCount;
          progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
        }

        var tasks = new List<TS.Task>();
        foreach (HiveTask child in hiveTask.ChildHiveTasks) {
          var task = TS.Task.Factory.StartNew((tuple) => {
            var arguments = (Tuple<HiveTask, HiveTask>)tuple;
            UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, isPrivileged, cancellationToken);
          }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
          task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
          tasks.Add(task);
        }
        // release before WaitAll: the children need the semaphore themselves
        taskUploadSemaphore.Release(); semaphoreReleased = true;
        TS.Task.WaitAll(tasks.ToArray());
      }
      finally {
        if (!semaphoreReleased) taskUploadSemaphore.Release();
      }
    }
    #endregion

    #region Download Experiment
    /// <summary>
    /// Downloads all tasks of the job, rebuilds the task tree and assigns it to
    /// <c>refreshableJob.HiveTasks</c>.
    /// </summary>
    /// <remarks>
    /// Progress is reported through <c>refreshableJob.Progress</c>. Afterwards the execution
    /// state is set to Stopped if the job is finished and to Started otherwise. If the
    /// downloader faults, its exception is rethrown.
    /// </remarks>
    public static void LoadJob(RefreshableJob refreshableJob) {
      var hiveExperiment = refreshableJob.Job;
      refreshableJob.IsProgressing = true;
      refreshableJob.Progress = new Progress();
      TaskDownloader downloader = null;

      try {
        int totalJobCount = 0;
        IEnumerable<LightweightTask> allTasks;

        refreshableJob.Progress.Status = "Connecting to Server...";
        // fetch all task objects to create the full tree of HiveTask objects
        refreshableJob.Progress.Status = "Downloading list of tasks...";
        allTasks = HiveServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobTasks(hiveExperiment.Id));
        totalJobCount = allTasks.Count();

        refreshableJob.Progress.Status = "Downloading tasks...";
        downloader = new TaskDownloader(allTasks.Select(x => x.Id));
        downloader.StartAsync();

        while (!downloader.IsFinished) {
          // a job without tasks would otherwise yield 0/0 = NaN
          refreshableJob.Progress.ProgressValue = totalJobCount > 0 ? downloader.FinishedCount / (double)totalJobCount : 1.0;
          refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
          Thread.Sleep(500);

          if (downloader.IsFaulted) {
            throw downloader.Exception;
          }
        }
        IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
        var parents = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);

        refreshableJob.Progress.Status = "Downloading/deserializing complete. Displaying tasks...";
        // build child-task tree
        foreach (HiveTask hiveTask in parents) {
          BuildHiveJobTree(hiveTask, allTasks, allHiveTasks);
        }

        refreshableJob.HiveTasks = new ItemCollection<HiveTask>(parents);
        if (refreshableJob.IsFinished()) {
          refreshableJob.ExecutionState = Core.ExecutionState.Stopped;
        } else {
          refreshableJob.ExecutionState = Core.ExecutionState.Started;
        }
        refreshableJob.OnLoaded();
      }
      finally {
        refreshableJob.IsProgressing = false;
        refreshableJob.Progress.Finish();
        if (downloader != null) {
          downloader.Dispose();
        }
      }
    }

    /// <summary>
    /// Recursively attaches the children of <paramref name="parentHiveTask"/>, ordered by
    /// creation date, taking them from <paramref name="allHiveTasks"/>.
    /// </summary>
    private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks) {
      IEnumerable<LightweightTask> childTasks = from job in allTasks
                                                where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveTask.Task.Id
                                                orderby job.DateCreated ascending
                                                select job;
      foreach (LightweightTask task in childTasks) {
        HiveTask childHiveTask = allHiveTasks[task.Id];
        BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
        parentHiveTask.AddChildHiveTask(childHiveTask);
      }
    }
    #endregion

    /// <summary>
    /// Splits a ';'-separated list of resource names into its non-empty parts.
    /// Returns an empty sequence for null or empty input.
    /// </summary>
    private static IEnumerable<string> ToResourceNameList(string resourceNames) {
      if (!string.IsNullOrEmpty(resourceNames)) {
        return resourceNames.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
      } else {
        return new List<string>();
      }
    }

    /// <summary>
    /// Downloads and deserializes the data of a task.
    /// </summary>
    /// <returns>The deserialized item task, or null if deserialization fails.</returns>
    public static ItemTask LoadItemJob(Guid jobId) {
      TaskData taskData = HiveServiceLocator.Instance.CallHiveService(s => s.GetTaskData(jobId));
      try {
        return PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
      }
      catch {
        // best effort: callers receive null for data that cannot be deserialized
        return null;
      }
    }

    /// <summary>
    /// Executes the action. If it throws, it is repeated until the repetition count is used up;
    /// then a <see cref="HiveException"/> wrapping the last error is thrown.
    /// </summary>
    /// <param name="repetitions">
    /// Number of retries after the first attempt. A negative value (e.g. -1) retries forever.
    /// </param>
    /// <param name="log">Optional log that receives a message for every failed attempt that is retried.</param>
    public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null) {
      while (true) {
        try { action(); return; }
        catch (Exception e) {
          if (repetitions == 0) throw new HiveException(errorMessage, e);
          if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
          // only positive budgets count down, so negative values really mean "infinite"
          if (repetitions > 0) repetitions--;
        }
      }
    }

    /// <summary>
    /// Loads the permissions of a job and fills in the user name of each granted user.
    /// </summary>
    public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId) {
      return HiveServiceLocator.Instance.CallHiveService((service) => {
        IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId);
        foreach (var hep in jps) {
          hep.UnmodifiedGrantedUserNameUpdate(service.GetUsernameByUserId(hep.GrantedUserId));
        }
        return new HiveItemCollection<JobPermission>(jps);
      });
    }
  }
}
Note: See TracBrowser for help on using the repository browser.