Free cookie consent management tool by TermsFeed Policy Generator

source: branches/WebJobManager/HeuristicLab.Clients.Hive.WebJobManager/Services/HiveClientWeb.cs @ 13733

Last change on this file since 13733 was 13712, checked in by jlodewyc, 8 years ago

#2582 Distribution childs and priority done. Display current jobs and start graphs

File size: 25.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2015 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Configuration;
25using System.IO;
26using System.Linq;
27using System.Security.Cryptography;
28using System.Threading;
29using System.Threading.Tasks;
30using HeuristicLab.Common;
31using HeuristicLab.Core;
32using HeuristicLab.MainForm;
33using HeuristicLab.PluginInfrastructure;
34using TS = System.Threading.Tasks;
35using HeuristicLab.Clients.Hive.WebJobManager.Services;
36using HeuristicLab.Clients.Common.Properties;
37using Microsoft.AspNet.Hosting;
38using HeuristicLab.Clients.Hive.WebJobManager.Services.Imports;
39
40namespace HeuristicLab.Clients.Hive.WebJobManager
41{
42    [Item("HiveClientWeb", "Hive client.")]
43
44
45    public sealed class HiveClientWeb : IContent
46    {
47        private static HiveClientWeb instance;
48        public static HiveClientWeb Instance
49        {
50            get
51            {
52                if (instance == null) instance = new HiveClientWeb();
53                return instance;
54            }
55        }
56        public static IHostingEnvironment CurrentEnv { get; set; } //Current building area
57        #region Properties
58        private HiveItemCollection<RefreshableJob> jobs;
59        public HiveItemCollection<RefreshableJob> Jobs
60        {
61            get { return jobs; }
62            set
63            {
64                if (value != jobs)
65                {
66                    jobs = value;
67                    OnHiveJobsChanged();
68                }
69            }
70        }
71
72        private List<Plugin> onlinePlugins;
73        public List<Plugin> OnlinePlugins
74        {
75            get { return onlinePlugins; }
76            set { onlinePlugins = value; }
77        }
78
79        private List<Plugin> alreadyUploadedPlugins;
80        public List<Plugin> AlreadyUploadedPlugins
81        {
82            get { return alreadyUploadedPlugins; }
83            set { alreadyUploadedPlugins = value; }
84        }
85        #endregion
86
87        private HiveClientWeb()
88        {
89        }
90
91        public void ClearHiveClientWeb()
92        {
93            Jobs.ClearWithoutHiveDeletion();
94            foreach (var j in Jobs)
95            {
96                if (j.RefreshAutomatically)
97                {
98                    j.RefreshAutomatically = false; // stop result polling
99                }
100                j.Dispose();
101            }
102            Jobs = null;
103
104            if (onlinePlugins != null)
105                onlinePlugins.Clear();
106            if (alreadyUploadedPlugins != null)
107                alreadyUploadedPlugins.Clear();
108        }
109
110        #region Refresh
111        public void Refresh()
112        {
113            OnRefreshing();
114
115            try
116            {
117                jobs = new HiveItemCollection<RefreshableJob>();
118                var jobsLoaded = HiveServiceLocatorWebManagerService.Instance.CallHiveService<IEnumerable<Job>>(s => s.GetJobs());
119
120                foreach (var j in jobsLoaded)
121                {
122                    jobs.Add(new RefreshableJob(j));
123                }
124            }
125            catch
126            {
127                jobs = null;
128                throw;
129            }
130            finally
131            {
132                OnRefreshed();
133            }
134        }
135
136        public void RefreshAsync(Action<Exception> exceptionCallback)
137        {
138            var call = new Func<Exception>(delegate ()
139            {
140                try
141                {
142                    Refresh();
143                }
144                catch (Exception ex)
145                {
146                    return ex;
147                }
148                return null;
149            });
150            call.BeginInvoke(delegate (IAsyncResult result)
151            {
152                Exception ex = call.EndInvoke(result);
153                if (ex != null) exceptionCallback(ex);
154            }, null);
155        }
156        #endregion
157
158        #region Store
159        public static void Store(IHiveItem item, CancellationToken cancellationToken)
160        {
161            if (item.Id == Guid.Empty)
162            {
163                if (item is RefreshableJob)
164                {
165                    HiveClientWeb.Instance.UploadJob((RefreshableJob)item, cancellationToken);
166                }
167                if (item is JobPermission)
168                {
169                    var hep = (JobPermission)item;
170                    hep.GrantedUserId = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetUserIdByUsername(hep.GrantedUserName));
171                    if (hep.GrantedUserId == Guid.Empty)
172                    {
173                        throw new ArgumentException(string.Format("The user {0} was not found.", hep.GrantedUserName));
174                    }
175                    HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GrantPermission(hep.JobId, hep.GrantedUserId, hep.Permission));
176                }
177            }
178            else {
179                if (item is Job)
180                    HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.UpdateJob((Job)item));
181            }
182        }
183        public static void StoreAsync(Action<Exception> exceptionCallback, IHiveItem item, CancellationToken cancellationToken)
184        {
185            var call = new Func<Exception>(delegate ()
186            {
187                try
188                {
189                    Store(item, cancellationToken);
190                }
191                catch (Exception ex)
192                {
193                    return ex;
194                }
195                return null;
196            });
197            call.BeginInvoke(delegate (IAsyncResult result)
198            {
199                Exception ex = call.EndInvoke(result);
200                if (ex != null) exceptionCallback(ex);
201            }, null);
202        }
203        #endregion
204
205        #region Delete
206        public static void Delete(IHiveItem item)
207        {
208            if (item.Id == Guid.Empty && item.GetType() != typeof(JobPermission))
209                return;
210
211            if (item is Job)
212                HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.DeleteJob(item.Id));
213            if (item is RefreshableJob)
214            {
215                RefreshableJob job = (RefreshableJob)item;
216                if (job.RefreshAutomatically)
217                {
218                    job.StopResultPolling();
219                }
220                HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.DeleteJob(item.Id));
221            }
222            if (item is JobPermission)
223            {
224                var hep = (JobPermission)item;
225                HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.RevokePermission(hep.JobId, hep.GrantedUserId));
226            }
227            item.Id = Guid.Empty;
228        }
229        #endregion
230
231        #region Events
232        public event EventHandler Refreshing;
233        private void OnRefreshing()
234        {
235            EventHandler handler = Refreshing;
236            if (handler != null) handler(this, EventArgs.Empty);
237        }
238        public event EventHandler Refreshed;
239        private void OnRefreshed()
240        {
241            var handler = Refreshed;
242            if (handler != null) handler(this, EventArgs.Empty);
243        }
244        public event EventHandler HiveJobsChanged;
245        private void OnHiveJobsChanged()
246        {
247            var handler = HiveJobsChanged;
248            if (handler != null) handler(this, EventArgs.Empty);
249        }
250        #endregion
251
252        public static void StartJob(Action<Exception> exceptionCallback, RefreshableJob refreshableJob, CancellationToken cancellationToken)
253        {
254            HiveClientWeb.StoreAsync(
255              new Action<Exception>((Exception ex) =>
256              {
257
258                  exceptionCallback(ex);
259              }), refreshableJob, cancellationToken);
260
261        }
262
263        public static void PauseJob(RefreshableJob refreshableJob)
264        {
265            HiveServiceLocatorWebManagerService.Instance.CallHiveService(service =>
266            {
267                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
268                {
269                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
270                        service.PauseTask(task.Task.Id);
271                }
272            });
273        }
274
275        public static void StopJob(RefreshableJob refreshableJob)
276        {
277            HiveServiceLocatorWebManagerService.Instance.CallHiveService(service =>
278            {
279                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
280                {
281                    if (task.Task.State != TaskState.Finished && task.Task.State != TaskState.Aborted && task.Task.State != TaskState.Failed)
282                        service.StopTask(task.Task.Id);
283                }
284            });
285        }
286
287        public static void ResumeJob(RefreshableJob refreshableJob)
288        {
289            HiveServiceLocatorWebManagerService.Instance.CallHiveService(service =>
290            {
291                foreach (HiveTask task in refreshableJob.GetAllHiveTasks())
292                {
293                    if (task.Task.State == TaskState.Paused)
294                    {
295                        service.RestartTask(task.Task.Id);
296                    }
297                }
298            });
299        }
300
301        #region Upload Job
302        private Semaphore taskUploadSemaphore = new Semaphore(5, 5);
303        private static object jobCountLocker = new object();
304        private static object pluginLocker = new object();
305        private void UploadJob(RefreshableJob refreshableJob, CancellationToken cancellationToken)
306        {
307            try
308            {
309                refreshableJob.IsProgressing = true;
310                refreshableJob.Progress.Start("Connecting to server...");
311                IEnumerable<string> resourceNames = ToResourceNameList(refreshableJob.Job.ResourceNames);
312                var resourceIds = new List<Guid>();
313                foreach (var resourceName in resourceNames)
314                {
315                    Guid resourceId = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetResourceId(resourceName));
316                    if (resourceId == Guid.Empty)
317                    {
318                        throw new ResourceNotFoundException(string.Format("Could not find the resource '{0}'", resourceName));
319                    }
320                    resourceIds.Add(resourceId);
321                }
322
323                foreach (OptimizerHiveTask hiveJob in refreshableJob.HiveTasks.OfType<OptimizerHiveTask>())
324                {
325                    hiveJob.SetIndexInParentOptimizerList(null);
326                }
327
328                // upload Job
329                refreshableJob.Progress.Status = "Uploading Job...";
330                refreshableJob.Job.Id = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.AddJob(refreshableJob.Job));
331                refreshableJob.Job = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetJob(refreshableJob.Job.Id)); // update owner and permissions
332                cancellationToken.ThrowIfCancellationRequested();
333
334                int totalJobCount = refreshableJob.GetAllHiveTasks().Count();
335                int[] jobCount = new int[1]; // use a reference type (int-array) instead of value type (int) in order to pass the value via a delegate to task-parallel-library
336                cancellationToken.ThrowIfCancellationRequested();
337
338                // upload plugins
339                refreshableJob.Progress.Status = "Uploading plugins...";
340                this.OnlinePlugins = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.GetPlugins());
341                this.AlreadyUploadedPlugins = new List<Plugin>();
342                Plugin configFilePlugin = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => UploadConfigurationFile(s, onlinePlugins));
343                this.alreadyUploadedPlugins.Add(configFilePlugin);
344                cancellationToken.ThrowIfCancellationRequested();
345
346                // upload tasks
347                refreshableJob.Progress.Status = "Uploading tasks...";
348
349                var tasks = new List<TS.Task>();
350                foreach (HiveTask hiveTask in refreshableJob.HiveTasks)
351                {
352                    var task = TS.Task.Factory.StartNew((hj) =>
353                    {
354                        UploadTaskWithChildren(refreshableJob.Progress, (HiveTask)hj, null, resourceIds, jobCount, totalJobCount, configFilePlugin.Id, refreshableJob.Job.Id, refreshableJob.Log, cancellationToken);
355                    }, hiveTask);
356                    task.ContinueWith((x) => refreshableJob.Log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
357                    tasks.Add(task);
358                }
359                TS.Task.WaitAll(tasks.ToArray());
360            }
361            finally
362            {
363                refreshableJob.IsProgressing = false;
364                refreshableJob.Progress.Finish();
365            }
366        }
367
368        /// <summary>
369        /// Uploads the local configuration file as plugin
370        /// </summary>
371        private static Plugin UploadConfigurationFile(IHiveService service, List<Plugin> onlinePlugins)
372        {
373            string exeFilePath = Path.Combine( CurrentEnv.WebRootPath, "bin"  , HeuristicLab.Clients.Hive.Settings.Default.HLBinaryName);
374            string configFileName = Path.GetFileName(ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath);
375            string configFilePath = ConfigurationManager.OpenExeConfiguration(exeFilePath).FilePath;
376            byte[] hash;
377
378            byte[] data = File.ReadAllBytes(configFilePath);
379            using (SHA1 sha1 = SHA1.Create())
380            {
381                hash = sha1.ComputeHash(data);
382            }
383
384            Plugin configPlugin = new Plugin() { Name = "Configuration", Version = new Version(), Hash = hash };
385            PluginData configFile = new PluginData() { FileName = configFileName, Data = data };
386
387            IEnumerable<Plugin> onlineConfig = onlinePlugins.Where(p => p.Hash.SequenceEqual(hash));
388
389            if (onlineConfig.Count() > 0)
390            {
391                return onlineConfig.First();
392            }
393            else {
394                configPlugin.Id = service.AddPlugin(configPlugin, new List<PluginData> { configFile });
395                return configPlugin;
396            }
397        }
398
399        /// <summary>
400        /// Uploads the given task and all its child-jobs while setting the proper parentJobId values for the childs
401        /// </summary>
402        /// <param name="parentHiveTask">shall be null if its the root task</param>
403        private void UploadTaskWithChildren(IProgress progress, HiveTask hiveTask, HiveTask parentHiveTask, IEnumerable<Guid> groups, int[] taskCount, int totalJobCount, Guid configPluginId, Guid jobId, ILog log, CancellationToken cancellationToken)
404        {
405            taskUploadSemaphore.WaitOne();
406            bool semaphoreReleased = false;
407            try
408            {
409                cancellationToken.ThrowIfCancellationRequested();
410                lock (jobCountLocker)
411                {
412                    taskCount[0]++;
413                }
414                TaskData taskData;
415                List<IPluginDescription> plugins;
416
417                if (hiveTask.ItemTask.ComputeInParallel)
418                {
419                    hiveTask.Task.IsParentTask = true;
420                    hiveTask.Task.FinishWhenChildJobsFinished = true;
421                    taskData = hiveTask.GetAsTaskData(true, out plugins);
422                }
423                else {
424                    hiveTask.Task.IsParentTask = false;
425                    hiveTask.Task.FinishWhenChildJobsFinished = false;
426                    taskData = hiveTask.GetAsTaskData(false, out plugins);
427                }
428                cancellationToken.ThrowIfCancellationRequested();
429
430                TryAndRepeat(() =>
431                {
432                    if (!cancellationToken.IsCancellationRequested)
433                    {
434                        lock (pluginLocker)
435                        {
436                            HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => hiveTask.Task.PluginsNeededIds = PluginUtil.GetPluginDependencies(s, this.onlinePlugins, this.alreadyUploadedPlugins, plugins));
437                        }
438                    }
439                }, 5, "Failed to upload plugins");
440                cancellationToken.ThrowIfCancellationRequested();
441                hiveTask.Task.PluginsNeededIds.Add(configPluginId);
442                hiveTask.Task.JobId = jobId;
443
444                log.LogMessage(string.Format("Uploading task ({0} kb, {1} objects)", taskData.Data.Count() / 1024, hiveTask.ItemTask.GetObjectGraphObjects().Count()));
445                TryAndRepeat(() =>
446                {
447                    if (!cancellationToken.IsCancellationRequested)
448                    {
449                        if (parentHiveTask != null)
450                        {
451                            hiveTask.Task.Id = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.AddChildTask(parentHiveTask.Task.Id, hiveTask.Task, taskData));
452                        }
453                        else {
454                            hiveTask.Task.Id = HiveServiceLocatorWebManagerService.Instance.CallHiveService((s) => s.AddTask(hiveTask.Task, taskData, groups.ToList()));
455                        }
456                    }
457                }, 5, "Failed to add task", log);
458                cancellationToken.ThrowIfCancellationRequested();
459
460                lock (jobCountLocker)
461                {
462                    progress.ProgressValue = (double)taskCount[0] / totalJobCount;
463                    progress.Status = string.Format("Uploaded task ({0} of {1})", taskCount[0], totalJobCount);
464                }
465
466                var tasks = new List<TS.Task>();
467                foreach (HiveTask child in hiveTask.ChildHiveTasks)
468                {
469                    var task = TS.Task.Factory.StartNew((tuple) =>
470                    {
471                        var arguments = (Tuple<HiveTask, HiveTask>)tuple;
472                        UploadTaskWithChildren(progress, arguments.Item1, arguments.Item2, groups, taskCount, totalJobCount, configPluginId, jobId, log, cancellationToken);
473                    }, new Tuple<HiveTask, HiveTask>(child, hiveTask));
474                    task.ContinueWith((x) => log.LogException(x.Exception), TaskContinuationOptions.OnlyOnFaulted);
475                    tasks.Add(task);
476                }
477                taskUploadSemaphore.Release(); semaphoreReleased = true; // the semaphore has to be release before waitall!
478                TS.Task.WaitAll(tasks.ToArray());
479            }
480            finally
481            {
482                if (!semaphoreReleased) taskUploadSemaphore.Release();
483            }
484        }
485        #endregion
486
487        #region Download Experiment
488        public static void LoadJob(RefreshableJob refreshableJob)
489        {
490            var hiveExperiment = refreshableJob.Job;
491            refreshableJob.IsProgressing = true;
492            TaskDownloaderWeb downloader = null;
493
494            try
495            {
496                int totalJobCount = 0;
497                IEnumerable<LightweightTask> allTasks;
498
499                // fetch all task objects to create the full tree of tree of HiveTask objects
500                refreshableJob.Progress.Start("Downloading list of tasks...");
501                allTasks = HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.GetLightweightJobTasksWithoutStateLog(hiveExperiment.Id));
502                totalJobCount = allTasks.Count();
503
504                refreshableJob.Progress.Status = "Downloading tasks...";
505                downloader = new TaskDownloaderWeb(allTasks.Select(x => x.Id));
506                downloader.StartAsync();
507
508                while (!downloader.IsFinished)
509                {
510                    refreshableJob.Progress.ProgressValue = downloader.FinishedCount / (double)totalJobCount;
511                    refreshableJob.Progress.Status = string.Format("Downloading/deserializing tasks... ({0}/{1} finished)", downloader.FinishedCount, totalJobCount);
512                    Thread.Sleep(500);
513
514                    if (downloader.IsFaulted)
515                    {
516                        throw downloader.Exception;
517                    }
518                }
519                IDictionary<Guid, HiveTask> allHiveTasks = downloader.Results;
520                var parents = allHiveTasks.Values.Where(x => !x.Task.ParentTaskId.HasValue);
521
522                refreshableJob.Progress.Status = "Downloading/deserializing complete. Displaying tasks...";
523                // build child-task tree
524                foreach (HiveTask hiveTask in parents)
525                {
526                    BuildHiveJobTree(hiveTask, allTasks, allHiveTasks);
527                }
528
529                refreshableJob.HiveTasks = new ItemCollection<HiveTask>(parents);
530                refreshableJob.OnLoaded();
531            }
532            finally
533            {
534                refreshableJob.IsProgressing = false;
535                refreshableJob.Progress.Finish();
536                if (downloader != null)
537                {
538                    downloader.Dispose();
539                }
540            }
541        }
542
543        private static void BuildHiveJobTree(HiveTask parentHiveTask, IEnumerable<LightweightTask> allTasks, IDictionary<Guid, HiveTask> allHiveTasks)
544        {
545            IEnumerable<LightweightTask> childTasks = from job in allTasks
546                                                      where job.ParentTaskId.HasValue && job.ParentTaskId.Value == parentHiveTask.Task.Id
547                                                      orderby job.DateCreated ascending
548                                                      select job;
549            foreach (LightweightTask task in childTasks)
550            {
551                HiveTask childHiveTask = allHiveTasks[task.Id];
552                BuildHiveJobTree(childHiveTask, allTasks, allHiveTasks);
553                parentHiveTask.AddChildHiveTask(childHiveTask);
554            }
555        }
556        #endregion
557
558        /// <summary>
559        /// Converts a string which can contain Ids separated by ';' to a enumerable
560        /// </summary>
561        private static IEnumerable<string> ToResourceNameList(string resourceNames)
562        {
563            if (!string.IsNullOrEmpty(resourceNames))
564            {
565                return resourceNames.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
566            }
567            else {
568                return new List<string>();
569            }
570        }
571
572        public static ItemTask LoadItemJob(Guid jobId)
573        {
574            TaskData taskData = HiveServiceLocatorWebManagerService.Instance.CallHiveService(s => s.GetTaskData(jobId));
575            try
576            {
577                return PersistenceUtil.Deserialize<ItemTask>(taskData.Data);
578            }
579            catch
580            {
581                return null;
582            }
583        }
584
585        /// <summary>
586        /// Executes the action. If it throws an exception it is repeated until repetition-count is reached.
587        /// If repetitions is -1, it is repeated infinitely.
588        /// </summary>
589        public static void TryAndRepeat(Action action, int repetitions, string errorMessage, ILog log = null)
590        {
591            while (true)
592            {
593                try { action(); return; }
594                catch (Exception e)
595                {
596                    if (repetitions == 0) throw new HiveException(errorMessage, e);
597                    if (log != null) log.LogMessage(string.Format("{0}: {1} - will try again!", errorMessage, e.ToString()));
598                    repetitions--;
599                }
600            }
601        }
602
603        public static HiveItemCollection<JobPermission> GetJobPermissions(Guid jobId)
604        {
605            return HiveServiceLocatorWebManagerService.Instance.CallHiveService((service) =>
606            {
607                IEnumerable<JobPermission> jps = service.GetJobPermissions(jobId);
608                foreach (var hep in jps)
609                {
610                    hep.UnmodifiedGrantedUserNameUpdate(service.GetUsernameByUserId(hep.GrantedUserId));
611                }
612                return new HiveItemCollection<JobPermission>(jps);
613            });
614        }
615    }
616}
Note: See TracBrowser for help on using the repository browser.