Free cookie consent management tool by TermsFeed Policy Generator

source: branches/WebJobManager/HeuristicLab.Clients.Hive.WebJobManager/Services/Imports/HiveClientWeb.cs @ 13860

Last change on this file since 13860 was 13860, checked in by jlodewyc, 8 years ago

#2582 RC2 migration fixed. OKB query implemented. Preparing for OKB manager

File size: 24.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.MainForm;
33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35using HeuristicLab.Clients.Hive.WebJobManager.Services;
36using HeuristicLab.Clients.Common.Properties;
37using Microsoft.AspNetCore.Hosting;
38using HeuristicLab.Clients.Hive.WebJobManager.Services.Imports;
39
40namespace HeuristicLab.Clients.Hive.WebJobManager
41{
42    /// <summary>
43    /// Rewritten to use the HiveServiceLocatorWeb
44    /// </summary>
45    [Item("HiveClientWeb", "Hive client.")]
46    public sealed class HiveClientWeb : IContent
47    {
48       
49        public IHostingEnvironment CurrentEnv { get; set; } //Current building area
50        #region Properties
51        private HiveItemCollection<RefreshableJob> jobs;
52        public HiveItemCollection<RefreshableJob> Jobs
53        {
54            get { return jobs; }
55            set
56            {
57                if (value != jobs)
58                {
59                    jobs = value;
60                    OnHiveJobsChanged();
61                }
62            }
63        }
64
65        private List<Plugin> onlinePlugins;
66        public List<Plugin> OnlinePlugins
67        {
68            get { return onlinePlugins; }
69            set { onlinePlugins = value; }
70        }
71
72        private List<Plugin> alreadyUploadedPlugins;
73        public List<Plugin> AlreadyUploadedPlugins
74        {
75            get { return alreadyUploadedPlugins; }
76            set { alreadyUploadedPlugins = value; }
77        }
78        #endregion
79
80        private HiveServiceLocatorWeb serviceLocator;
81        public  Guid UserId { get; set; }
82        public HiveClientWeb(HiveServiceLocatorWeb serv, Guid u)
83        {
84            serviceLocator = serv;
85            UserId = u;
86        }
87        /// <summary>
88        /// Reaches to the webloginservice to get the current instance of the servicelocator.
89        /// </summary>
90        public void updateServiceLocator()
91        {// No idea if needed
92            serviceLocator = WebLoginService.Instance.getServiceLocator(UserId);
93        }
94
95
96        public void ClearHiveClientWeb()
97        {
98            Jobs.ClearWithoutHiveDeletion();
99            foreach (var j in Jobs)
100            {
101                if (j.RefreshAutomatically)
102                {
103                    j.RefreshAutomatically = false; // stop result polling
104                }
105                j.Dispose();
106            }
107            Jobs = null;
108
109            if (onlinePlugins != null)
110                onlinePlugins.Clear();
111            if (alreadyUploadedPlugins != null)
112                alreadyUploadedPlugins.Clear();
113        }
114
115        #region Refresh
116        public void Refresh()
117        {
118            OnRefreshing();
119
120            try
121            {
122                jobs = new HiveItemCollection<RefreshableJob>();
123                var jobsLoaded = serviceLocator.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());
124
125                foreach (var j in jobsLoaded)
126                {
127                    jobs.Add(new RefreshableJob(j));
128                }
129            }
130            catch
131            {
132                jobs = null;
133                throw;
134            }
135            finally
136            {
137                OnRefreshed();
138            }
139        }
140
141        public void RefreshAsync(Action<Exception> exceptionCallback)
142        {
143            var call = new Func<Exception>(delegate ()
144            {
145                try
146                {
147                    Refresh();
148                }
149                catch (Exception ex)
150                {
151                    return ex;
152                }
153                return null;
154            });
155            call.BeginInvoke(delegate (IAsyncResult result)
156            {
157                Exception ex = call.EndInvoke(result);
158                if (ex != null) exceptionCallback(ex);
159            }, null);
160        }
161        #endregion
162
163        #region Store
164        public void Store(IHiveItem item, CancellationToken cancellationToken)
165        {
166            if (item.Id == Guid.Empty)
167            {
168                if (item is RefreshableJob)
169                {
170                    this.UploadJob((RefreshableJob)item, cancellationToken);
171                }
172                if (item is JobPermission)
173                {
174                    var hep = (JobPermission)item;
175                    hep.GrantedUserId = serviceLocator.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
176                    if (hep.GrantedUserId == Guid.Empty)
177                    {
178                        throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
179                    }
180                    serviceLocator.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
181                }
182            }
183            else {
184                if (item is Job)
185                    serviceLocator.CallHiveService(s => s.UpdateJob((Job)item));
186            }
187        }
188        public void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken)
189        {
190            var call = new Func<Exception>(delegate ()
191            {
192                try
193                {
194                    Store(item, cancellationToken);
195                }
196                catch (Exception ex)
197                {
198                    return ex;
199                }
200                return null;
201            });
202            call.BeginInvoke(delegate (IAsyncResult result)
203            {
204                Exception ex = call.EndInvoke(result);
205                if (ex != null) exceptionCallback(ex);
206            }, null);
207        }
208        #endregion
209
210        #region Delete
211        public void Delete(IHiveItem item)
212        {
213            if (item.Id == Guid.Empty && item.GetType() != typeof(JobPermission))
214                return;
215
216            if (item is Job)
217                serviceLocator.CallHiveService(s => s.DeleteJob(item.Id));
218            if (item is RefreshableJob)
219            {
220                RefreshableJob job = (RefreshableJob)item;
221                if (job.RefreshAutomatically)
222                {
223                    job.StopResultPolling();
224                }
225                serviceLocator.CallHiveService(s => s.DeleteJob(item.Id));
226            }
227            if (item is JobPermission)
228            {
229                var hep = (JobPermission)item;
230                serviceLocator.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
231            }
232            item.Id = Guid.Empty;
233        }
234        #endregion
235
236        #region Events
237        public event EventHandler Refreshing;
238        private void OnRefreshing()
239        {
240            EventHandler handler = Refreshing;
241            if (handler != null) handler(this, EventArgs.Empty);
242        }
243        public event EventHandler Refreshed;
244        private void OnRefreshed()
245        {
246            var handler = Refreshed;
247            if (handler != null) handler(this, EventArgs.Empty);
248        }
249        public event EventHandler HiveJobsChanged;
250        private void OnHiveJobsChanged()
251        {
252            var handler = HiveJobsChanged;
253            if (handler != null) handler(this, EventArgs.Empty);
254        }
255        #endregion
256
257        public void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken)
258        {
259            this.StoreAsync(
260              new Action<Exception>((Exception ex) =>
261              {
262
263                  exceptionCallback(ex);
264              }), refreshableJob, cancellationToken);
265
266        }
267
268        public void PauseJob(RefreshableJob refreshableJob)
269        {
270            serviceLocator.CallHiveService(service =>
271            {
272                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
273                {
274                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
275                        service.PauseTask(task.Task.Id);
276                }
277            });
278        }
279
280        public void StopJob(RefreshableJob refreshableJob)
281        {
282            serviceLocator.CallHiveService(service =>
283            {
284                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
285                {
286                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
287                        service.StopTask(task.Task.Id);
288                }
289            });
290        }
291
292        public  void ResumeJob(RefreshableJob refreshableJob)
293        {
294            serviceLocator.CallHiveService(service =>
295            {
296                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
297                {
298                    if (task.Task.State == TaskState.Paused)
299                    {
300                        service.RestartTask(task.Task.Id);
301                    }
302                }
303            });
304        }
305
306        #region Upload Job
307        private Semaphore taskUploadSemaphore = new Semaphore(5, 5);
308        private static object jobCountLocker = new object();
309        private static object pluginLocker = new object();
310       
311        private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken)
312        {
313            try
314            {
315                refreshableJob.IsProgressing = true;
316                refreshableJob.Progress.Start("Connecting to server...");
317                IEnumerable<string> resourceNames = ToResourceNameList(refreshableJob.Job.ResourceNames);
318                var resourceIds = new List<Guid>();
319                foreach (var resourceName in resourceNames)
320                {
321                    Guid resourceId = serviceLocator.CallHiveService((s) => s.GetResourceId(resourceName));
322                    if (resourceId == Guid.Empty)
323                    {
324                        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
325                    }
326                    resourceIds.Add(resourceId);
327                }
328
329                foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>())
330                {
331                    hiveJob.SetIndexInParentOptimizerList(null);
332                }
333
334                // upload Job
335                refreshableJob.Progress.Status = "Uploading Job...";
336                refreshableJob.Job.Id = serviceLocator.CallHiveService((s) => s.AddJob(refreshableJob.Job));
337                refreshableJob.Job = serviceLocator.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
338                cancellationToken.ThrowIfCancellationRequested();
339
340                int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
341                int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
342                cancellationToken.ThrowIfCancellationRequested();
343
344                // upload plugins
345                refreshableJob.Progress.Status = "Uploading plugins...";
346                this.OnlinePlugins = serviceLocator.CallHiveService((s) => s.GetPlugins());
347                this.AlreadyUploadedPlugins = new List<Plugin>();
348                Plugin configFilePlugin = serviceLocator.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
349                this.alreadyUploadedPlugins.Add(configFilePlugin);
350                cancellationToken.ThrowIfCancellationRequested();
351
352                // upload tasks
353                refreshableJob.Progress.Status = "Uploading tasks...";
354
355                var tasks = new List<TS.Task>();
356                foreach (HiveTask hiveTask in refreshableJob.HiveTasks)
357                {
358                    var task = TS.Task.Factory.StartNew((hj) =>
359                    {
360                        UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, cancellationToken);
361                    }, hiveTask);
362                    task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
363                    tasks.Add(task);
364                }
365                TS.Task.WaitAll(tasks.ToArray());
366            }
367            finally
368            {
369                refreshableJob.IsProgressing = false;
370                refreshableJob.Progress.Finish();
371            }
372        }
373
374        /// <summary>
375        /// Uploads the local configuration file as plugin
376        /// </summary>
377        private Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins)
378        {
379            string exeFilePath = Path.Combine( CurrentEnv.WebRootPath, "bin"  , HeuristicLab.Clients.Hive.Settings.Default.HLBinaryName);
380            string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
381            string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
382            byte[] hash;
383
384            byte[] data = File.ReadAllBytes(configFilePath);
385            using (SHA1 sha1 = SHA1.Create())
386            {
387                hash = sha1.ComputeHash(data);
388            }
389
390            Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
391            PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
392
393            IEnumerable<Plugin> onlineConfig = onlinePlugins.Where(p => p.Hash.SequenceEqual(hash));
394
395            if (onlineConfig.Count() > 0)
396            {
397                return onlineConfig.First();
398            }
399            else {
400                configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
401                return configPlugin;
402            }
403        }
404
405        /// <summary>
406        /// Uploads the given task and all its child-jobs while setting the proper parentJobId values for the childs
407        /// </summary>
408        /// <param name="parentHiveTask">shall be null if its the root task</param>
409        private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, CancellationToken cancellationToken)
410        {
411            taskUploadSemaphore.WaitOne();
412            bool semaphoreReleased = false;
413            try
414            {
415                cancellationToken.ThrowIfCancellationRequested();
416                lock (jobCountLocker)
417                {
418                    taskCount[0]++;
419                }
420                TaskData taskData;
421                List<IPluginDescription> plugins;
422
423                if (hiveTask.ItemTask.ComputeInParallel)
424                {
425                    hiveTask.Task.IsParentTask = true;
426                    hiveTask.Task.FinishWhenChildJobsFinished = true;
427                    taskData = hiveTask.GetAsTaskData(true, out plugins);
428                }
429                else {
430                    hiveTask.Task.IsParentTask = false;
431                    hiveTask.Task.FinishWhenChildJobsFinished = false;
432                    taskData = hiveTask.GetAsTaskData(false, out plugins);
433                }
434                cancellationToken.ThrowIfCancellationRequested();
435
436                TryAndRepeat(() =>
437                {
438                    if (!cancellationToken.IsCancellationRequested)
439                    {
440                        lock (pluginLocker)
441                        {
442                            serviceLocator.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
443                        }
444                    }
445                }, 5, "Failed to upload plugins");
446                cancellationToken.ThrowIfCancellationRequested();
447                hiveTask.Task.PluginsNeededIds.Add(configPluginId);
448                hiveTask.Task.JobId = jobId;
449
450                log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
451                TryAndRepeat(() =>
452                {
453                    if (!cancellationToken.IsCancellationRequested)
454                    {
455                        if (parentHiveTask != null)
456                        {
457                            hiveTask.Task.Id = serviceLocator.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
458                        }
459                        else {
460                            hiveTask.Task.Id = serviceLocator.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
461                        }
462                    }
463                }, 5, "Failed to add task", log);
464                cancellationToken.ThrowIfCancellationRequested();
465
466                lock (jobCountLocker)
467                {
468                    progress.ProgressValue = (double)taskCount[0] / totalJobCount;
469                    progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
470                }
471
472                var tasks = new List<TS.Task>();
473                foreach (HiveTask child in hiveTask.ChildHiveTasks)
474                {
475                    var task = TS.Task.Factory.StartNew((tuple) =>
476                    {
477                        var arguments = (Tuple<HiveTask, HiveTask>)tuple;
478                        UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, cancellationToken);
479                    }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
480                    task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
481                    tasks.Add(task);
482                }
483                taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall!
484                TS.Task.WaitAll(tasks.ToArray());
485            }
486            finally
487            {
488                if (!semaphoreReleased) taskUploadSemaphore.Release();
489            }
490        }
491        #endregion
492
493        #region Download Experiment
494        public void LoadJob(RefreshableJob refreshableJob)
495        {
496            var hiveExperiment = refreshableJob.Job;
497            refreshableJob.IsProgressing = true;
498            TaskDownloaderWeb downloader = null;
499
500            try
501            {
502                int totalJobCount = 0;
503                IEnumerable<LightweightTask> allTasks;
504
505                // fetch all task objects to create the full tree of tree of HiveTask objects
506                refreshableJob.Progress.Start("Downloading list of tasks...");
507                allTasks = serviceLocator.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(hiveExperiment.Id));
508                totalJobCount = allTasks.Count();
509
510                refreshableJob.Progress.Status = "Downloading tasks...";
511                downloader = new TaskDownloaderWeb(allTasks.Select(x => x.Id), serviceLocator);
512                downloader.StartAsync();
513
514                while (!downloader.IsFinished)
515                {
516                    refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
517                    refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
518                    Thread.Sleep(500);
519
520                    if (downloader.IsFaulted)
521                    {
522                        throw downloader.Exception;
523                    }
524                }
525                IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
526                var parents = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);
527
528                refreshableJob.Progress.Status = "Downloading/deserializing complete. Displaying tasks...";
529                // build child-task tree
530                foreach (HiveTask hiveTask in parents)
531                {
532                    BuildHiveJobTree(hiveTask, allTasks, allHiveTasks);
533                }
534
535                refreshableJob.HiveTasks = new ItemCollection<HiveTask>(parents);
536                refreshableJob.OnLoaded();
537            }
538            finally
539            {
540                refreshableJob.IsProgressing = false;
541                refreshableJob.Progress.Finish();
542                if (downloader != null)
543                {
544                    downloader.Dispose();
545                }
546            }
547        }
548
549        private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks)
550        {
551            IEnumerable<LightweightTask> childTasks = from job in allTasks
552                                                      where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveTask.Task.Id
553                                                      orderby job.DateCreated ascending
554                                                      select job;
555            foreach (LightweightTask task in childTasks)
556            {
557                HiveTask childHiveTask = allHiveTasks[task.Id];
558                BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
559                parentHiveTask.AddChildHiveTask(childHiveTask);
560            }
561        }
562        #endregion
563
564        /// <summary>
565        /// Converts a string which can contain Ids separated by ';' to a enumerable
566        /// </summary>
567        private static IEnumerable<string> ToResourceNameList(string resourceNames)
568        {
569            if (!string.IsNullOrEmpty(resourceNames))
570            {
571                return resourceNames.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
572            }
573            else {
574                return new List<string>();
575            }
576        }
577
578        public ItemTask LoadItemJob(Guid jobId)
579        {
580            TaskData taskData = serviceLocator.CallHiveService(s => s.GetTaskData(jobId));
581            try
582            {
583                return PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
584            }
585            catch
586            {
587                return null;
588            }
589        }
590
591        /// <summary>
592        /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
593        /// If repetitions is -1, it is repeated infinitely.
594        /// </summary>
595        public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null)
596        {
597            while (true)
598            {
599                try { action(); return; }
600                catch (Exception e)
601                {
602                    if (repetitions == 0) throw new HiveException(errorMessage, e);
603                    if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
604                    repetitions--;
605                }
606            }
607        }
608
609        public  HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId)
610        {
611            return serviceLocator.CallHiveService((service) =>
612            {
613                IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId);
614                foreach (var hep in jps)
615                {
616                    hep.UnmodifiedGrantedUserNameUpdate(service.GetUsernameByUserId(hep.GrantedUserId));
617                }
618                return new HiveItemCollection<JobPermission>(jps);
619            });
620        }
621    }
622}
Note: See TracBrowser for help on using the repository browser.