Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.Clients.Hive/3.3/HiveJobs/OptimizerHiveTask.cs @ 7125

Last change on this file since 7125 was 7125, checked in by ascheibe, 13 years ago

#1672

  • removed magic numbers for upload retries
  • speed up job downloading by placing deserializing/downloading semaphores correctly
  • increased max. number of parallel downloads/deserializations
  • added more status messages when downloading to make it more clear what's actually happening
  • renamed some variables
File size: 17.2 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2011 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using HeuristicLab.Clients.Hive.Jobs;
26using HeuristicLab.Collections;
27using HeuristicLab.Common;
28using HeuristicLab.Optimization;
29using HeuristicLab.PluginInfrastructure;
30
31namespace HeuristicLab.Clients.Hive {
32  public class OptimizerHiveTask : HiveTask<OptimizerTask> {
33
34    #region Constructors and Cloning
35    public OptimizerHiveTask() { }
36    public OptimizerHiveTask(IOptimizer optimizer)
37      : this() {
38      this.ItemTask = new OptimizerTask(optimizer);
39    }
40    public OptimizerHiveTask(OptimizerTask optimizerJob)
41      : this() {
42      this.ItemTask = optimizerJob;
43    }
44    protected OptimizerHiveTask(OptimizerHiveTask original, Cloner cloner)
45      : base(original, cloner) {
46    }
47    public override IDeepCloneable Clone(Cloner cloner) {
48      return new OptimizerHiveTask(this, cloner);
49    }
50    #endregion
51
52    /// <summary>
53    /// if this.Optimizer is an experiment
54    ///   Uses the child-optimizers of this.HiveTask and creates HiveTask-childs
55    /// if this.Optimizer is a batchrun
56    ///   Creates a number of child-jobs according to repetitions
57    /// </summary>
58    protected override void UpdateChildHiveTasks() {
59      base.UpdateChildHiveTasks();
60      if (Task != null && syncTasksWithOptimizers) {
61        if (!ItemTask.ComputeInParallel) {
62          this.childHiveTasks.Clear();
63        } else {
64          if (ItemTask.Item is Optimization.Experiment) {
65            Optimization.Experiment experiment = (Optimization.Experiment)ItemTask.Item;
66            foreach (IOptimizer childOpt in experiment.Optimizers) {
67              this.childHiveTasks.Add(new OptimizerHiveTask(childOpt));
68            }
69          } else if (ItemTask.Item is Optimization.BatchRun) {
70            Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
71            if (batchRun.Optimizer != null) {
72              while (this.childHiveTasks.Count < batchRun.Repetitions) {
73                this.childHiveTasks.Add(new OptimizerHiveTask(batchRun.Optimizer));
74              }
75              while (this.childHiveTasks.Count > batchRun.Repetitions) {
76                this.childHiveTasks.Remove(this.childHiveTasks.Last());
77              }
78            }
79          }
80        }
81      }
82    }
83
84    protected override void RegisterItemTaskEvents() {
85      base.RegisterItemTaskEvents();
86      if (ItemTask != null) {
87        if (ItemTask.Item is Optimization.Experiment) {
88          Optimization.Experiment experiment = ItemTask.OptimizerAsExperiment;
89          experiment.Optimizers.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsAdded);
90          experiment.Optimizers.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsReplaced);
91          experiment.Optimizers.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsRemoved);
92          experiment.Optimizers.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_CollectionReset);
93        } else if (ItemTask.Item is Optimization.BatchRun) {
94          Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
95          batchRun.RepetitionsChanged += new EventHandler(batchRun_RepetitionsChanged);
96          batchRun.OptimizerChanged += new EventHandler(batchRun_OptimizerChanged);
97        }
98      }
99    }
100    protected override void DergisterItemTaskEvents() {
101      base.DergisterItemTaskEvents();
102      if (ItemTask != null) {
103        if (ItemTask.Item is Optimization.Experiment) {
104          Optimization.Experiment experiment = ItemTask.OptimizerAsExperiment;
105          experiment.Optimizers.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsAdded);
106          experiment.Optimizers.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsReplaced);
107          experiment.Optimizers.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_ItemsRemoved);
108          experiment.Optimizers.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<IOptimizer>>(Optimizers_CollectionReset);
109        } else if (ItemTask.Item is Optimization.BatchRun) {
110          Optimization.BatchRun batchRun = ItemTask.OptimizerAsBatchRun;
111          batchRun.RepetitionsChanged -= new EventHandler(batchRun_RepetitionsChanged);
112          batchRun.OptimizerChanged -= new EventHandler(batchRun_OptimizerChanged);
113        }
114      }
115    }
116
117    private void batchRun_OptimizerChanged(object sender, EventArgs e) {
118      if (syncTasksWithOptimizers) {
119        UpdateChildHiveTasks();
120      }
121    }
122
123    private void batchRun_RepetitionsChanged(object sender, EventArgs e) {
124      if (syncTasksWithOptimizers) {
125        UpdateChildHiveTasks();
126      }
127    }
128
129    private void Optimizers_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
130      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
131        childHiveTasksLock.EnterWriteLock();
132        try {
133          foreach (var item in e.Items) {
134            if (GetChildByOptimizer(item.Value) == null && item.Value.Name != "Placeholder") {
135              this.childHiveTasks.Add(new OptimizerHiveTask(item.Value));
136            }
137          }
138        } finally { childHiveTasksLock.ExitWriteLock(); }
139      }
140    }
141    private void Optimizers_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
142      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
143        childHiveTasksLock.EnterWriteLock();
144        try {
145          foreach (var item in e.OldItems) {
146            this.childHiveTasks.Remove(this.GetChildByOptimizer(item.Value));
147          }
148          foreach (var item in e.Items) {
149            if (GetChildByOptimizer(item.Value) == null && item.Value.Name != "Placeholder") {
150              this.childHiveTasks.Add(new OptimizerHiveTask(item.Value));
151            }
152          }
153        } finally { childHiveTasksLock.ExitWriteLock(); }
154      }
155    }
156    private void Optimizers_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
157      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
158        childHiveTasksLock.EnterWriteLock();
159        try {
160          foreach (var item in e.Items) {
161            this.childHiveTasks.Remove(this.GetChildByOptimizer(item.Value));
162          }
163        } finally { childHiveTasksLock.ExitWriteLock(); }
164      }
165    }
166    private void Optimizers_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<IOptimizer>> e) {
167      if (syncTasksWithOptimizers && this.ItemTask.ComputeInParallel) {
168        childHiveTasksLock.EnterWriteLock();
169        try {
170          foreach (var item in e.Items) {
171            this.childHiveTasks.Remove(this.GetChildByOptimizer(item.Value));
172          }
173        } finally { childHiveTasksLock.ExitWriteLock(); }
174      }
175    }
176
177    /// <summary>
178    /// if this.Optimizer is Experiment
179    ///   replace the child-optimizer in the experiment
180    /// if this.Optimizer is BatchRun
181    ///   add the runs from the optimizerTask to the batchrun and replace the Optimizer
182    /// </summary>
183    public override void IntegrateChild(ItemTask task, Guid childJobId) {
184      var optimizerTask = (OptimizerTask)task;
185      syncTasksWithOptimizers = false; // don't sync with optimizers during this method
186
187      if (this.ItemTask != null && this.ItemTask.Item != null) {
188        if (this.ItemTask.Item is Optimization.Experiment) {
189          UpdateOptimizerInExperiment(this.ItemTask.OptimizerAsExperiment, optimizerTask);
190        } else if (this.ItemTask.Item is Optimization.BatchRun) {
191          UpdateOptimizerInBatchRun(this.ItemTask.OptimizerAsBatchRun, optimizerTask);
192        }
193      }
194
195      childHiveTasksLock.EnterReadLock();
196      OptimizerHiveTask child = (OptimizerHiveTask)this.ChildHiveTasks.Single(j => j.Task.Id == childJobId);
197      try {
198        if (!optimizerTask.ComputeInParallel) {
199          child.syncTasksWithOptimizers = false;
200          child.ItemTask = optimizerTask;
201          child.syncTasksWithOptimizers = true;
202        }
203      } finally { childHiveTasksLock.ExitReadLock(); }
204      syncTasksWithOptimizers = true;
205    }
206
207    /// <summary>
208    /// Adds the runs from the optimizerTask to the batchrun and replaces the Optimizer
209    /// Sideeffect: the optimizerTask.Optimizer will be prepared (scopes are deleted and executionstate will be reset)
210    /// </summary>
211    private void UpdateOptimizerInBatchRun(BatchRun batchRun, OptimizerTask optimizerTask) {
212      if (batchRun.Optimizer == null) {
213        batchRun.Optimizer = (IOptimizer)optimizerTask.Item; // only set the first optimizer as Optimizer. if every time the Optimizer would be set, the runs would be cleared each time
214      }
215      foreach (IRun run in optimizerTask.Item.Runs) {
216        if (!batchRun.Runs.Contains(run)) {
217          run.Name = GetNewRunName(run, batchRun.Runs);
218          batchRun.Runs.Add(run);
219        }
220      }
221    }
222
223    /// <summary>
224    /// replace the child-optimizer in the experiment
225    /// Sideeffect: the optimizerTask.Optimizer will be prepared (scopes are deleted and executionstate will be reset)
226    /// </summary>
227    private void UpdateOptimizerInExperiment(Optimization.Experiment experiment, OptimizerTask optimizerTask) {
228      if (optimizerTask.IndexInParentOptimizerList < 0)
229        throw new IndexOutOfRangeException("IndexInParentOptimizerList must be equal or greater than zero! The Task is invalid and the optimizer-tree cannot be reassembled.");
230
231      while (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList) {
232        experiment.Optimizers.Add(new UserDefinedAlgorithm("Placeholder")); // add dummy-entries to Optimizers so that its possible to insert the optimizerTask at the correct position
233      }
234      if (experiment.Optimizers.Count < optimizerTask.IndexInParentOptimizerList + 1) {
235        experiment.Optimizers.Add(optimizerTask.Item);
236      } else {
237        // if ComputeInParallel==true, don't replace the optimizer (except it is still a Placeholder)
238        // this is because Jobs with ComputeInParallel get submitted to hive with their child-optimizers deleted
239        if (!optimizerTask.ComputeInParallel || experiment.Optimizers[optimizerTask.IndexInParentOptimizerList].Name == "Placeholder") {
240          experiment.Optimizers[optimizerTask.IndexInParentOptimizerList] = optimizerTask.Item;
241        }
242      }
243    }
244
245    /// <summary>
246    /// Sets the IndexInParentOptimizerList property of the OptimizerJob
247    /// according to the position in the OptimizerList of the parentHiveTask.Task
248    /// Recursively updates all the child-jobs as well
249    /// </summary>
250    internal void SetIndexInParentOptimizerList(OptimizerHiveTask parentHiveTask) {
251      if (parentHiveTask != null) {
252        if (parentHiveTask.ItemTask.Item is Optimization.Experiment) {
253          this.ItemTask.IndexInParentOptimizerList = parentHiveTask.ItemTask.OptimizerAsExperiment.Optimizers.IndexOf(this.ItemTask.Item);
254        } else if (parentHiveTask.ItemTask.Item is Optimization.BatchRun) {
255          this.ItemTask.IndexInParentOptimizerList = 0;
256        } else {
257          throw new NotSupportedException("Only Experiment and BatchRuns are supported");
258        }
259      }
260      childHiveTasksLock.EnterReadLock();
261      try {
262        foreach (OptimizerHiveTask child in childHiveTasks) {
263          child.SetIndexInParentOptimizerList(this);
264        }
265      } finally { childHiveTasksLock.ExitReadLock(); }
266    }
267
268    public override void AddChildHiveTask(HiveTask hiveTask) {
269      base.AddChildHiveTask(hiveTask);
270      var optimizerHiveJob = (OptimizerHiveTask)hiveTask;
271      syncTasksWithOptimizers = false;
272      if (this.ItemTask != null && optimizerHiveJob.ItemTask != null) {
273        // if task is in state Paused, it has to preserve its ResultCollection, which is cleared when a optimizer is added to an experiment
274        OptimizerTask optimizerJobClone = null;
275        if (optimizerHiveJob.Task.State == TaskState.Paused) {
276          optimizerJobClone = (OptimizerTask)optimizerHiveJob.ItemTask.Clone();
277        }
278
279        if (this.ItemTask.Item is Optimization.Experiment) {
280          if (!this.ItemTask.OptimizerAsExperiment.Optimizers.Contains(optimizerHiveJob.ItemTask.Item)) {
281            UpdateOptimizerInExperiment(this.ItemTask.OptimizerAsExperiment, optimizerHiveJob.ItemTask);
282          }
283        } else if (this.ItemTask.Item is Optimization.BatchRun) {
284          UpdateOptimizerInBatchRun(this.ItemTask.OptimizerAsBatchRun, optimizerHiveJob.ItemTask);
285        }
286
287        if (optimizerHiveJob.Task.State == TaskState.Paused) {
288          optimizerHiveJob.ItemTask = optimizerJobClone;
289        }
290      }
291      syncTasksWithOptimizers = true;
292    }
293
294    /// <summary>
295    /// Creates a TaskData object containing the Task and the IJob-Object as byte[]
296    /// </summary>
297    /// <param name="withoutChildOptimizers">
298    ///   if true the Child-Optimizers will not be serialized (if the task contains an Experiment)
299    /// </param>
300    public override TaskData GetAsTaskData(bool withoutChildOptimizers, out List<IPluginDescription> plugins) {
301      plugins = new List<IPluginDescription>();
302      if (this.itemTask == null) // || this.jobItem.Optimizer == null
303        return null;
304
305      IEnumerable<Type> usedTypes;
306      byte[] jobByteArray;
307      if (withoutChildOptimizers && this.ItemTask.Item is Optimization.Experiment) {
308        OptimizerTask clonedJob = (OptimizerTask)this.ItemTask.Clone(); // use a cloned task, so that the childHiveJob don't get confused
309        clonedJob.OptimizerAsExperiment.Optimizers.Clear();
310        jobByteArray = PersistenceUtil.Serialize(clonedJob, out usedTypes);
311      } else if (withoutChildOptimizers && this.ItemTask.Item is Optimization.BatchRun) {
312        OptimizerTask clonedJob = (OptimizerTask)this.ItemTask.Clone();
313        clonedJob.OptimizerAsBatchRun.Optimizer = null;
314        jobByteArray = PersistenceUtil.Serialize(clonedJob, out usedTypes);
315      } else if (this.ItemTask.Item is IAlgorithm) {
316        ((IAlgorithm)this.ItemTask.Item).StoreAlgorithmInEachRun = false; // avoid storing the algorithm in runs to reduce size
317        jobByteArray = PersistenceUtil.Serialize(this.ItemTask, out usedTypes);
318      } else {
319        jobByteArray = PersistenceUtil.Serialize(this.ItemTask, out usedTypes);
320      }
321
322      TaskData jobData = new TaskData() { TaskId = task.Id, Data = jobByteArray };
323      PluginUtil.CollectDeclaringPlugins(plugins, usedTypes);
324      return jobData;
325    }
326
327    public OptimizerHiveTask GetChildByOptimizerJob(OptimizerTask optimizerJob) {
328      childHiveTasksLock.EnterReadLock();
329      try {
330        foreach (OptimizerHiveTask child in childHiveTasks) {
331          if (child.ItemTask == optimizerJob)
332            return child;
333        }
334        return null;
335      } finally { childHiveTasksLock.ExitReadLock(); }
336    }
337
338    public HiveTask<OptimizerTask> GetChildByOptimizer(IOptimizer optimizer) {
339      childHiveTasksLock.EnterReadLock();
340      try {
341        foreach (OptimizerHiveTask child in childHiveTasks) {
342          if (child.ItemTask.Item == optimizer)
343            return child;
344        }
345        return null;
346      } finally { childHiveTasksLock.ExitReadLock(); }
347    }
348
349    #region Helpers
350    /// <summary>
351    /// Parses the run numbers out of runs and renames the run to the next number
352    /// </summary>
353    private static string GetNewRunName(IRun run, RunCollection runs) {
354      int idx = run.Name.IndexOf("Run ") + 4;
355
356      if (idx == -1 || runs.Count == 0)
357        return run.Name;
358
359      int maxRunNumber = int.MinValue;
360      foreach (IRun r in runs) {
361        int number = GetRunNumber(r.Name);
362        maxRunNumber = Math.Max(maxRunNumber, number);
363      }
364
365      return run.Name.Substring(0, idx) + (maxRunNumber + 1).ToString();
366    }
367
368    /// <summary>
369    /// Parses the number of a Run out of its name. Example "Genetic Algorithm Run 3" -> 3
370    /// </summary>
371    private static int GetRunNumber(string runName) {
372      int idx = runName.IndexOf("Run ") + 4;
373      if (idx == -1) {
374        return 0;
375      } else {
376        return int.Parse(runName.Substring(idx, runName.Length - idx));
377      }
378    }
379    #endregion
380  }
381}
Note: See TracBrowser for help on using the repository browser.