Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive/3.3/RefreshableJob.cs @ 8939

Last change on this file since 8939 was 8939, checked in by ascheibe, 11 years ago

#1950

  • added more aggressive locking so that the views don't read run collections that get modified in the meantime
  • start downloading of tasks after the job has been uploaded completely
  • fixed exceptions that got thrown when waiting for the threads that upload the tasks
File size: 22.7 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.ComponentModel;
25using System.Drawing;
26using System.Linq;
27using HeuristicLab.Collections;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30using HeuristicLab.MainForm;
31
32namespace HeuristicLab.Clients.Hive {
33  public class RefreshableJob : IHiveItem, IDeepCloneable, IContent, IComparable<RefreshableJob> {
34    private JobResultPoller jobResultPoller;
35    private ConcurrentTaskDownloader<ItemTask> jobDownloader;
36    private object locker = new object();
37    private object downloadFinishedLocker = new object();
38    object jobResultReceivedLocker = new object();
39
40    public bool IsProgressing { get; set; }
41
42    private Job job;
43    public Job Job {
44      get { return job; }
45      set {
46        if (value != job) {
47          if (value == null)
48            throw new ArgumentNullException();
49
50          if (job != null) DergisterJobEvents();
51          job = value;
52          if (job != null) {
53            RegisterJobEvents();
54            job_PropertyChanged(job, new PropertyChangedEventArgs("Id"));
55          }
56          OnJobChanged();
57          OnToStringChanged(this, EventArgs.Empty);
58          job_ItemImageChanged(this, EventArgs.Empty);
59        }
60      }
61    }
62
63    private ItemCollection<HiveTask> hiveTasks;
64    public ItemCollection<HiveTask> HiveTasks {
65      get { return hiveTasks; }
66      set {
67        if (hiveTasks != value) {
68          if (hiveTasks != null) DeregisterHiveJobsEvents();
69          hiveTasks = value;
70          if (hiveTasks != null) RegisterHiveJobsEvents();
71          OnHiveTasksChanged();
72        }
73      }
74    }
75
76    private ExecutionState executionState;
77    public ExecutionState ExecutionState {
78      get { return executionState; }
79      internal set {
80        if (executionState != value) {
81          executionState = value;
82          OnExecutionStateChanged();
83        }
84      }
85    }
86
87    private TimeSpan executionTime;
88    public TimeSpan ExecutionTime {
89      get { return executionTime; }
90      internal set {
91        if (executionTime != value) {
92          executionTime = value;
93          OnExecutionTimeChanged();
94        }
95      }
96    }
97
98    private bool refreshAutomatically;
99    public bool RefreshAutomatically {
100      get { return refreshAutomatically; }
101      set {
102        lock (locker) {
103          if (refreshAutomatically != value) {
104            refreshAutomatically = value;
105            OnRefreshAutomaticallyChanged();
106          }
107          if (RefreshAutomatically) {
108            if (this.HiveTasks != null && this.HiveTasks.Count > 0 && (jobResultPoller == null || !jobResultPoller.IsPolling)) {
109              StartResultPolling();
110            }
111          } else {
112            StopResultPolling();
113          }
114        }
115      }
116    }
117
118    // indicates if download button is enabled
119    private bool isDownloadable = true;
120    public bool IsDownloadable {
121      get { return isDownloadable; }
122      set {
123        if (value != isDownloadable) {
124          isDownloadable = value;
125          OnIsDownloadableChanged();
126        }
127      }
128    }
129
130    // if true, all control buttons should be enabled. otherwise disabled
131    private bool isControllable = true;
132    public bool IsControllable {
133      get { return isControllable; }
134      private set {
135        if (value != isControllable) {
136          isControllable = value;
137          OnIsControllableChanged();
138          if (this.hiveTasks != null) {
139            foreach (var hiveJob in this.hiveTasks) {
140              hiveJob.IsControllable = value;
141            }
142          }
143        }
144      }
145    }
146
147    // indicates if a user is allowed to share this experiment
148    private bool isSharable = true;
149    public bool IsSharable {
150      get { return isSharable; }
151      private set {
152        if (value != isSharable) {
153          isSharable = value;
154          OnIsSharableChanged();
155        }
156      }
157    }
158
159    // may execute jobs with privileged permissions on slaves
160    private bool isAllowedPrivileged = true;
161    public bool IsAllowedPrivileged {
162      get { return isAllowedPrivileged; }
163      set {
164        if (value != isAllowedPrivileged) {
165          isAllowedPrivileged = value;
166          OnIsAllowedPrivilegedChanged();
167        }
168      }
169    }
170
171    private Progress progress;
172    public Progress Progress {
173      get { return progress; }
174      set {
175        this.progress = value;
176        OnIsProgressingChanged();
177      }
178    }
179
180
181    private ThreadSafeLog log;
182    public ILog Log {
183      get { return log; }
184    }
185
186    public StateLogListList StateLogList {
187      get { return new StateLogListList(this.GetAllHiveTasks().Select(x => x.StateLog)); }
188    }
189
190    #region Constructors and Cloning
191    public RefreshableJob() {
192      this.refreshAutomatically = false;
193      this.Job = new Job();
194      this.log = new ThreadSafeLog();
195      this.jobDownloader = new ConcurrentTaskDownloader<ItemTask>(Settings.Default.MaxParallelDownloads, Settings.Default.MaxParallelDownloads);
196      this.jobDownloader.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobDownloader_ExceptionOccured);
197      this.HiveTasks = new ItemCollection<HiveTask>();
198    }
199    public RefreshableJob(Job hiveExperiment) {
200      this.refreshAutomatically = true;
201      this.Job = hiveExperiment;
202      this.log = new ThreadSafeLog();
203      this.jobDownloader = new ConcurrentTaskDownloader<ItemTask>(Settings.Default.MaxParallelDownloads, Settings.Default.MaxParallelDownloads);
204      this.jobDownloader.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobDownloader_ExceptionOccured);
205      this.HiveTasks = new ItemCollection<HiveTask>();
206    }
207    protected RefreshableJob(RefreshableJob original, Cloner cloner) {
208      cloner.RegisterClonedObject(original, this);
209      this.Job = cloner.Clone(original.Job);
210      this.IsControllable = original.IsControllable;
211      this.log = cloner.Clone(original.log);
212      this.RefreshAutomatically = false; // do not start results polling automatically
213      this.jobDownloader = new ConcurrentTaskDownloader<ItemTask>(Settings.Default.MaxParallelDownloads, Settings.Default.MaxParallelDownloads);
214      this.jobDownloader.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobDownloader_ExceptionOccured);
215      this.HiveTasks = cloner.Clone(original.HiveTasks);
216      this.ExecutionTime = original.ExecutionTime;
217      this.ExecutionState = original.ExecutionState;
218    }
219    public IDeepCloneable Clone(Cloner cloner) {
220      return new RefreshableJob(this, cloner);
221    }
222    public object Clone() {
223      return this.Clone(new Cloner());
224    }
225    #endregion
226
227    #region JobResultPoller Events
228    public void StartResultPolling() {
229      if (jobResultPoller == null) {
230        jobResultPoller = new JobResultPoller(job.Id, Settings.Default.ResultPollingInterval);
231        RegisterResultPollingEvents();
232        jobResultPoller.AutoResumeOnException = false;
233      }
234
235      if (!jobResultPoller.IsPolling) {
236        jobResultPoller.Start();
237      }
238    }
239
240    public void StopResultPolling() {
241      if (jobResultPoller != null && jobResultPoller.IsPolling) {
242        jobResultPoller.Stop();
243      }
244    }
245
246    private void RegisterResultPollingEvents() {
247      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
248      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<IEnumerable<LightweightTask>>>(jobResultPoller_JobResultReceived);
249      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
250    }
251    private void DeregisterResultPollingEvents() {
252      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
253      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<IEnumerable<LightweightTask>>>(jobResultPoller_JobResultReceived);
254      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
255    }
256    private void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
257      if (this.refreshAutomatically != jobResultPoller.IsPolling) {
258        this.refreshAutomatically = jobResultPoller.IsPolling;
259        OnRefreshAutomaticallyChanged();
260      }
261    }
262
263    private void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightTask>> e) {
264      lock (jobResultReceivedLocker) {
265        foreach (LightweightTask lightweightTask in e.Value) {
266          HiveTask hiveTask = GetHiveTaskById(lightweightTask.Id);
267          if (hiveTask != null) {
268            // lastJobDataUpdate equals DateTime.MinValue right after it was uploaded. When the first results are polled, this value is updated
269            if (hiveTask.Task.State == TaskState.Offline && lightweightTask.State != TaskState.Finished && lightweightTask.State != TaskState.Failed && lightweightTask.State != TaskState.Aborted) {
270              hiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;
271            }
272
273            hiveTask.UpdateFromLightweightJob(lightweightTask);
274
275            if (!hiveTask.IsFinishedTaskDownloaded && !hiveTask.IsDownloading && hiveTask.Task.LastTaskDataUpdate < lightweightTask.LastTaskDataUpdate) {
276              log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id));
277              hiveTask.IsDownloading = true;
278              jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => {
279                lock (downloadFinishedLocker) {
280                  log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id));
281                  HiveTask localHiveTask = GetHiveTaskById(localJob.Id);
282
283                  if (itemJob == null) {
284                    // something bad happened to this task. bad task, BAAAD task!
285                    localHiveTask.IsDownloading = false;
286                  } else {
287                    // if the task is paused, download but don't integrate into parent optimizer (to avoid Prepare)
288                    if (localJob.State == TaskState.Paused) {
289                      localHiveTask.ItemTask = itemJob;
290                    } else {
291                      if (localJob.ParentTaskId.HasValue) {
292                        HiveTask parentHiveTask = GetHiveTaskById(localJob.ParentTaskId.Value);
293                        parentHiveTask.IntegrateChild(itemJob, localJob.Id);
294                      } else {
295                        localHiveTask.ItemTask = itemJob;
296                      }
297                    }
298                    localHiveTask.IsDownloading = false;
299                    localHiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;
300                  }
301                }
302              });
303            }
304          } else
305            throw new Exception("This should not happen");
306        }
307        GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
308        if (AllJobsFinished()) {
309          this.ExecutionState = Core.ExecutionState.Stopped;
310          StopResultPolling();
311        }
312        UpdateTotalExecutionTime();
313        UpdateStatistics();
314        OnStateLogListChanged();
315        OnTaskReceived();
316      }
317    }
318
319    public HiveTask GetHiveTaskById(Guid jobId) {
320      foreach (HiveTask t in this.HiveTasks) {
321        var hj = t.GetHiveTaskByTaskId(jobId);
322        if (hj != null)
323          return hj;
324      }
325      return null;
326    }
327
328    private void UpdateStatistics() {
329      var jobs = this.GetAllHiveTasks();
330      job.JobCount = jobs.Count();
331      job.CalculatingCount = jobs.Count(j => j.Task.State == TaskState.Calculating);
332      job.FinishedCount = jobs.Count(j => j.Task.State == TaskState.Finished);
333      OnJobStatisticsChanged();
334    }
335
336    public bool AllJobsFinished() {
337      return this.GetAllHiveTasks().All(j => (j.Task.State == TaskState.Finished
338                                                   || j.Task.State == TaskState.Aborted
339                                                   || j.Task.State == TaskState.Failed)
340                                                   && !j.IsDownloading);
341    }
342
343    private void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
344      OnExceptionOccured(sender, e.Value);
345    }
346    private void jobDownloader_ExceptionOccured(object sender, EventArgs<Exception> e) {
347      OnExceptionOccured(sender, e.Value);
348    }
349    public void UpdateTotalExecutionTime() {
350      this.ExecutionTime = TimeSpan.FromMilliseconds(this.GetAllHiveTasks().Sum(x => x.Task.ExecutionTime.TotalMilliseconds));
351    }
352    #endregion
353
354    #region Job Events
355    private void RegisterJobEvents() {
356      job.ToStringChanged += new EventHandler(OnToStringChanged);
357      job.PropertyChanged += new PropertyChangedEventHandler(job_PropertyChanged);
358      job.ItemImageChanged += new EventHandler(job_ItemImageChanged);
359      job.ModifiedChanged += new EventHandler(job_ModifiedChanged);
360    }
361
362    private void DergisterJobEvents() {
363      job.ToStringChanged -= new EventHandler(OnToStringChanged);
364      job.PropertyChanged -= new PropertyChangedEventHandler(job_PropertyChanged);
365      job.ItemImageChanged -= new EventHandler(job_ItemImageChanged);
366      job.ModifiedChanged -= new EventHandler(job_ModifiedChanged);
367    }
368    #endregion
369
370    #region Event Handler
371    public event EventHandler RefreshAutomaticallyChanged;
372    private void OnRefreshAutomaticallyChanged() {
373      var handler = RefreshAutomaticallyChanged;
374      if (handler != null) handler(this, EventArgs.Empty);
375    }
376
377    public event EventHandler JobChanged;
378    private void OnJobChanged() {
379      var handler = JobChanged;
380      if (handler != null) handler(this, EventArgs.Empty);
381    }
382
383    public event EventHandler ModifiedChanged;
384    private void job_ModifiedChanged(object sender, EventArgs e) {
385      var handler = ModifiedChanged;
386      if (handler != null) handler(sender, e);
387    }
388
389    public event EventHandler ItemImageChanged;
390    private void job_ItemImageChanged(object sender, EventArgs e) {
391      var handler = ItemImageChanged;
392      if (handler != null) handler(this, e);
393    }
394
395    public event PropertyChangedEventHandler PropertyChanged;
396    private void job_PropertyChanged(object sender, PropertyChangedEventArgs e) {
397      this.IsSharable = job.Permission == Permission.Full;
398      this.IsControllable = job.Permission == Permission.Full;
399
400      var handler = PropertyChanged;
401      if (handler != null) handler(sender, e);
402    }
403
404    public event EventHandler ToStringChanged;
405    private void OnToStringChanged(object sender, EventArgs e) {
406      var handler = ToStringChanged;
407      if (handler != null) handler(this, e);
408    }
409
410    public event EventHandler IsDownloadableChanged;
411    private void OnIsDownloadableChanged() {
412      var handler = IsDownloadableChanged;
413      if (handler != null) handler(this, EventArgs.Empty);
414    }
415
416    public event EventHandler IsControllableChanged;
417    private void OnIsControllableChanged() {
418      var handler = IsControllableChanged;
419      if (handler != null) handler(this, EventArgs.Empty);
420    }
421
422    public event EventHandler IsSharableChanged;
423    private void OnIsSharableChanged() {
424      var handler = IsSharableChanged;
425      if (handler != null) handler(this, EventArgs.Empty);
426    }
427
428    public event EventHandler IsAllowedPrivilegedChanged;
429    private void OnIsAllowedPrivilegedChanged() {
430      var handler = IsAllowedPrivilegedChanged;
431      if (handler != null) handler(this, EventArgs.Empty);
432    }
433
434    public event EventHandler JobStatisticsChanged;
435    private void OnJobStatisticsChanged() {
436      var handler = JobStatisticsChanged;
437      if (handler != null) handler(this, EventArgs.Empty);
438    }
439
440    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
441    private void OnExceptionOccured(object sender, Exception exception) {
442      log.LogException(exception);
443      var handler = ExceptionOccured;
444      if (handler != null) handler(sender, new EventArgs<Exception>(exception));
445    }
446
447    public event EventHandler StateLogListChanged;
448    private void OnStateLogListChanged() {
449      var handler = StateLogListChanged;
450      if (handler != null) handler(this, EventArgs.Empty);
451    }
452
453    public event EventHandler ExecutionTimeChanged;
454    protected virtual void OnExecutionTimeChanged() {
455      var handler = ExecutionTimeChanged;
456      if (handler != null) handler(this, EventArgs.Empty);
457    }
458
459    public event EventHandler ExecutionStateChanged;
460    protected virtual void OnExecutionStateChanged() {
461      var handler = ExecutionStateChanged;
462      if (handler != null) handler(this, EventArgs.Empty);
463    }
464    public event EventHandler TaskReceived;
465    protected virtual void OnTaskReceived() {
466      var handler = TaskReceived;
467      if (handler != null) handler(this, EventArgs.Empty);
468    }
469    public event EventHandler IsProgressingChanged;
470    private void OnIsProgressingChanged() {
471      var handler = IsProgressingChanged;
472      if (handler != null) handler(this, EventArgs.Empty);
473    }
474    #endregion
475
476    #region HiveTasks Events
477    private void RegisterHiveJobsEvents() {
478      this.hiveTasks.ItemsAdded += new CollectionItemsChangedEventHandler<HiveTask>(hivetasks_ItemsAdded);
479      this.hiveTasks.ItemsRemoved += new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_ItemsRemoved);
480      this.hiveTasks.CollectionReset += new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_CollectionReset);
481    }
482
483    private void DeregisterHiveJobsEvents() {
484      this.hiveTasks.ItemsAdded -= new CollectionItemsChangedEventHandler<HiveTask>(hivetasks_ItemsAdded);
485      this.hiveTasks.ItemsRemoved -= new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_ItemsRemoved);
486      this.hiveTasks.CollectionReset -= new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_CollectionReset);
487    }
488
489    private void hiveTasks_CollectionReset(object sender, CollectionItemsChangedEventArgs<HiveTask> e) {
490      foreach (var item in e.Items) {
491        item.StateLogChanged -= new EventHandler(item_StateLogChanged);
492      }
493      OnHiveTasksReset(e);
494    }
495
496    private void hiveTasks_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<HiveTask> e) {
497      foreach (var item in e.Items) {
498        item.StateLogChanged -= new EventHandler(item_StateLogChanged);
499      }
500      OnHiveTasksRemoved(e);
501    }
502
503    private void hivetasks_ItemsAdded(object sender, CollectionItemsChangedEventArgs<HiveTask> e) {
504      foreach (var item in e.Items) {
505        item.StateLogChanged += new EventHandler(item_StateLogChanged);
506        item.IsControllable = this.IsControllable;
507      }
508      OnHiveTasksAdded(e);
509    }
510
511    private void item_StateLogChanged(object sender, EventArgs e) {
512      OnStateLogListChanged();
513    }
514    #endregion
515
516    public event EventHandler HiveTasksChanged;
517    protected virtual void OnHiveTasksChanged() {
518      StopResultPolling();
519      if (this.HiveTasks != null && this.HiveTasks.Count > 0 && this.GetAllHiveTasks().All(x => x.Task.Id != Guid.Empty)) {
520        if (IsFinished()) {
521          this.ExecutionState = Core.ExecutionState.Stopped;
522          this.RefreshAutomatically = false;
523          if (jobResultPoller != null) DeregisterResultPollingEvents();
524        } else {
525          this.RefreshAutomatically = true;
526        }
527      }
528
529      var handler = HiveTasksChanged;
530      if (handler != null) handler(this, EventArgs.Empty);
531    }
532
533    public event EventHandler Loaded;
534    public virtual void OnLoaded() {
535      this.UpdateTotalExecutionTime();
536      this.OnStateLogListChanged();
537
538      if (this.ExecutionState != ExecutionState.Stopped) {
539        this.RefreshAutomatically = true;
540      }
541
542      var handler = Loaded;
543      if (handler != null) handler(this, EventArgs.Empty);
544    }
545
546    public event EventHandler<CollectionItemsChangedEventArgs<HiveTask>> HiveTasksAdded;
547    private void OnHiveTasksAdded(CollectionItemsChangedEventArgs<HiveTask> e) {
548      var handler = HiveTasksAdded;
549      if (handler != null) handler(this, e);
550    }
551
552    public event EventHandler<CollectionItemsChangedEventArgs<HiveTask>> HiveTasksRemoved;
553    private void OnHiveTasksRemoved(CollectionItemsChangedEventArgs<HiveTask> e) {
554      var handler = HiveTasksRemoved;
555      if (handler != null) handler(this, e);
556    }
557
558    public event EventHandler<CollectionItemsChangedEventArgs<HiveTask>> HiveTasksReset;
559    private void OnHiveTasksReset(CollectionItemsChangedEventArgs<HiveTask> e) {
560      var handler = HiveTasksReset;
561      if (handler != null) handler(this, e);
562    }
563
564    public Guid Id {
565      get { return job.Id; }
566      set { job.Id = value; }
567    }
568    public bool Modified {
569      get { return job.Modified; }
570    }
571    public void Store() {
572      job.Store();
573    }
574    public string ItemDescription {
575      get { return job.ItemDescription; }
576    }
577    public Image ItemImage {
578      get { return job.ItemImage; }
579    }
580    public string ItemName {
581      get { return job.ItemName; }
582    }
583    public Version ItemVersion {
584      get { return job.ItemVersion; }
585    }
586
587    public override string ToString() {
588      return string.Format("{0} {1}", Job.DateCreated.ToString("MM.dd.yyyy HH:mm"), Job.ToString());
589    }
590
591    public bool IsFinished() {
592      return HiveTasks != null
593        && HiveTasks.All(x => x.Task.DateFinished.HasValue && x.Task.DateCreated.HasValue);
594    }
595
596    public IEnumerable<HiveTask> GetAllHiveTasks() {
597      if (hiveTasks == null) return Enumerable.Empty<HiveTask>();
598
599      var tasks = new List<HiveTask>();
600      foreach (HiveTask task in HiveTasks) {
601        tasks.AddRange(task.GetAllHiveTasks());
602      }
603      return tasks;
604    }
605
606    public int CompareTo(RefreshableJob other) {
607      return this.ToString().CompareTo(other.ToString());
608    }
609  }
610}
Note: See TracBrowser for help on using the repository browser.