Free cookie consent management tool by TermsFeed Policy Generator

source: branches/WebJobManager/HeuristicLab.Clients.Hive.WebJobManager/Services/Imports/HiveClientWeb.cs

Last change on this file was 13862, checked in by jlodewyc, 9 years ago

#2582 Start angular OKB manager, data loaded

File size: 24.6 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.MainForm;
33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35using Microsoft.AspNetCore.Hosting;
36
37namespace HeuristicLab.Clients.Hive.WebJobManager.Services.Imports
38{
39    /// <summary>
40    /// Rewritten to use the HiveServiceLocatorWeb
41    /// </summary>
42    [Item("HiveClientWeb", "Hive client.")]
43    public sealed class HiveClientWeb : IContent
44    {
45       
46        public IHostingEnvironment CurrentEnv { get; set; } //Current building area
47        #region Properties
48        private HiveItemCollection<RefreshableJob> jobs;
49        public HiveItemCollection<RefreshableJob> Jobs
50        {
51            get { return jobs; }
52            set
53            {
54                if (value != jobs)
55                {
56                    jobs = value;
57                    OnHiveJobsChanged();
58                }
59            }
60        }
61
62        private List<Plugin> onlinePlugins;
63        public List<Plugin> OnlinePlugins
64        {
65            get { return onlinePlugins; }
66            set { onlinePlugins = value; }
67        }
68
69        private List<Plugin> alreadyUploadedPlugins;
70        public List<Plugin> AlreadyUploadedPlugins
71        {
72            get { return alreadyUploadedPlugins; }
73            set { alreadyUploadedPlugins = value; }
74        }
75        #endregion
76
77        private HiveServiceLocatorWeb serviceLocator;
78        public  Guid UserId { get; set; }
79        public HiveClientWeb(HiveServiceLocatorWeb serv, Guid u)
80        {
81            serviceLocator = serv;
82            UserId = u;
83        }
84        /// <summary>
85        /// Reaches to the webloginservice to get the current instance of the servicelocator.
86        /// </summary>
87        public void updateServiceLocator()
88        {// No idea if needed
89            serviceLocator = WebLoginService.Instance.getServiceLocator(UserId);
90        }
91
92
93        public void ClearHiveClientWeb()
94        {
95            Jobs.ClearWithoutHiveDeletion();
96            foreach (var j in Jobs)
97            {
98                if (j.RefreshAutomatically)
99                {
100                    j.RefreshAutomatically = false; // stop result polling
101                }
102                j.Dispose();
103            }
104            Jobs = null;
105
106            if (onlinePlugins != null)
107                onlinePlugins.Clear();
108            if (alreadyUploadedPlugins != null)
109                alreadyUploadedPlugins.Clear();
110        }
111
112        #region Refresh
113        public void Refresh()
114        {
115            OnRefreshing();
116
117            try
118            {
119                jobs = new HiveItemCollection<RefreshableJob>();
120                var jobsLoaded = serviceLocator.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());
121
122                foreach (var j in jobsLoaded)
123                {
124                    jobs.Add(new RefreshableJob(j));
125                }
126            }
127            catch
128            {
129                jobs = null;
130                throw;
131            }
132            finally
133            {
134                OnRefreshed();
135            }
136        }
137
138        public void RefreshAsync(Action<Exception> exceptionCallback)
139        {
140            var call = new Func<Exception>(delegate ()
141            {
142                try
143                {
144                    Refresh();
145                }
146                catch (Exception ex)
147                {
148                    return ex;
149                }
150                return null;
151            });
152            call.BeginInvoke(delegate (IAsyncResult result)
153            {
154                Exception ex = call.EndInvoke(result);
155                if (ex != null) exceptionCallback(ex);
156            }, null);
157        }
158        #endregion
159
160        #region Store
161        public void Store(IHiveItem item, CancellationToken cancellationToken)
162        {
163            if (item.Id == Guid.Empty)
164            {
165                if (item is RefreshableJob)
166                {
167                    this.UploadJob((RefreshableJob)item, cancellationToken);
168                }
169                if (item is JobPermission)
170                {
171                    var hep = (JobPermission)item;
172                    hep.GrantedUserId = serviceLocator.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
173                    if (hep.GrantedUserId == Guid.Empty)
174                    {
175                        throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
176                    }
177                    serviceLocator.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
178                }
179            }
180            else {
181                if (item is Job)
182                    serviceLocator.CallHiveService(s => s.UpdateJob((Job)item));
183            }
184        }
185        public void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken)
186        {
187            var call = new Func<Exception>(delegate ()
188            {
189                try
190                {
191                    Store(item, cancellationToken);
192                }
193                catch (Exception ex)
194                {
195                    return ex;
196                }
197                return null;
198            });
199            call.BeginInvoke(delegate (IAsyncResult result)
200            {
201                Exception ex = call.EndInvoke(result);
202                if (ex != null) exceptionCallback(ex);
203            }, null);
204        }
205        #endregion
206
207        #region Delete
208        public void Delete(IHiveItem item)
209        {
210            if (item.Id == Guid.Empty && item.GetType() != typeof(JobPermission))
211                return;
212
213            if (item is Job)
214                serviceLocator.CallHiveService(s => s.DeleteJob(item.Id));
215            if (item is RefreshableJob)
216            {
217                RefreshableJob job = (RefreshableJob)item;
218                if (job.RefreshAutomatically)
219                {
220                    job.StopResultPolling();
221                }
222                serviceLocator.CallHiveService(s => s.DeleteJob(item.Id));
223            }
224            if (item is JobPermission)
225            {
226                var hep = (JobPermission)item;
227                serviceLocator.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
228            }
229            item.Id = Guid.Empty;
230        }
231        #endregion
232
233        #region Events
234        public event EventHandler Refreshing;
235        private void OnRefreshing()
236        {
237            EventHandler handler = Refreshing;
238            if (handler != null) handler(this, EventArgs.Empty);
239        }
240        public event EventHandler Refreshed;
241        private void OnRefreshed()
242        {
243            var handler = Refreshed;
244            if (handler != null) handler(this, EventArgs.Empty);
245        }
246        public event EventHandler HiveJobsChanged;
247        private void OnHiveJobsChanged()
248        {
249            var handler = HiveJobsChanged;
250            if (handler != null) handler(this, EventArgs.Empty);
251        }
252        #endregion
253
254        public void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken)
255        {
256            this.StoreAsync(
257              new Action<Exception>((Exception ex) =>
258              {
259
260                  exceptionCallback(ex);
261              }), refreshableJob, cancellationToken);
262
263        }
264
265        public void PauseJob(RefreshableJob refreshableJob)
266        {
267            serviceLocator.CallHiveService(service =>
268            {
269                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
270                {
271                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
272                        service.PauseTask(task.Task.Id);
273                }
274            });
275        }
276
277        public void StopJob(RefreshableJob refreshableJob)
278        {
279            serviceLocator.CallHiveService(service =>
280            {
281                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
282                {
283                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
284                        service.StopTask(task.Task.Id);
285                }
286            });
287        }
288
289        public  void ResumeJob(RefreshableJob refreshableJob)
290        {
291            serviceLocator.CallHiveService(service =>
292            {
293                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
294                {
295                    if (task.Task.State == TaskState.Paused)
296                    {
297                        service.RestartTask(task.Task.Id);
298                    }
299                }
300            });
301        }
302
303        #region Upload Job
304        private Semaphore taskUploadSemaphore = new Semaphore(5, 5);
305        private static object jobCountLocker = new object();
306        private static object pluginLocker = new object();
307       
308        private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken)
309        {
310            try
311            {
312                refreshableJob.IsProgressing = true;
313                refreshableJob.Progress.Start("Connecting to server...");
314                IEnumerable<string> resourceNames = ToResourceNameList(refreshableJob.Job.ResourceNames);
315                var resourceIds = new List<Guid>();
316                foreach (var resourceName in resourceNames)
317                {
318                    Guid resourceId = serviceLocator.CallHiveService((s) => s.GetResourceId(resourceName));
319                    if (resourceId == Guid.Empty)
320                    {
321                        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
322                    }
323                    resourceIds.Add(resourceId);
324                }
325
326                foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>())
327                {
328                    hiveJob.SetIndexInParentOptimizerList(null);
329                }
330
331                // upload Job
332                refreshableJob.Progress.Status = "Uploading Job...";
333                refreshableJob.Job.Id = serviceLocator.CallHiveService((s) => s.AddJob(refreshableJob.Job));
334                refreshableJob.Job = serviceLocator.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
335                cancellationToken.ThrowIfCancellationRequested();
336
337                int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
338                int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
339                cancellationToken.ThrowIfCancellationRequested();
340
341                // upload plugins
342                refreshableJob.Progress.Status = "Uploading plugins...";
343                this.OnlinePlugins = serviceLocator.CallHiveService((s) => s.GetPlugins());
344                this.AlreadyUploadedPlugins = new List<Plugin>();
345                Plugin configFilePlugin = serviceLocator.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
346                this.alreadyUploadedPlugins.Add(configFilePlugin);
347                cancellationToken.ThrowIfCancellationRequested();
348
349                // upload tasks
350                refreshableJob.Progress.Status = "Uploading tasks...";
351
352                var tasks = new List<TS.Task>();
353                foreach (HiveTask hiveTask in refreshableJob.HiveTasks)
354                {
355                    var task = TS.Task.Factory.StartNew((hj) =>
356                    {
357                        UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, cancellationToken);
358                    }, hiveTask);
359                    task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
360                    tasks.Add(task);
361                }
362                TS.Task.WaitAll(tasks.ToArray());
363            }
364            finally
365            {
366                refreshableJob.IsProgressing = false;
367                refreshableJob.Progress.Finish();
368            }
369        }
370
371        /// <summary>
372        /// Uploads the local configuration file as plugin
373        /// </summary>
374        private Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins)
375        {
376            string exeFilePath = Path.Combine( CurrentEnv.WebRootPath, "bin"  , HeuristicLab.Clients.Hive.Settings.Default.HLBinaryName);
377            string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
378            string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
379            byte[] hash;
380
381            byte[] data = File.ReadAllBytes(configFilePath);
382            using (SHA1 sha1 = SHA1.Create())
383            {
384                hash = sha1.ComputeHash(data);
385            }
386
387            Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
388            PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
389
390            IEnumerable<Plugin> onlineConfig = onlinePlugins.Where(p => p.Hash.SequenceEqual(hash));
391
392            if (onlineConfig.Count() > 0)
393            {
394                return onlineConfig.First();
395            }
396            else {
397                configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
398                return configPlugin;
399            }
400        }
401
402        /// <summary>
403        /// Uploads the given task and all its child-jobs while setting the proper parentJobId values for the childs
404        /// </summary>
405        /// <param name="parentHiveTask">shall be null if its the root task</param>
406        private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, CancellationToken cancellationToken)
407        {
408            taskUploadSemaphore.WaitOne();
409            bool semaphoreReleased = false;
410            try
411            {
412                cancellationToken.ThrowIfCancellationRequested();
413                lock (jobCountLocker)
414                {
415                    taskCount[0]++;
416                }
417                TaskData taskData;
418                List<IPluginDescription> plugins;
419
420                if (hiveTask.ItemTask.ComputeInParallel)
421                {
422                    hiveTask.Task.IsParentTask = true;
423                    hiveTask.Task.FinishWhenChildJobsFinished = true;
424                    taskData = hiveTask.GetAsTaskData(true, out plugins);
425                }
426                else {
427                    hiveTask.Task.IsParentTask = false;
428                    hiveTask.Task.FinishWhenChildJobsFinished = false;
429                    taskData = hiveTask.GetAsTaskData(false, out plugins);
430                }
431                cancellationToken.ThrowIfCancellationRequested();
432
433                TryAndRepeat(() =>
434                {
435                    if (!cancellationToken.IsCancellationRequested)
436                    {
437                        lock (pluginLocker)
438                        {
439                            serviceLocator.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
440                        }
441                    }
442                }, 5, "Failed to upload plugins");
443                cancellationToken.ThrowIfCancellationRequested();
444                hiveTask.Task.PluginsNeededIds.Add(configPluginId);
445                hiveTask.Task.JobId = jobId;
446
447                log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
448                TryAndRepeat(() =>
449                {
450                    if (!cancellationToken.IsCancellationRequested)
451                    {
452                        if (parentHiveTask != null)
453                        {
454                            hiveTask.Task.Id = serviceLocator.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
455                        }
456                        else {
457                            hiveTask.Task.Id = serviceLocator.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
458                        }
459                    }
460                }, 5, "Failed to add task", log);
461                cancellationToken.ThrowIfCancellationRequested();
462
463                lock (jobCountLocker)
464                {
465                    progress.ProgressValue = (double)taskCount[0] / totalJobCount;
466                    progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
467                }
468
469                var tasks = new List<TS.Task>();
470                foreach (HiveTask child in hiveTask.ChildHiveTasks)
471                {
472                    var task = TS.Task.Factory.StartNew((tuple) =>
473                    {
474                        var arguments = (Tuple<HiveTask, HiveTask>)tuple;
475                        UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, cancellationToken);
476                    }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
477                    task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
478                    tasks.Add(task);
479                }
480                taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall!
481                TS.Task.WaitAll(tasks.ToArray());
482            }
483            finally
484            {
485                if (!semaphoreReleased) taskUploadSemaphore.Release();
486            }
487        }
488        #endregion
489
490        #region Download Experiment
491        public void LoadJob(RefreshableJob refreshableJob)
492        {
493            var hiveExperiment = refreshableJob.Job;
494            refreshableJob.IsProgressing = true;
495            TaskDownloaderWeb downloader = null;
496
497            try
498            {
499                int totalJobCount = 0;
500                IEnumerable<LightweightTask> allTasks;
501
502                // fetch all task objects to create the full tree of tree of HiveTask objects
503                refreshableJob.Progress.Start("Downloading list of tasks...");
504                allTasks = serviceLocator.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(hiveExperiment.Id));
505                totalJobCount = allTasks.Count();
506
507                refreshableJob.Progress.Status = "Downloading tasks...";
508                downloader = new TaskDownloaderWeb(allTasks.Select(x => x.Id), serviceLocator);
509                downloader.StartAsync();
510
511                while (!downloader.IsFinished)
512                {
513                    refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
514                    refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
515                    Thread.Sleep(500);
516
517                    if (downloader.IsFaulted)
518                    {
519                        throw downloader.Exception;
520                    }
521                }
522                IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
523                var parents = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);
524
525                refreshableJob.Progress.Status = "Downloading/deserializing complete. Displaying tasks...";
526                // build child-task tree
527                foreach (HiveTask hiveTask in parents)
528                {
529                    BuildHiveJobTree(hiveTask, allTasks, allHiveTasks);
530                }
531
532                refreshableJob.HiveTasks = new ItemCollection<HiveTask>(parents);
533                refreshableJob.OnLoaded();
534            }
535            finally
536            {
537                refreshableJob.IsProgressing = false;
538                refreshableJob.Progress.Finish();
539                if (downloader != null)
540                {
541                    downloader.Dispose();
542                }
543            }
544        }
545
546        private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks)
547        {
548            IEnumerable<LightweightTask> childTasks = from job in allTasks
549                                                      where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveTask.Task.Id
550                                                      orderby job.DateCreated ascending
551                                                      select job;
552            foreach (LightweightTask task in childTasks)
553            {
554                HiveTask childHiveTask = allHiveTasks[task.Id];
555                BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
556                parentHiveTask.AddChildHiveTask(childHiveTask);
557            }
558        }
559        #endregion
560
561        /// <summary>
562        /// Converts a string which can contain Ids separated by ';' to a enumerable
563        /// </summary>
564        private static IEnumerable<string> ToResourceNameList(string resourceNames)
565        {
566            if (!string.IsNullOrEmpty(resourceNames))
567            {
568                return resourceNames.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
569            }
570            else {
571                return new List<string>();
572            }
573        }
574
575        public ItemTask LoadItemJob(Guid jobId)
576        {
577            TaskData taskData = serviceLocator.CallHiveService(s => s.GetTaskData(jobId));
578            try
579            {
580                return PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
581            }
582            catch
583            {
584                return null;
585            }
586        }
587
588        /// <summary>
589        /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
590        /// If repetitions is -1, it is repeated infinitely.
591        /// </summary>
592        public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null)
593        {
594            while (true)
595            {
596                try { action(); return; }
597                catch (Exception e)
598                {
599                    if (repetitions == 0) throw new HiveException(errorMessage, e);
600                    if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
601                    repetitions--;
602                }
603            }
604        }
605
606        public  HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId)
607        {
608            return serviceLocator.CallHiveService((service) =>
609            {
610                IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId);
611                foreach (var hep in jps)
612                {
613                    hep.UnmodifiedGrantedUserNameUpdate(service.GetUsernameByUserId(hep.GrantedUserId));
614                }
615                return new HiveItemCollection<JobPermission>(jps);
616            });
617        }
618    }
619}
Note: See TracBrowser for help on using the repository browser.