Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
06/08/09 18:34:28 (15 years ago)
Author:
gkronber
Message:

Worked on asynchronous result polling in HiveEngine. #545 (Engine which can be executed in the Hive).

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.Hive.Engine/3.2/HiveEngine.cs

    r2018 r2032  
    3333using System.Xml;
    3434using System.IO.Compression;
     35using HeuristicLab.Tracing;
    3536
    3637namespace HeuristicLab.Hive.Engine {
     
    4445    private Guid jobId;
    4546    private Job job;
     47    private object locker = new object();
     48    private volatile bool abortRequested;
     49
    4650    public string HiveServerUrl { get; set; }
    4751
    4852    public HiveEngine() {
    4953      job = new Job();
     54      abortRequested = false;
    5055    }
    5156
     
    96101        IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    97102        ResponseObject<JobResult> response = null;
     103        Job restoredJob = null;
    98104        do {
    99           response = executionEngineFacade.GetLastResult(jobId, true);
    100           if (response.Success && response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) {
    101             Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
     105          Thread.Sleep(RESULT_POLLING_INTERVAL_MS);
     106          lock (locker) {
     107            HiveLogger.Debug("HiveEngine: Results-polling - GetLastResult");
     108            response = executionEngineFacade.GetLastResult(jobId, false);
     109            HiveLogger.Debug("HiveEngine: Results-polling - Server: " + response.StatusMessage + " success: " + response.Success);
     110            // loop while
     111            // 1. the user doesn't request an abort
     112            // 2. there is a problem with server communication (success==false)
     113            // 3. no result for the job is available yet (response.Obj==null)
     114            // 4. the result that we get from the server is a snapshot and not the final result
     115            if (abortRequested) return;
     116            if (response.Success && response.Obj != null) {
     117              HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
     118              restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.Result);
     119              HiveLogger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + restoredJob.Engine.Canceled);
     120            }
    102121          }
    103         } while (response.Success && response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE);
    104         if (response.Success) {
    105           JobResult jobResult = response.Obj;
    106           if (jobResult != null) {
    107             job = (Job)PersistenceManager.RestoreFromGZip(jobResult.Result);
    108             OnFinished();
    109           }
    110         } else {
    111           Exception ex = new Exception(response.Obj.Exception.Message);
    112           ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    113         }
     122        } while (restoredJob == null || restoredJob.Engine.Canceled);
     123
     124        job = restoredJob;
     125        OnChanged();
     126        OnFinished();
    114127      });
     128      HiveLogger.Debug("HiveEngine: Starting results-polling thread");
    115129      t.Start();
    116130    }
     
    119133      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    120134
    121       // poll until snapshot is ready
    122135      ResponseObject<JobResult> response;
    123 
    124       // request snapshot
    125       Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
    126       if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
    127         response = executionEngineFacade.GetLastResult(jobId, false);
    128       } else {
    129         do {
    130           response = executionEngineFacade.GetLastResult(jobId, true);
    131           if (response.Success && response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) {
     136      lock (locker) {
     137        HiveLogger.Debug("HiveEngine: Abort - RequestSnapshot");
     138        Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
     139        if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
     140          // job is finished already
     141          HiveLogger.Debug("HiveEngine: Abort - GetLastResult(false)");
     142          response = executionEngineFacade.GetLastResult(jobId, false);
     143          HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
     144        } else {
     145          // server sent snapshot request to client
     146          // poll until snapshot is ready
     147          do {
    132148            Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
    133           }
    134         } while (response.Success && response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE);
     149            HiveLogger.Debug("HiveEngine: Abort - GetLastResult(true)");
     150            response = executionEngineFacade.GetLastResult(jobId, true);
     151            HiveLogger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
     152            // loop while
     153            // 1. problem with communication with server
     154            // 2. job result not yet ready
     155          } while (
     156            !response.Success ||
     157            response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE);
     158        }
    135159      }
    136       if (response.Success) {
    137         JobResult jobResult = response.Obj;
    138         if (jobResult != null) {
    139           job = (Job)PersistenceManager.RestoreFromGZip(jobResult.Result);
    140           //PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
    141         }
    142       } else {
    143         Exception ex = new Exception(response.Obj.Exception.Message);
    144         ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
     160      JobResult jobResult = response.Obj;
     161      if (jobResult != null) {
     162        HiveLogger.Debug("HiveEngine: Results-polling - Got result!");
     163        job = (Job)PersistenceManager.RestoreFromGZip(jobResult.Result);
     164        //PluginManager.ControlManager.ShowControl(job.Engine.CreateView());
    145165      }
     166      //HiveLogger.Debug("HiveEngine: Results-polling - Exception!");
     167      //Exception ex = new Exception(response.Obj.Exception.Message);
     168      //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    146169    }
    147170
     
    155178
    156179    public void Abort() {
     180      abortRequested = true;
     181      RequestSnapshot();
    157182      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    158183      executionEngineFacade.AbortJob(jobId);
     184      OnChanged();
    159185      OnFinished();
    160186    }
    161187
    162188    public void Reset() {
     189      abortRequested = false;
    163190      job.Engine.Reset();
    164191      jobId = Guid.NewGuid();
Note: See TracChangeset for help on using the changeset viewer.