source: trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveTasks/OptimizerHiveTask.cs @ 8939

Last change on this file since 8939 was 8939, checked in by ascheibe, 9 years ago

#1950

  • added more aggressive locking so that the views don't read run collections that get modified in the meantime
  • start downloading of tasks after the job has been uploaded completely
  • fixed exceptions that got thrown when waiting for the threads that upload the tasks
File size: 18.4 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2012 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using HeuristicLab.Clients.Hive.Jobs;
26using HeuristicLab.Collections;
27using HeuristicLab.Common;
28using HeuristicLab.Optimization;
29using HeuristicLab.PluginInfrastructure;
30
31namespace HeuristicLab.Clients.Hive {
32  public class OptimizerHiveTask : HiveTask<OptimizerTask> {
33    #region Constructors and Cloning
34    public OptimizerHiveTask() { }
35    public OptimizerHiveTask(IOptimizer optimizer)
36      : this() {
37      this.ItemTask = new OptimizerTask(optimizer);
38    }
39    public OptimizerHiveTask(OptimizerTask optimizerJob)
40      : this() {
41      this.ItemTask = optimizerJob;
42    }
43    protected OptimizerHiveTask(OptimizerHiveTask original, Cloner cloner)
44      : base(original, cloner) {
45    }
46    public override IDeepCloneable Clone(Cloner cloner) {
47      return new OptimizerHiveTask(this, cloner);
48    }
49    #endregion
50
51    /// <summary>
52    /// if this.Optimizer is an experiment
53    ///   Uses the child-optimizers of this.HiveTask and creates HiveTask-childs
54    /// if this.Optimizer is a batchrun
55    ///   Creates a number of child-jobs according to repetitions
56    /// </summary>
57    protected override void UpdateChildHiveTasks() {
58      base.UpdateChildHiveTasks();
59      childHiveTasksLock.EnterWriteLock();
60      try {
61        if (Task != null && syncTasksWithOptimizers) {
62          if (!ItemTask.ComputeInParallel) {
63            this.childHiveTasks.Clear();
64          } else {
65            if (ItemTask.Item is Optimization.Experiment) {
66              Optimization.Experiment experiment = (Optimization.Experiment)ItemTask.Item;
67              foreach (IOptimizer childOpt in experiment.Optimizers) {
68                var optimizerHiveTask = new OptimizerHiveTask(childOpt);
69                optimizerHiveTask.Task.Priority = Task.Priority; //inherit priority from parent
70                this.childHiveTasks.Add(optimizerHiveTask);
71              }
72            } else if (ItemTask.Item is Optimization.BatchRun) {
73              Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
74              if (batchRun.Optimizer != null) {
75                while (this.childHiveTasks.Count < batchRun.Repetitions) {
76                  var optimizerHiveTask = new OptimizerHiveTask(batchRun.Optimizer);
77                  optimizerHiveTask.Task.Priority = Task.Priority;
78                  this.childHiveTasks.Add(optimizerHiveTask);
79                }
80                while (this.childHiveTasks.Count > batchRun.Repetitions) {
81                  this.childHiveTasks.Remove(this.childHiveTasks.Last());
82                }
83              }
84            }
85          }
86        }
87      }
88      finally {
89        childHiveTasksLock.ExitWriteLock();
90      }
91    }
92
93    protected override void RegisterItemTaskEvents() {
94      base.RegisterItemTaskEvents();
95      if (ItemTask != null) {
96        if (ItemTask.Item is Optimization.Experiment) {
97          Optimization.Experiment experiment = ItemTask.OptimizerAsExperiment;
98          experiment.Optimizers.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsAdded);
99          experiment.Optimizers.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsReplaced);
100          experiment.Optimizers.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsRemoved);
101          experiment.Optimizers.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_CollectionReset);
102        } else if (ItemTask.Item is Optimization.BatchRun) {
103          Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
104          batchRun.RepetitionsChanged += new EventHandler(batchRun_RepetitionsChanged);
105          batchRun.OptimizerChanged += new EventHandler(batchRun_OptimizerChanged);
106        }
107      }
108    }
109    protected override void DergisterItemTaskEvents() {
110      base.DergisterItemTaskEvents();
111      if (ItemTask != null) {
112        if (ItemTask.Item is Optimization.Experiment) {
113          Optimization.Experiment experiment = ItemTask.OptimizerAsExperiment;
114          experiment.Optimizers.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsAdded);
115          experiment.Optimizers.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsReplaced);
116          experiment.Optimizers.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsRemoved);
117          experiment.Optimizers.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_CollectionReset);
118        } else if (ItemTask.Item is Optimization.BatchRun) {
119          Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
120          batchRun.RepetitionsChanged -= new EventHandler(batchRun_RepetitionsChanged);
121          batchRun.OptimizerChanged -= new EventHandler(batchRun_OptimizerChanged);
122        }
123      }
124    }
125
126    private void batchRun_OptimizerChanged(object sender, EventArgs e) {
127      if (syncTasksWithOptimizers) {
128        UpdateChildHiveTasks();
129      }
130    }
131
132    private void batchRun_RepetitionsChanged(object sender, EventArgs e) {
133      if (syncTasksWithOptimizers) {
134        UpdateChildHiveTasks();
135      }
136    }
137
138    private void Optimizers_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
139      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
140        childHiveTasksLock.EnterWriteLock();
141        try {
142          foreach (var item in e.Items) {
143            if (GetChildByOptimizer(item.Value) == null && item.Value.Name != "Placeholder") {
144              this.childHiveTasks.Add(new OptimizerHiveTask(item.Value));
145            }
146          }
147        }
148        finally { childHiveTasksLock.ExitWriteLock(); }
149      }
150    }
151    private void Optimizers_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
152      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
153        childHiveTasksLock.EnterWriteLock();
154        try {
155          foreach (var item in e.OldItems) {
156            this.childHiveTasks.Remove(this.GetChildByOptimizer(item.Value));
157          }
158          foreach (var item in e.Items) {
159            if (GetChildByOptimizer(item.Value) == null && item.Value.Name != "Placeholder") {
160              this.childHiveTasks.Add(new OptimizerHiveTask(item.Value));
161            }
162          }
163        }
164        finally { childHiveTasksLock.ExitWriteLock(); }
165      }
166    }
167    private void Optimizers_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
168      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
169        childHiveTasksLock.EnterWriteLock();
170        try {
171          foreach (var item in e.Items) {
172            this.childHiveTasks.Remove(this.GetChildByOptimizer(item.Value));
173          }
174        }
175        finally { childHiveTasksLock.ExitWriteLock(); }
176      }
177    }
178    private void Optimizers_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
179      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
180        childHiveTasksLock.EnterWriteLock();
181        try {
182          foreach (var item in e.Items) {
183            this.childHiveTasks.Remove(this.GetChildByOptimizer(item.Value));
184          }
185        }
186        finally { childHiveTasksLock.ExitWriteLock(); }
187      }
188    }
189
190    /// <summary>
191    /// if this.Optimizer is Experiment
192    ///   replace the child-optimizer in the experiment
193    /// if this.Optimizer is BatchRun
194    ///   add the runs from the optimizerTask to the batchrun and replace the Optimizer
195    /// </summary>
196    public override void IntegrateChild(ItemTask task, Guid childTaskId) {
197      var optimizerTask = (OptimizerTask)task;
198      syncTasksWithOptimizers = false; // don't sync with optimizers during this method
199
200      if (this.ItemTask != null && this.ItemTask.Item != null) {
201        if (this.ItemTask.Item is Optimization.Experiment) {
202          UpdateOptimizerInExperiment(this.ItemTask.OptimizerAsExperiment, optimizerTask);
203        } else if (this.ItemTask.Item is Optimization.BatchRun) {
204          UpdateOptimizerInBatchRun(this.ItemTask.OptimizerAsBatchRun, optimizerTask);
205        }
206      }
207
208      IEnumerable<HiveTask> childs = this.ChildHiveTasks.Where(j => j.Task.Id == childTaskId);
209      //TODO: in very rare cases childs is empty. This shouldn't be the case and should be further investigated.
210      if (childs.Count() > 0) {
211        OptimizerHiveTask child = childs.First() as OptimizerHiveTask;
212
213        if (child != null && !optimizerTask.ComputeInParallel) {
214          child.syncTasksWithOptimizers = false;
215          child.ItemTask = optimizerTask;
216          child.syncTasksWithOptimizers = true;
217        }
218      }
219
220      syncTasksWithOptimizers = true;
221    }
222
223    /// <summary>
224    /// Adds the runs from the optimizerTask to the batchrun and replaces the Optimizer
225    /// Sideeffect: the optimizerTask.Optimizer will be prepared (scopes are deleted and executionstate will be reset)
226    /// </summary>
227    private void UpdateOptimizerInBatchRun(BatchRun batchRun, OptimizerTask optimizerTask) {
228      itemTaskLock.EnterWriteLock();
229      try {
230        if (batchRun.Optimizer == null) {
231          batchRun.Optimizer = (IOptimizer)optimizerTask.Item; // only set the first optimizer as Optimizer. if every time the Optimizer would be set, the runs would be cleared each time
232        }
233        foreach (IRun run in optimizerTask.Item.Runs) {
234          if (!batchRun.Runs.Contains(run)) {
235            run.Name = GetNewRunName(run, batchRun.Runs);
236            batchRun.Runs.Add(run);
237          }
238        }
239      }
240      finally {
241        itemTaskLock.ExitWriteLock();
242      }
243    }
244
245    /// <summary>
246    /// replace the child-optimizer in the experiment
247    /// Sideeffect: the optimizerTask.Optimizer will be prepared (scopes are deleted and executionstate will be reset)
248    /// </summary>
249    private void UpdateOptimizerInExperiment(Optimization.Experiment experiment, OptimizerTask optimizerTask) {
250      itemTaskLock.EnterWriteLock();
251      try {
252        if (optimizerTask.IndexInParentOptimizerList < 0)
253          throw new IndexOutOfRangeException("IndexInParentOptimizerList must be equal or greater than zero! The Task is invalid and the optimizer-tree cannot be reassembled.");
254
255        while (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList) {
256          experiment.Optimizers.Add(new UserDefinedAlgorithm("Placeholder")); // add dummy-entries to Optimizers so that its possible to insert the optimizerTask at the correct position
257        }
258        if (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList + 1) {
259          experiment.Optimizers.Add(optimizerTask.Item);
260        } else {
261          // if ComputeInParallel==true, don't replace the optimizer (except it is still a Placeholder)
262          // this is because Jobs with ComputeInParallel get submitted to hive with their child-optimizers deleted
263          if (!optimizerTask.ComputeInParallel || experiment.Optimizers[optimizerTask.IndexInParentOptimizerList].Name == "Placeholder") {
264            experiment.Optimizers[optimizerTask.IndexInParentOptimizerList] = optimizerTask.Item;
265          }
266        }
267      }
268      finally {
269        itemTaskLock.ExitWriteLock();
270      }
271    }
272
273    /// <summary>
274    /// Sets the IndexInParentOptimizerList property of the OptimizerJob
275    /// according to the position in the OptimizerList of the parentHiveTask.Task
276    /// Recursively updates all the child-jobs as well
277    /// </summary>
278    internal void SetIndexInParentOptimizerList(OptimizerHiveTask parentHiveTask) {
279      if (parentHiveTask != null) {
280        if (parentHiveTask.ItemTask.Item is Optimization.Experiment) {
281          this.ItemTask.IndexInParentOptimizerList = parentHiveTask.ItemTask.OptimizerAsExperiment.Optimizers.IndexOf(this.ItemTask.Item);
282        } else if (parentHiveTask.ItemTask.Item is Optimization.BatchRun) {
283          this.ItemTask.IndexInParentOptimizerList = 0;
284        } else {
285          throw new NotSupportedException("Only Experiment and BatchRuns are supported");
286        }
287      }
288      childHiveTasksLock.EnterReadLock();
289      try {
290        foreach (OptimizerHiveTask child in childHiveTasks) {
291          child.SetIndexInParentOptimizerList(this);
292        }
293      }
294      finally { childHiveTasksLock.ExitReadLock(); }
295    }
296
297    public override void AddChildHiveTask(HiveTask hiveTask) {
298      base.AddChildHiveTask(hiveTask);
299      var optimizerHiveJob = (OptimizerHiveTask)hiveTask;
300      syncTasksWithOptimizers = false;
301      if (this.ItemTask != null && optimizerHiveJob.ItemTask != null) {
302        // if task is in state Paused, it has to preserve its ResultCollection, which is cleared when a optimizer is added to an experiment
303        OptimizerTask optimizerJobClone = null;
304        if (optimizerHiveJob.Task.State == TaskState.Paused) {
305          optimizerJobClone = (OptimizerTask)optimizerHiveJob.ItemTask.Clone();
306        }
307
308        if (this.ItemTask.Item is Optimization.Experiment) {
309          if (!this.ItemTask.OptimizerAsExperiment.Optimizers.Contains(optimizerHiveJob.ItemTask.Item)) {
310            UpdateOptimizerInExperiment(this.ItemTask.OptimizerAsExperiment, optimizerHiveJob.ItemTask);
311          }
312        } else if (this.ItemTask.Item is Optimization.BatchRun) {
313          UpdateOptimizerInBatchRun(this.ItemTask.OptimizerAsBatchRun, optimizerHiveJob.ItemTask);
314        }
315
316        if (optimizerHiveJob.Task.State == TaskState.Paused) {
317          optimizerHiveJob.ItemTask = optimizerJobClone;
318        }
319      }
320      syncTasksWithOptimizers = true;
321    }
322
323    /// <summary>
324    /// Creates a TaskData object containing the Task and the IJob-Object as byte[]
325    /// </summary>
326    /// <param name="withoutChildOptimizers">
327    ///   if true the Child-Optimizers will not be serialized (if the task contains an Experiment)
328    /// </param>
329    public override TaskData GetAsTaskData(bool withoutChildOptimizers, out List<IPluginDescription> plugins) {
330      plugins = new List<IPluginDescription>();
331      if (this.itemTask == null) // || this.jobItem.Optimizer == null
332        return null;
333
334      IEnumerable<Type> usedTypes;
335      byte[] jobByteArray;
336      if (withoutChildOptimizers && this.ItemTask.Item is Optimization.Experiment) {
337        OptimizerTask clonedJob = (OptimizerTask)this.ItemTask.Clone(); // use a cloned task, so that the childHiveJob don't get confused
338        clonedJob.OptimizerAsExperiment.Optimizers.Clear();
339        jobByteArray = PersistenceUtil.Serialize(clonedJob, out usedTypes);
340      } else if (withoutChildOptimizers && this.ItemTask.Item is Optimization.BatchRun) {
341        OptimizerTask clonedJob = (OptimizerTask)this.ItemTask.Clone();
342        clonedJob.OptimizerAsBatchRun.Optimizer = null;
343        jobByteArray = PersistenceUtil.Serialize(clonedJob, out usedTypes);
344      } else if (this.ItemTask.Item is IAlgorithm) {
345        ((IAlgorithm)this.ItemTask.Item).StoreAlgorithmInEachRun = false; // avoid storing the algorithm in runs to reduce size
346        jobByteArray = PersistenceUtil.Serialize(this.ItemTask, out usedTypes);
347      } else {
348        jobByteArray = PersistenceUtil.Serialize(this.ItemTask, out usedTypes);
349      }
350
351      TaskData jobData = new TaskData() { TaskId = task.Id, Data = jobByteArray };
352      PluginUtil.CollectDeclaringPlugins(plugins, usedTypes);
353      return jobData;
354    }
355
356    public OptimizerHiveTask GetChildByOptimizerTask(OptimizerTask optimizerTask) {
357      childHiveTasksLock.EnterReadLock();
358      try {
359        foreach (OptimizerHiveTask child in childHiveTasks) {
360          if (child.ItemTask == optimizerTask)
361            return child;
362        }
363        return null;
364      }
365      finally { childHiveTasksLock.ExitReadLock(); }
366    }
367
368    public HiveTask<OptimizerTask> GetChildByOptimizer(IOptimizer optimizer) {
369      childHiveTasksLock.EnterReadLock();
370      try {
371        foreach (OptimizerHiveTask child in childHiveTasks) {
372          if (child.ItemTask.Item == optimizer)
373            return child;
374        }
375        return null;
376      }
377      finally { childHiveTasksLock.ExitReadLock(); }
378    }
379
380    public void ExecuteReadActionOnItemTask(Action action) {
381      itemTaskLock.EnterReadLock();
382      try {
383        action();
384      }
385      finally {
386        itemTaskLock.ExitReadLock();
387      }
388    }
389
390    #region Helpers
391    /// <summary>
392    /// Parses the run numbers out of runs and renames the run to the next number
393    /// </summary>
394    private static string GetNewRunName(IRun run, RunCollection runs) {
395      int idx = run.Name.IndexOf("Run ") + 4;
396
397      if (idx == 3 || runs.Count == 0)
398        return run.Name;
399
400      int maxRunNumber = int.MinValue;
401      foreach (IRun r in runs) {
402        int number = GetRunNumber(r.Name);
403        maxRunNumber = Math.Max(maxRunNumber, number);
404      }
405
406      return run.Name.Substring(0, idx) + (maxRunNumber + 1).ToString();
407    }
408
409    /// <summary>
410    /// Parses the number of a Run out of its name. Example "Genetic Algorithm Run 3" -> 3
411    /// </summary>
412    private static int GetRunNumber(string runName) {
413      int idx = runName.IndexOf("Run ") + 4;
414      if (idx == 3) {
415        return 0;
416      } else {
417        return int.Parse(runName.Substring(idx, runName.Length - idx));
418      }
419    }
420    #endregion
421  }
422}
Note: See TracBrowser for help on using the repository browser.