Free cookie consent management tool by TermsFeed Policy Generator

source: branches/WebJobManager/HeuristicLab.Clients.Hive.WebJobManager/Services/HiveClientWeb.cs @ 13689

Last change on this file since 13689 was 13689, checked in by jlodewyc, 9 years ago

#2582 Implemented uploading

File size: 24.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.MainForm;
33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35using HeuristicLab.Clients.Hive.WebJobManager.Services;
36using HeuristicLab.Clients.Common.Properties;
37using Microsoft.AspNet.Hosting;
38
39namespace HeuristicLab.Clients.Hive.WebJobManager
40{
41    [Item("HiveClientWeb", "Hive client.")]
42
43
44    public sealed class HiveClientWeb : IContent
45    {
46        private static HiveClientWeb instance;
47        public static HiveClientWeb Instance
48        {
49            get
50            {
51                if (instance == null) instance = new HiveClientWeb();
52                return instance;
53            }
54        }
55        public static IHostingEnvironment CurrentEnv { get; set; } //Current building area
56        #region Properties
57        private HiveItemCollection<RefreshableJob> jobs;
58        public HiveItemCollection<RefreshableJob> Jobs
59        {
60            get { return jobs; }
61            set
62            {
63                if (value != jobs)
64                {
65                    jobs = value;
66                    OnHiveJobsChanged();
67                }
68            }
69        }
70
71        private List<Plugin> onlinePlugins;
72        public List<Plugin> OnlinePlugins
73        {
74            get { return onlinePlugins; }
75            set { onlinePlugins = value; }
76        }
77
78        private List<Plugin> alreadyUploadedPlugins;
79        public List<Plugin> AlreadyUploadedPlugins
80        {
81            get { return alreadyUploadedPlugins; }
82            set { alreadyUploadedPlugins = value; }
83        }
84        #endregion
85
86        private HiveClientWeb()
87        {
88        }
89
90        public void ClearHiveClientWeb()
91        {
92            Jobs.ClearWithoutHiveDeletion();
93            foreach (var j in Jobs)
94            {
95                if (j.RefreshAutomatically)
96                {
97                    j.RefreshAutomatically = false; // stop result polling
98                }
99                j.Dispose();
100            }
101            Jobs = null;
102
103            if (onlinePlugins != null)
104                onlinePlugins.Clear();
105            if (alreadyUploadedPlugins != null)
106                alreadyUploadedPlugins.Clear();
107        }
108
109        #region Refresh
110        public void Refresh()
111        {
112            OnRefreshing();
113
114            try
115            {
116                jobs = new HiveItemCollection<RefreshableJob>();
117                var jobsLoaded = HiveServiceLocatorWebManagerService.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());
118
119                foreach (var j in jobsLoaded)
120                {
121                    jobs.Add(new RefreshableJob(j));
122                }
123            }
124            catch
125            {
126                jobs = null;
127                throw;
128            }
129            finally
130            {
131                OnRefreshed();
132            }
133        }
134
135        public void RefreshAsync(Action<Exception> exceptionCallback)
136        {
137            var call = new Func<Exception>(delegate ()
138            {
139                try
140                {
141                    Refresh();
142                }
143                catch (Exception ex)
144                {
145                    return ex;
146                }
147                return null;
148            });
149            call.BeginInvoke(delegate (IAsyncResult result)
150            {
151                Exception ex = call.EndInvoke(result);
152                if (ex != null) exceptionCallback(ex);
153            }, null);
154        }
155        #endregion
156
157        #region Store
158        public static void Store(IHiveItem item, CancellationToken cancellationToken)
159        {
160            if (item.Id == Guid.Empty)
161            {
162                if (item is RefreshableJob)
163                {
164                    HiveClientWeb.Instance.UploadJob((RefreshableJob)item, cancellationToken);
165                }
166                if (item is JobPermission)
167                {
168                    var hep = (JobPermission)item;
169                    hep.GrantedUserId = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
170                    if (hep.GrantedUserId == Guid.Empty)
171                    {
172                        throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
173                    }
174                    HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
175                }
176            }
177            else {
178                if (item is Job)
179                    HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.UpdateJob((Job)item));
180            }
181        }
182        public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken)
183        {
184            var call = new Func<Exception>(delegate ()
185            {
186                try
187                {
188                    Store(item, cancellationToken);
189                }
190                catch (Exception ex)
191                {
192                    return ex;
193                }
194                return null;
195            });
196            call.BeginInvoke(delegate (IAsyncResult result)
197            {
198                Exception ex = call.EndInvoke(result);
199                if (ex != null) exceptionCallback(ex);
200            }, null);
201        }
202        #endregion
203
204        #region Delete
205        public static void Delete(IHiveItem item)
206        {
207            if (item.Id == Guid.Empty && item.GetType() != typeof(JobPermission))
208                return;
209
210            if (item is Job)
211                HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.DeleteJob(item.Id));
212            if (item is RefreshableJob)
213            {
214                RefreshableJob job = (RefreshableJob)item;
215                if (job.RefreshAutomatically)
216                {
217                    job.StopResultPolling();
218                }
219                HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.DeleteJob(item.Id));
220            }
221            if (item is JobPermission)
222            {
223                var hep = (JobPermission)item;
224                HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
225            }
226            item.Id = Guid.Empty;
227        }
228        #endregion
229
230        #region Events
231        public event EventHandler Refreshing;
232        private void OnRefreshing()
233        {
234            EventHandler handler = Refreshing;
235            if (handler != null) handler(this, EventArgs.Empty);
236        }
237        public event EventHandler Refreshed;
238        private void OnRefreshed()
239        {
240            var handler = Refreshed;
241            if (handler != null) handler(this, EventArgs.Empty);
242        }
243        public event EventHandler HiveJobsChanged;
244        private void OnHiveJobsChanged()
245        {
246            var handler = HiveJobsChanged;
247            if (handler != null) handler(this, EventArgs.Empty);
248        }
249        #endregion
250
251        public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken)
252        {
253            HiveClientWeb.StoreAsync(
254              new Action<Exception>((Exception ex) =>
255              {
256
257                  exceptionCallback(ex);
258              }), refreshableJob, cancellationToken);
259
260        }
261
262        public static void PauseJob(RefreshableJob refreshableJob)
263        {
264            HiveServiceLocatorWebManagerService.Instance.CallHiveService(service =>
265            {
266                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
267                {
268                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
269                        service.PauseTask(task.Task.Id);
270                }
271            });
272        }
273
274        public static void StopJob(RefreshableJob refreshableJob)
275        {
276            HiveServiceLocatorWebManagerService.Instance.CallHiveService(service =>
277            {
278                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
279                {
280                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
281                        service.StopTask(task.Task.Id);
282                }
283            });
284        }
285
286        public static void ResumeJob(RefreshableJob refreshableJob)
287        {
288            HiveServiceLocatorWebManagerService.Instance.CallHiveService(service =>
289            {
290                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
291                {
292                    if (task.Task.State == TaskState.Paused)
293                    {
294                        service.RestartTask(task.Task.Id);
295                    }
296                }
297            });
298        }
299
300        #region Upload Job
301        private Semaphore taskUploadSemaphore = new Semaphore(5, 5);
302        private static object jobCountLocker = new object();
303        private static object pluginLocker = new object();
304        private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken)
305        {
306            try
307            {
308                refreshableJob.IsProgressing = true;
309                refreshableJob.Progress.Start("Connecting to server...");
310                IEnumerable<string> resourceNames = ToResourceNameList(refreshableJob.Job.ResourceNames);
311                var resourceIds = new List<Guid>();
312                foreach (var resourceName in resourceNames)
313                {
314                    Guid resourceId = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetResourceId(resourceName));
315                    if (resourceId == Guid.Empty)
316                    {
317                        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
318                    }
319                    resourceIds.Add(resourceId);
320                }
321
322
323
324                // upload Job
325                refreshableJob.Progress.Status = "Uploading Job...";
326                refreshableJob.Job.Id = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.AddJob(refreshableJob.Job));
327                refreshableJob.Job = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
328                cancellationToken.ThrowIfCancellationRequested();
329
330                int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
331                int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
332                cancellationToken.ThrowIfCancellationRequested();
333
334                // upload plugins
335                refreshableJob.Progress.Status = "Uploading plugins...";
336                this.OnlinePlugins = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetPlugins());
337                this.AlreadyUploadedPlugins = new List<Plugin>();
338                Plugin configFilePlugin = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
339                this.alreadyUploadedPlugins.Add(configFilePlugin);
340                cancellationToken.ThrowIfCancellationRequested();
341
342                // upload tasks
343                refreshableJob.Progress.Status = "Uploading tasks...";
344
345                var tasks = new List<TS.Task>();
346                foreach (HiveTask hiveTask in refreshableJob.HiveTasks)
347                {
348                    var task = TS.Task.Factory.StartNew((hj) =>
349                    {
350                        UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, cancellationToken);
351                    }, hiveTask);
352                    task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
353                    tasks.Add(task);
354                }
355                TS.Task.WaitAll(tasks.ToArray());
356            }
357            finally
358            {
359                refreshableJob.IsProgressing = false;
360                refreshableJob.Progress.Finish();
361            }
362        }
363
364        /// <summary>
365        /// Uploads the local configuration file as plugin
366        /// </summary>
367        private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins)
368        {
369            string exeFilePath = Path.Combine( CurrentEnv.WebRootPath, "bin"  , HeuristicLab.Clients.Hive.Settings.Default.HLBinaryName);
370            string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
371            string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
372            byte[] hash;
373
374            byte[] data = File.ReadAllBytes(configFilePath);
375            using (SHA1 sha1 = SHA1.Create())
376            {
377                hash = sha1.ComputeHash(data);
378            }
379
380            Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
381            PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
382
383            IEnumerable<Plugin> onlineConfig = onlinePlugins.Where(p => p.Hash.SequenceEqual(hash));
384
385            if (onlineConfig.Count() > 0)
386            {
387                return onlineConfig.First();
388            }
389            else {
390                configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
391                return configPlugin;
392            }
393        }
394
395        /// <summary>
396        /// Uploads the given task and all its child-jobs while setting the proper parentJobId values for the childs
397        /// </summary>
398        /// <param name="parentHiveTask">shall be null if its the root task</param>
399        private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, CancellationToken cancellationToken)
400        {
401            taskUploadSemaphore.WaitOne();
402            bool semaphoreReleased = false;
403            try
404            {
405                cancellationToken.ThrowIfCancellationRequested();
406                lock (jobCountLocker)
407                {
408                    taskCount[0]++;
409                }
410                TaskData taskData;
411                List<IPluginDescription> plugins;
412
413                if (hiveTask.ItemTask.ComputeInParallel)
414                {
415                    hiveTask.Task.IsParentTask = true;
416                    hiveTask.Task.FinishWhenChildJobsFinished = true;
417                    taskData = hiveTask.GetAsTaskData(true, out plugins);
418                }
419                else {
420                    hiveTask.Task.IsParentTask = false;
421                    hiveTask.Task.FinishWhenChildJobsFinished = false;
422                    taskData = hiveTask.GetAsTaskData(false, out plugins);
423                }
424                cancellationToken.ThrowIfCancellationRequested();
425
426                TryAndRepeat(() =>
427                {
428                    if (!cancellationToken.IsCancellationRequested)
429                    {
430                        lock (pluginLocker)
431                        {
432                            HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
433                        }
434                    }
435                }, 5, "Failed to upload plugins");
436                cancellationToken.ThrowIfCancellationRequested();
437                hiveTask.Task.PluginsNeededIds.Add(configPluginId);
438                hiveTask.Task.JobId = jobId;
439
440                log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
441                TryAndRepeat(() =>
442                {
443                    if (!cancellationToken.IsCancellationRequested)
444                    {
445                        if (parentHiveTask != null)
446                        {
447                            hiveTask.Task.Id = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
448                        }
449                        else {
450                            hiveTask.Task.Id = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
451                        }
452                    }
453                }, 5, "Failed to add task", log);
454                cancellationToken.ThrowIfCancellationRequested();
455
456                lock (jobCountLocker)
457                {
458                    progress.ProgressValue = (double)taskCount[0] / totalJobCount;
459                    progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
460                }
461
462                var tasks = new List<TS.Task>();
463                foreach (HiveTask child in hiveTask.ChildHiveTasks)
464                {
465                    var task = TS.Task.Factory.StartNew((tuple) =>
466                    {
467                        var arguments = (Tuple<HiveTask, HiveTask>)tuple;
468                        UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, cancellationToken);
469                    }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
470                    task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
471                    tasks.Add(task);
472                }
473                taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall!
474                TS.Task.WaitAll(tasks.ToArray());
475            }
476            finally
477            {
478                if (!semaphoreReleased) taskUploadSemaphore.Release();
479            }
480        }
481        #endregion
482
483        #region Download Experiment
484        public static void LoadJob(RefreshableJob refreshableJob)
485        {
486            var hiveExperiment = refreshableJob.Job;
487            refreshableJob.IsProgressing = true;
488            TaskDownloader downloader = null;
489
490            try
491            {
492                int totalJobCount = 0;
493                IEnumerable<LightweightTask> allTasks;
494
495                // fetch all task objects to create the full tree of tree of HiveTask objects
496                refreshableJob.Progress.Start("Downloading list of tasks...");
497                allTasks = HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(hiveExperiment.Id));
498                totalJobCount = allTasks.Count();
499
500                refreshableJob.Progress.Status = "Downloading tasks...";
501                downloader = new TaskDownloader(allTasks.Select(x => x.Id));
502                downloader.StartAsync();
503
504                while (!downloader.IsFinished)
505                {
506                    refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
507                    refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
508                    Thread.Sleep(500);
509
510                    if (downloader.IsFaulted)
511                    {
512                        throw downloader.Exception;
513                    }
514                }
515                IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
516                var parents = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);
517
518                refreshableJob.Progress.Status = "Downloading/deserializing complete. Displaying tasks...";
519                // build child-task tree
520                foreach (HiveTask hiveTask in parents)
521                {
522                    BuildHiveJobTree(hiveTask, allTasks, allHiveTasks);
523                }
524
525                refreshableJob.HiveTasks = new ItemCollection<HiveTask>(parents);
526                refreshableJob.OnLoaded();
527            }
528            finally
529            {
530                refreshableJob.IsProgressing = false;
531                refreshableJob.Progress.Finish();
532                if (downloader != null)
533                {
534                    downloader.Dispose();
535                }
536            }
537        }
538
539        private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks)
540        {
541            IEnumerable<LightweightTask> childTasks = from job in allTasks
542                                                      where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveTask.Task.Id
543                                                      orderby job.DateCreated ascending
544                                                      select job;
545            foreach (LightweightTask task in childTasks)
546            {
547                HiveTask childHiveTask = allHiveTasks[task.Id];
548                BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
549                parentHiveTask.AddChildHiveTask(childHiveTask);
550            }
551        }
552        #endregion
553
554        /// <summary>
555        /// Converts a string which can contain Ids separated by ';' to a enumerable
556        /// </summary>
557        private static IEnumerable<string> ToResourceNameList(string resourceNames)
558        {
559            if (!string.IsNullOrEmpty(resourceNames))
560            {
561                return resourceNames.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
562            }
563            else {
564                return new List<string>();
565            }
566        }
567
568        public static ItemTask LoadItemJob(Guid jobId)
569        {
570            TaskData taskData = HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.GetTaskData(jobId));
571            try
572            {
573                return PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
574            }
575            catch
576            {
577                return null;
578            }
579        }
580
581        /// <summary>
582        /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
583        /// If repetitions is -1, it is repeated infinitely.
584        /// </summary>
585        public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null)
586        {
587            while (true)
588            {
589                try { action(); return; }
590                catch (Exception e)
591                {
592                    if (repetitions == 0) throw new HiveException(errorMessage, e);
593                    if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
594                    repetitions--;
595                }
596            }
597        }
598
599        public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId)
600        {
601            return HiveServiceLocatorWebManagerService.Instance.CallHiveService((service) =>
602            {
603                IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId);
604                foreach (var hep in jps)
605                {
606                    hep.UnmodifiedGrantedUserNameUpdate(service.GetUsernameByUserId(hep.GrantedUserId));
607                }
608                return new HiveItemCollection<JobPermission>(jps);
609            });
610        }
611    }
612}
Note: See TracBrowser for help on using the repository browser.