Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive/3.3/RefreshableJob.cs @ 7533

Last change on this file since 7533 was 7409, checked in by ascheibe, 13 years ago

#1766 fixed crash when the hive job manager can't deserialize a task

File size: 22.3 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.ComponentModel;
25using System.Drawing;
26using System.Linq;
27using HeuristicLab.Collections;
28using HeuristicLab.Common;
29using HeuristicLab.Core;
30
31namespace HeuristicLab.Clients.Hive {
32  public class RefreshableJob : IHiveItem, IDeepCloneable, IContent, IProgressReporter, IComparable<RefreshableJob> {
33    private JobResultPoller jobResultPoller;
34    private ConcurrentTaskDownloader<ItemTask> jobDownloader;
35    private static object locker = new object();
36
37    private Job job;
38    public Job Job {
39      get { return job; }
40      set {
41        if (value != job) {
42          if (value == null)
43            throw new ArgumentNullException();
44
45          if (job != null) DergisterJobEvents();
46          job = value;
47          if (job != null) {
48            RegisterJobEvents();
49            job_PropertyChanged(job, new PropertyChangedEventArgs("Id"));
50          }
51          OnJobChanged();
52          OnToStringChanged(this, EventArgs.Empty);
53          job_ItemImageChanged(this, EventArgs.Empty);
54        }
55      }
56    }
57
58    private ItemCollection<HiveTask> hiveTasks;
59    public ItemCollection<HiveTask> HiveTasks {
60      get { return hiveTasks; }
61      set {
62        if (hiveTasks != value) {
63          if (hiveTasks != null) DeregisterHiveJobsEvents();
64          hiveTasks = value;
65          if (hiveTasks != null) RegisterHiveJobsEvents();
66          OnHiveTasksChanged();
67        }
68      }
69    }
70
71    private ExecutionState executionState;
72    public ExecutionState ExecutionState {
73      get { return executionState; }
74      internal set {
75        if (executionState != value) {
76          executionState = value;
77          OnExecutionStateChanged();
78        }
79      }
80    }
81
82    private TimeSpan executionTime;
83    public TimeSpan ExecutionTime {
84      get { return executionTime; }
85      internal set {
86        if (executionTime != value) {
87          executionTime = value;
88          OnExecutionTimeChanged();
89        }
90      }
91    }
92
93    private bool refreshAutomatically;
94    public bool RefreshAutomatically {
95      get { return refreshAutomatically; }
96      set {
97        lock (locker) {
98          if (refreshAutomatically != value) {
99            refreshAutomatically = value;
100            OnRefreshAutomaticallyChanged();
101          }
102          if (RefreshAutomatically) {
103            if (this.HiveTasks != null && this.HiveTasks.Count > 0 && (jobResultPoller == null || !jobResultPoller.IsPolling)) {
104              StartResultPolling();
105            }
106          } else {
107            StopResultPolling();
108          }
109        }
110      }
111    }
112
113    // indicates if download button is enabled
114    private bool isDownloadable = true;
115    public bool IsDownloadable {
116      get { return isDownloadable; }
117      set {
118        if (value != isDownloadable) {
119          isDownloadable = value;
120          OnIsDownloadableChanged();
121        }
122      }
123    }
124
125    // if true, all control buttons should be enabled. otherwise disabled
126    private bool isControllable = true;
127    public bool IsControllable {
128      get { return isControllable; }
129      private set {
130        if (value != isControllable) {
131          isControllable = value;
132          OnIsControllableChanged();
133          if (this.hiveTasks != null) {
134            foreach (var hiveJob in this.hiveTasks) {
135              hiveJob.IsControllable = value;
136            }
137          }
138        }
139      }
140    }
141
142    // indicates if a user is allowed to share this experiment
143    private bool isSharable = true;
144    public bool IsSharable {
145      get { return isSharable; }
146      private set {
147        if (value != isSharable) {
148          isSharable = value;
149          OnIsSharableChanged();
150        }
151      }
152    }
153
154    // may execute jobs with privileged permissions on slaves
155    private bool isAllowedPrivileged = true;
156    public bool IsAllowedPrivileged {
157      get { return isAllowedPrivileged; }
158      set {
159        if (value != isAllowedPrivileged) {
160          isAllowedPrivileged = value;
161          OnIsAllowedPrivilegedChanged();
162        }
163      }
164    }
165
166    private bool isProgressing;
167    public bool IsProgressing {
168      get { return isProgressing; }
169      set {
170        if (isProgressing != value) {
171          isProgressing = value;
172          OnIsProgressingChanged();
173        }
174      }
175    }
176
177    private IProgress progress;
178    public IProgress Progress {
179      get { return progress; }
180      set { this.progress = value; }
181    }
182
183    private ThreadSafeLog log;
184    public ILog Log {
185      get { return log; }
186    }
187
188    public StateLogListList StateLogList {
189      get { return new StateLogListList(this.GetAllHiveTasks().Select(x => x.StateLog)); }
190    }
191
192    #region Constructors and Cloning
193    public RefreshableJob() {
194      this.refreshAutomatically = true;
195      this.Job = new Job();
196      this.log = new ThreadSafeLog();
197      this.jobDownloader = new ConcurrentTaskDownloader<ItemTask>(Settings.Default.MaxParallelDownloads, Settings.Default.MaxParallelDownloads);
198      this.jobDownloader.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobDownloader_ExceptionOccured);
199      this.HiveTasks = new ItemCollection<HiveTask>();
200    }
201    public RefreshableJob(Job hiveExperiment) {
202      this.refreshAutomatically = true;
203      this.Job = hiveExperiment;
204      this.log = new ThreadSafeLog();
205      this.jobDownloader = new ConcurrentTaskDownloader<ItemTask>(Settings.Default.MaxParallelDownloads, Settings.Default.MaxParallelDownloads);
206      this.jobDownloader.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobDownloader_ExceptionOccured);
207      this.HiveTasks = new ItemCollection<HiveTask>();
208    }
209    protected RefreshableJob(RefreshableJob original, Cloner cloner) {
210      cloner.RegisterClonedObject(original, this);
211      this.Job = cloner.Clone(original.Job);
212      this.IsControllable = original.IsControllable;
213      this.log = cloner.Clone(original.log);
214      this.RefreshAutomatically = false; // do not start results polling automatically
215      this.jobDownloader = new ConcurrentTaskDownloader<ItemTask>(Settings.Default.MaxParallelDownloads, Settings.Default.MaxParallelDownloads);
216      this.jobDownloader.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobDownloader_ExceptionOccured);
217      this.HiveTasks = cloner.Clone(original.HiveTasks);
218      this.ExecutionTime = original.ExecutionTime;
219      this.ExecutionState = original.ExecutionState;
220    }
221    public IDeepCloneable Clone(Cloner cloner) {
222      return new RefreshableJob(this, cloner);
223    }
224    public object Clone() {
225      return this.Clone(new Cloner());
226    }
227    #endregion
228
229    #region JobResultPoller Events
230
231    public void StartResultPolling() {
232      if (jobResultPoller == null) {
233        jobResultPoller = new JobResultPoller(job.Id, Settings.Default.ResultPollingInterval);
234        RegisterResultPollingEvents();
235        jobResultPoller.AutoResumeOnException = true;
236      }
237
238      if (!jobResultPoller.IsPolling) {
239        jobResultPoller.Start();
240      }
241    }
242
243    public void StopResultPolling() {
244      if (jobResultPoller != null && jobResultPoller.IsPolling) {
245        jobResultPoller.Stop();
246      }
247    }
248
249    private void RegisterResultPollingEvents() {
250      jobResultPoller.ExceptionOccured += new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
251      jobResultPoller.JobResultsReceived += new EventHandler<EventArgs<IEnumerable<LightweightTask>>>(jobResultPoller_JobResultReceived);
252      jobResultPoller.IsPollingChanged += new EventHandler(jobResultPoller_IsPollingChanged);
253    }
254    private void DeregisterResultPollingEvents() {
255      jobResultPoller.ExceptionOccured -= new EventHandler<EventArgs<Exception>>(jobResultPoller_ExceptionOccured);
256      jobResultPoller.JobResultsReceived -= new EventHandler<EventArgs<IEnumerable<LightweightTask>>>(jobResultPoller_JobResultReceived);
257      jobResultPoller.IsPollingChanged -= new EventHandler(jobResultPoller_IsPollingChanged);
258    }
259    private void jobResultPoller_IsPollingChanged(object sender, EventArgs e) {
260      if (this.refreshAutomatically != jobResultPoller.IsPolling) {
261        this.refreshAutomatically = jobResultPoller.IsPolling;
262        OnRefreshAutomaticallyChanged();
263      }
264    }
265    private void jobResultPoller_JobResultReceived(object sender, EventArgs<IEnumerable<LightweightTask>> e) {
266      foreach (LightweightTask lightweightTask in e.Value) {
267        HiveTask hiveTask = GetHiveTaskById(lightweightTask.Id);
268        if (hiveTask != null) {
269          // lastJobDataUpdate equals DateTime.MinValue right after it was uploaded. When the first results are polled, this value is updated
270          if (hiveTask.Task.State == TaskState.Offline && lightweightTask.State != TaskState.Finished && lightweightTask.State != TaskState.Failed && lightweightTask.State != TaskState.Aborted) {
271            hiveTask.Task.LastTaskDataUpdate = lightweightTask.LastTaskDataUpdate;
272          }
273
274          hiveTask.UpdateFromLightweightJob(lightweightTask);
275
276          if (!hiveTask.IsFinishedTaskDownloaded && !hiveTask.IsDownloading && hiveTask.Task.LastTaskDataUpdate < lightweightTask.LastTaskDataUpdate) {
277            log.LogMessage(string.Format("Downloading task {0}", lightweightTask.Id));
278            hiveTask.IsDownloading = true;
279            jobDownloader.DownloadTaskData(hiveTask.Task, (localJob, itemJob) => {
280              log.LogMessage(string.Format("Finished downloading task {0}", localJob.Id));
281              HiveTask localHiveTask = GetHiveTaskById(localJob.Id);
282
283              if (itemJob == null) {
284                localHiveTask.IsDownloading = false;
285              }
286
287              if (itemJob == null) {
288                // something bad happened to this task. bad task, BAAAD task!
289              } else {
290                // if the task is paused, download but don't integrate into parent optimizer (to avoid Prepare)
291
292                if (localJob.State == TaskState.Paused) {
293                  localHiveTask.ItemTask = itemJob;
294                } else {
295                  if (localJob.ParentTaskId.HasValue) {
296                    HiveTask parentHiveTask = GetHiveTaskById(localJob.ParentTaskId.Value);
297                    parentHiveTask.IntegrateChild(itemJob, localJob.Id);
298                  } else {
299                    localHiveTask.ItemTask = itemJob;
300                  }
301                }
302                localHiveTask.IsDownloading = false;
303                localHiveTask.Task.LastTaskDataUpdate = localJob.LastTaskDataUpdate;
304              }
305            });
306          }
307        }
308      }
309      GC.Collect(); // force GC, because .NET is too lazy here (deserialization takes a lot of memory)
310      if (AllJobsFinished()) {
311        this.ExecutionState = Core.ExecutionState.Stopped;
312        StopResultPolling();
313      }
314      UpdateTotalExecutionTime();
315      UpdateStatistics();
316      OnStateLogListChanged();
317    }
318
319    public HiveTask GetHiveTaskById(Guid jobId) {
320      foreach (HiveTask t in this.HiveTasks) {
321        var hj = t.GetHiveTaskByTaskId(jobId);
322        if (hj != null)
323          return hj;
324      }
325      return null;
326    }
327
328    private void UpdateStatistics() {
329      var jobs = this.GetAllHiveTasks();
330      job.JobCount = jobs.Count();
331      job.CalculatingCount = jobs.Count(j => j.Task.State == TaskState.Calculating);
332      job.FinishedCount = jobs.Count(j => j.Task.State == TaskState.Finished);
333      OnJobStatisticsChanged();
334    }
335
336    public bool AllJobsFinished() {
337      return this.GetAllHiveTasks().All(j => (j.Task.State == TaskState.Finished
338                                                   || j.Task.State == TaskState.Aborted
339                                                   || j.Task.State == TaskState.Failed)
340                                                   && j.IsFinishedTaskDownloaded);
341    }
342
343    private void jobResultPoller_ExceptionOccured(object sender, EventArgs<Exception> e) {
344      OnExceptionOccured(sender, e.Value);
345    }
346    private void jobDownloader_ExceptionOccured(object sender, EventArgs<Exception> e) {
347      OnExceptionOccured(sender, e.Value);
348    }
349    public void UpdateTotalExecutionTime() {
350      this.ExecutionTime = TimeSpan.FromMilliseconds(this.GetAllHiveTasks().Sum(x => x.Task.ExecutionTime.TotalMilliseconds));
351    }
352    #endregion
353
354    #region Job Events
355    private void RegisterJobEvents() {
356      job.ToStringChanged += new EventHandler(OnToStringChanged);
357      job.PropertyChanged += new PropertyChangedEventHandler(job_PropertyChanged);
358      job.ItemImageChanged += new EventHandler(job_ItemImageChanged);
359      job.ModifiedChanged += new EventHandler(job_ModifiedChanged);
360    }
361
362    private void DergisterJobEvents() {
363      job.ToStringChanged -= new EventHandler(OnToStringChanged);
364      job.PropertyChanged -= new PropertyChangedEventHandler(job_PropertyChanged);
365      job.ItemImageChanged -= new EventHandler(job_ItemImageChanged);
366      job.ModifiedChanged -= new EventHandler(job_ModifiedChanged);
367    }
368    #endregion
369
370    #region Event Handler
371    public event EventHandler RefreshAutomaticallyChanged;
372    private void OnRefreshAutomaticallyChanged() {
373      var handler = RefreshAutomaticallyChanged;
374      if (handler != null) handler(this, EventArgs.Empty);
375    }
376
377    public event EventHandler JobChanged;
378    private void OnJobChanged() {
379      var handler = JobChanged;
380      if (handler != null) handler(this, EventArgs.Empty);
381    }
382
383    public event EventHandler ModifiedChanged;
384    private void job_ModifiedChanged(object sender, EventArgs e) {
385      var handler = ModifiedChanged;
386      if (handler != null) handler(sender, e);
387    }
388
389    public event EventHandler ItemImageChanged;
390    private void job_ItemImageChanged(object sender, EventArgs e) {
391      var handler = ItemImageChanged;
392      if (handler != null) handler(this, e);
393    }
394
395    public event PropertyChangedEventHandler PropertyChanged;
396    private void job_PropertyChanged(object sender, PropertyChangedEventArgs e) {
397      this.IsSharable = job.Permission == Permission.Full;
398      this.IsControllable = job.Permission == Permission.Full;
399
400      var handler = PropertyChanged;
401      if (handler != null) handler(sender, e);
402    }
403
404    public event EventHandler ToStringChanged;
405    private void OnToStringChanged(object sender, EventArgs e) {
406      var handler = ToStringChanged;
407      if (handler != null) handler(this, e);
408    }
409
410    public event EventHandler IsProgressingChanged;
411    protected virtual void OnIsProgressingChanged() {
412      var handler = IsProgressingChanged;
413      if (handler != null) handler(this, EventArgs.Empty);
414    }
415
416    public event EventHandler IsDownloadableChanged;
417    private void OnIsDownloadableChanged() {
418      var handler = IsDownloadableChanged;
419      if (handler != null) handler(this, EventArgs.Empty);
420    }
421
422    public event EventHandler IsControllableChanged;
423    private void OnIsControllableChanged() {
424      var handler = IsControllableChanged;
425      if (handler != null) handler(this, EventArgs.Empty);
426    }
427
428    public event EventHandler IsSharableChanged;
429    private void OnIsSharableChanged() {
430      var handler = IsSharableChanged;
431      if (handler != null) handler(this, EventArgs.Empty);
432    }
433
434    public event EventHandler IsAllowedPrivilegedChanged;
435    private void OnIsAllowedPrivilegedChanged() {
436      var handler = IsAllowedPrivilegedChanged;
437      if (handler != null) handler(this, EventArgs.Empty);
438    }
439
440    public event EventHandler JobStatisticsChanged;
441    private void OnJobStatisticsChanged() {
442      var handler = JobStatisticsChanged;
443      if (handler != null) handler(this, EventArgs.Empty);
444    }
445
446    public event EventHandler<EventArgs<Exception>> ExceptionOccured;
447    private void OnExceptionOccured(object sender, Exception exception) {
448      log.LogException(exception);
449      var handler = ExceptionOccured;
450      if (handler != null) handler(sender, new EventArgs<Exception>(exception));
451    }
452
453    public event EventHandler StateLogListChanged;
454    private void OnStateLogListChanged() {
455      var handler = StateLogListChanged;
456      if (handler != null) handler(this, EventArgs.Empty);
457    }
458
459    public event EventHandler ExecutionTimeChanged;
460    protected virtual void OnExecutionTimeChanged() {
461      var handler = ExecutionTimeChanged;
462      if (handler != null) handler(this, EventArgs.Empty);
463    }
464
465    public event EventHandler ExecutionStateChanged;
466    protected virtual void OnExecutionStateChanged() {
467      var handler = ExecutionStateChanged;
468      if (handler != null) handler(this, EventArgs.Empty);
469    }
470    #endregion
471
472    #region HiveTasks Events
473    private void RegisterHiveJobsEvents() {
474      this.hiveTasks.ItemsAdded += new CollectionItemsChangedEventHandler<HiveTask>(hivetasks_ItemsAdded);
475      this.hiveTasks.ItemsRemoved += new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_ItemsRemoved);
476      this.hiveTasks.CollectionReset += new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_CollectionReset);
477    }
478
479    private void DeregisterHiveJobsEvents() {
480      this.hiveTasks.ItemsAdded -= new CollectionItemsChangedEventHandler<HiveTask>(hivetasks_ItemsAdded);
481      this.hiveTasks.ItemsRemoved -= new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_ItemsRemoved);
482      this.hiveTasks.CollectionReset -= new CollectionItemsChangedEventHandler<HiveTask>(hiveTasks_CollectionReset);
483    }
484
485    private void hiveTasks_CollectionReset(object sender, CollectionItemsChangedEventArgs<HiveTask> e) {
486      foreach (var item in e.Items) {
487        item.StateLogChanged -= new EventHandler(item_StateLogChanged);
488      }
489      OnHiveTasksReset(e);
490    }
491
492    private void hiveTasks_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<HiveTask> e) {
493      foreach (var item in e.Items) {
494        item.StateLogChanged -= new EventHandler(item_StateLogChanged);
495      }
496      OnHiveTasksRemoved(e);
497    }
498
499    private void hivetasks_ItemsAdded(object sender, CollectionItemsChangedEventArgs<HiveTask> e) {
500      foreach (var item in e.Items) {
501        item.StateLogChanged += new EventHandler(item_StateLogChanged);
502        item.IsControllable = this.IsControllable;
503      }
504      OnHiveTasksAdded(e);
505    }
506
507    private void item_StateLogChanged(object sender, EventArgs e) {
508      OnStateLogListChanged();
509    }
510    #endregion
511
512    public event EventHandler HiveTasksChanged;
513    protected virtual void OnHiveTasksChanged() {
514      if (jobResultPoller != null && jobResultPoller.IsPolling) {
515        jobResultPoller.Stop();
516        DeregisterResultPollingEvents();
517      }
518      if (this.HiveTasks != null && this.HiveTasks.Count > 0 && this.GetAllHiveTasks().All(x => x.Task.Id != Guid.Empty)) {
519        if (IsFinished()) {
520          this.ExecutionState = Core.ExecutionState.Stopped;
521          this.RefreshAutomatically = false;
522        }
523
524        if (this.RefreshAutomatically) {
525          StartResultPolling();
526        }
527      }
528
529      var handler = HiveTasksChanged;
530      if (handler != null) handler(this, EventArgs.Empty);
531    }
532
533    public event EventHandler Loaded;
534    public virtual void OnLoaded() {
535      this.UpdateTotalExecutionTime();
536      this.OnStateLogListChanged();
537
538      if (this.ExecutionState != ExecutionState.Stopped) {
539        this.RefreshAutomatically = true;
540      }
541
542      var handler = Loaded;
543      if (handler != null) handler(this, EventArgs.Empty);
544    }
545
546    public event EventHandler<CollectionItemsChangedEventArgs<HiveTask>> HiveTasksAdded;
547    private void OnHiveTasksAdded(CollectionItemsChangedEventArgs<HiveTask> e) {
548      var handler = HiveTasksAdded;
549      if (handler != null) handler(this, e);
550    }
551
552    public event EventHandler<CollectionItemsChangedEventArgs<HiveTask>> HiveTasksRemoved;
553    private void OnHiveTasksRemoved(CollectionItemsChangedEventArgs<HiveTask> e) {
554      var handler = HiveTasksRemoved;
555      if (handler != null) handler(this, e);
556    }
557
558    public event EventHandler<CollectionItemsChangedEventArgs<HiveTask>> HiveTasksReset;
559    private void OnHiveTasksReset(CollectionItemsChangedEventArgs<HiveTask> e) {
560      var handler = HiveTasksReset;
561      if (handler != null) handler(this, e);
562    }
563
564    public Guid Id {
565      get { return job.Id; }
566      set { job.Id = value; }
567    }
568    public bool Modified {
569      get { return job.Modified; }
570    }
571    public void Store() {
572      job.Store();
573    }
574    public string ItemDescription {
575      get { return job.ItemDescription; }
576    }
577    public Image ItemImage {
578      get { return job.ItemImage; }
579    }
580    public string ItemName {
581      get { return job.ItemName; }
582    }
583    public Version ItemVersion {
584      get { return job.ItemVersion; }
585    }
586
587    public override string ToString() {
588      return string.Format("{0} {1}", Job.DateCreated.ToString("MM.dd.yyyy HH:mm"), Job.ToString());
589    }
590
591    public bool IsFinished() {
592      return HiveTasks != null
593        && HiveTasks.All(x => x.Task.DateFinished.HasValue && x.Task.DateCreated.HasValue);
594    }
595
596    public IEnumerable<HiveTask> GetAllHiveTasks() {
597      if (hiveTasks == null) return Enumerable.Empty<HiveTask>();
598
599      var tasks = new List<HiveTask>();
600      foreach (HiveTask task in HiveTasks) {
601        tasks.AddRange(task.GetAllHiveTasks());
602      }
603      return tasks;
604    }
605
606    public int CompareTo(RefreshableJob other) {
607      return this.ToString().CompareTo(other.ToString());
608    }
609  }
610}
Note: See TracBrowser for help on using the repository browser.