Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
08/02/10 17:27:24 (14 years ago)
Author:
cneumuel
Message:
  • Made HiveExperiment storable, so that a running HiveExperiment can be disconnected, stored and later resumed. (#1115)
  • Added Log to each JobItem (#1115)
Location:
branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HeuristicLab.Hive.Experiment-3.3.csproj

    r4120 r4133  
    8585      <HintPath>..\..\..\..\..\..\..\..\..\Programme\HeuristicLab 3.3\HeuristicLab.PluginInfrastructure-3.3.dll</HintPath>
    8686    </Reference>
    87     <Reference Include="HeuristicLab.Tracing-3.3, Version=3.2.0.0, Culture=neutral, PublicKeyToken=ba48961d6f65dcec, processorArchitecture=MSIL" />
     87    <Reference Include="HeuristicLab.Tracing-3.3" />
    8888    <Reference Include="System" />
    8989    <Reference Include="System.Core" />
  • branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs

    r4121 r4133  
    5353    private const string itemDescription = "An experiment which contains multiple batch runs of algorithms which are executed in the Hive.";
    5454    private const int resultPollingIntervalMs = 15000;
    55 
     55    private const int maxSnapshotRetries = 20;
    5656    private object locker = new object();
    57     private const int maxSnapshotRetries = 20;
     57
    5858    private System.Timers.Timer timer;
    5959    private bool pausePending, stopPending;
     60
     61    [Storable]
    6062    private DateTime lastUpdateTime;
    6163
     64    private bool isPollingResults;
     65    public bool IsPollingResults {
     66      get { return isPollingResults; }
     67      private set {
     68        if (isPollingResults != value) {
     69          isPollingResults = value;
     70          OnIsPollingResultsChanged();
     71        }
     72      }
     73    }
     74
     75    private bool stopResultsPollingPending = false;
     76
     77    private IDictionary<Guid, Thread> resultPollingThreads;
     78   
    6279    [Storable]
    6380    private IDictionary<Guid, IOptimizer> pendingOptimizers = new Dictionary<Guid, IOptimizer>();
     
    6885      get { return jobItems; }
    6986    }
    70    
     87
    7188
    7289    [Storable]
     
    7693      set {
    7794        if (serverUrl != value) {
    78           serverUrl = value; 
     95          serverUrl = value;
    7996          OnServerUrlChanged();
    8097        }
     
    88105      set {
    89106        if (resourceIds != value) {
    90           resourceIds = value; 
     107          resourceIds = value;
    91108          OnResourceIdsChanged();
    92109        }
     
    115132    public HiveExperiment(bool deserializing)
    116133      : base(deserializing) {
     134      this.resultPollingThreads = new Dictionary<Guid, Thread>();
     135      jobItems = new JobItemList();
    117136    }
    118137
     
    124143      pausePending = stopPending = false;
    125144      jobItems = new JobItemList();
     145      isPollingResults = false;
     146      resultPollingThreads = new Dictionary<Guid, Thread>();
    126147      InitTimer();
    127148    }
     
    142163      clone.pausePending = this.pausePending;
    143164      clone.jobItems = (JobItemList)cloner.Clone(jobItems);
     165      clone.lastUpdateTime = this.lastUpdateTime;
     166      clone.isPollingResults = this.isPollingResults;
    144167      return clone;
    145168    }
     
    148171    private void AfterDeserialization() {
    149172      InitTimer();
     173      this.IsPollingResults = false;
     174      this.stopResultsPollingPending = false;
    150175      LogMessage("I was deserialized.");
    151176    }
     
    215240      this.ExecutionState = Core.ExecutionState.Started;
    216241      Thread t = new Thread(() => {
    217         IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
     242        IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
    218243
    219244        pendingOptimizers = new Dictionary<Guid, IOptimizer>();
     
    234259          };
    235260          jobItems.Add(jobItem);
    236 
    237           LogMessage("Sent job to server (jobId: " + response.Obj.Id + ")");
    238         }
    239 
     261          jobItem.LogMessage("Job sent to Hive");
     262
     263          LogMessage("Sent job to Hive (jobId: " + response.Obj.Id + ")");
     264        }
     265       
    240266        // start results polling after sending sending the jobs to the server (to avoid race conflicts at the optimizers-collection)
    241         foreach (JobItem jobItem in jobItems) {
    242           StartResultPollingThread(jobItem.JobDto);
    243         }
     267        StartResultPolling();
    244268      });
    245269      t.Start();
     270    }
     271
     272    private void CreateResultPollingThreads() {
     273      foreach(JobItem jobItem in JobItems) {
     274        resultPollingThreads.Add(jobItem.JobDto.Id, CreateResultPollingThread(jobItem.JobDto));
     275      }
     276    }
     277
     278    public void StartResultPolling() {
     279      this.stopResultsPollingPending = false;
     280      CreateResultPollingThreads();
     281      foreach (Thread pollingThread in resultPollingThreads.Values) {
     282        pollingThread.Start();
     283      }
     284      this.IsPollingResults = true;
     285    }
     286
     287    public void StopResultPolling() {
     288      this.stopResultsPollingPending = true;
     289      foreach (Thread pollingThread in resultPollingThreads.Values) {
     290        pollingThread.Interrupt();
     291      }
     292      this.stopResultsPollingPending = false;
     293    }
     294
     295    private JobItem GetJobItemById(Guid jobId) {
     296      return jobItems.Single(x => x.JobDto.Id == jobId);
    246297    }
    247298
     
    267318
    268319    public void Stop() {
    269       // todo
    270     }
    271    
     320      foreach(JobItem jobItem in jobItems) {
     321        AbortJob(jobItem.JobDto.Id);
     322      }
     323    }
     324
     325    public void AbortJob(Guid jobId) {
     326      IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
     327      executionEngineFacade.AbortJob(jobId);
     328      resultPollingThreads[jobId].Interrupt();
     329      GetJobItemById(jobId).LogMessage("Aborting Job");
     330    }
    272331    #endregion
     332
     333    private IExecutionEngineFacade GetExecutionEngineFacade() {
     334      return ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
     335    }
    273336
    274337    private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
     
    307370    }
    308371
    309     private void StartResultPollingThread(JobDto job) {
    310       Thread t = new Thread(() => {
    311         IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(ServerUrl);
    312         IJob restoredObject = null;
    313 
    314         do {
    315           Thread.Sleep(resultPollingIntervalMs);
    316           //lock (locker) { [chn] try without locking for better performance
    317             if (stopPending) return;
     372    private Thread CreateResultPollingThread(JobDto job) {
     373      return new Thread(() => {
     374        try {
     375          GetJobItemById(job.Id).LogMessage("Starting job results polling");
     376          IExecutionEngineFacade executionEngineFacade = GetExecutionEngineFacade();
     377          IJob restoredObject = null;
     378
     379          do {
     380            Thread.Sleep(resultPollingIntervalMs);
     381            if (stopPending || !this.IsPollingResults) {
     382              return;
     383            }
    318384
    319385            ResponseObject<JobDto> response = executionEngineFacade.GetJobById(job.Id);
    320386            LogMessage("Response: " + response.StatusMessage + " (jobId: " + job.Id + ")");
     387            GetJobItemById(job.Id).LogMessage("Response: " + response.StatusMessage);
    321388
    322389            if (response.Obj != null) {
    323390              UpdateJobItem(response.Obj);
    324391            }
    325            
     392
    326393            // loop while
    327394            // 1. the user doesn't request an abort
     
    334401              UpdateSnapshot(jobResponse);
    335402            }
    336           //}
    337         } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);
    338 
    339         LogMessage("Job finished (jobId: " + job.Id + ")");
    340         // job retrieved... replace the existing optimizers with the finished one
    341         IOptimizer originalOptimizer = pendingOptimizers[job.Id];
    342         IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
    343 
    344         ReplaceOptimizer(originalOptimizer, restoredOptimizer);
    345         pendingOptimizers.Remove(job.Id);
    346 
    347         if (pendingOptimizers.Count == 0) {
    348           // finished
    349           this.ExecutionState = Core.ExecutionState.Stopped;
    350           OnStopped();
     403          } while (restoredObject == null || restoredObject.ExecutionState != Core.ExecutionState.Stopped);
     404
     405          LogMessage("Job finished (jobId: " + job.Id + ")");
     406          GetJobItemById(job.Id).LogMessage("Job finished");
     407          // job retrieved... replace the existing optimizers with the finished one
     408          IOptimizer originalOptimizer = pendingOptimizers[job.Id];
     409          IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
     410
     411          ReplaceOptimizer(originalOptimizer, restoredOptimizer);
     412          pendingOptimizers.Remove(job.Id);
     413
     414          if (pendingOptimizers.Count == 0) {
     415            // finished
     416            this.ExecutionState = Core.ExecutionState.Stopped;
     417            OnStopped();
     418          }
     419        } catch (ThreadInterruptedException exception) {
     420
     421        } finally {
     422          GetJobItemById(job.Id).LogMessage("ResultsPolling Thread stopped");
     423          resultPollingThreads.Remove(job.Id);
     424          if (resultPollingThreads.Count == 0) {
     425            IsPollingResults = false;
     426          }
    351427        }
    352428      });
    353       t.Start();
    354429    }
    355430
     
    370445      }
    371446    }
    372    
     447
    373448    #region Required Plugin Search
    374449    /// <summary>
     
    523598    }
    524599
     600    public event EventHandler IsResultsPollingChanged;
     601    private void OnIsPollingResultsChanged() {
     602      if (this.IsPollingResults) {
     603        LogMessage("Results Polling Started");
     604        timer.Start();
     605      } else {
     606        LogMessage("Results Polling Stopped");
     607        timer.Stop();
     608      }
     609      EventHandler handler = IsResultsPollingChanged;
     610      if (handler != null) handler(this, EventArgs.Empty);
     611    }
    525612    #endregion
    526613  }
  • branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/JobItem.cs

    r4121 r4133  
    88using HeuristicLab.Hive.Contracts;
    99using System.Drawing;
     10using HeuristicLab.Common;
     11using System.Diagnostics;
    1012
    1113namespace HeuristicLab.Hive.Experiment {
     14  [StorableClass]
    1215  public class JobItem : Item {
     16    private static object locker = new object();
     17
    1318    public override Image ItemImage {
    1419      get {
     
    5964      get { return log; }
    6065    }
    61 
     66   
    6267    public JobItem() {
    6368      log = new Log();
     
    7378
    7479    public event EventHandler LatestSnapshotChanged;
    75     public void OnLatestSnapshotChanged() {
     80    private void OnLatestSnapshotChanged() {
     81      LogMessage("LatestSnapshotChanged");
    7682      EventHandler handler = LatestSnapshotChanged;
    7783      if (handler != null) handler(this, EventArgs.Empty);
     
    8086    public event EventHandler JobDtoChanged;
    8187    private void OnJobDtoChanged() {
     88      LogMessage("JobDtoChanged");
    8289      EventHandler handler = JobDtoChanged;
    8390      if (handler != null) handler(this, EventArgs.Empty);
    8491    }
     92
     93    public void LogMessage(string message) {
     94      lock (locker) {
     95        log.LogMessage(message);
     96      }
     97    }
     98
     99    public override IDeepCloneable Clone(Cloner cloner) {
     100      LogMessage("I am beeing cloned");
     101      JobItem clone = (JobItem)base.Clone(cloner);
     102      clone.latestSnapshotTime = this.latestSnapshotTime;
     103      clone.jobDto = (JobDto)cloner.Clone(this.jobDto);
     104      clone.latestSnapshot = (ResponseObject<SerializedJob>)cloner.Clone(this.latestSnapshot);
     105      clone.log = (ILog)cloner.Clone(this.log);
     106      return clone;
     107    }
    85108  }
    86109}
  • branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/JobItemList.cs

    r4120 r4133  
    1414    public JobItemList(int capacity) : base(capacity) { }
    1515    public JobItemList(IEnumerable<JobItem> collection) : base(collection) { }
     16
     17    public override Common.IDeepCloneable Clone(Common.Cloner cloner) {
     18      JobItemList clone = (JobItemList)base.Clone(cloner);
     19      return clone;
     20    }
    1621  }
    1722}
  • branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/Properties/AssemblyInfo.cs

    r4121 r4133  
    5555// You can specify all the values or you can default the Revision and Build Numbers
    5656// by using the '*' as shown below:
    57 [assembly: AssemblyVersion("3.3.0.4120")]
    58 [assembly: AssemblyFileVersion("3.3.0.4120")]
    59 [assembly: AssemblyBuildDate("2010/07/30 13:39:10")]
     57[assembly: AssemblyVersion("3.3.0.4121")]
     58[assembly: AssemblyFileVersion("3.3.0.4121")]
     59[assembly: AssemblyBuildDate("2010/08/02 17:23:46")]
Note: See TracChangeset for help on using the changeset viewer.