Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
07/23/10 09:37:57 (14 years ago)
Author:
cneumuel
Message:

resolved issues with 3.3, hive-server now executable (1096)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/3.3-HiveMigration/sources/HeuristicLab.Hive/HeuristicLab.Hive.Engine/3.3/HiveEngine.cs

    r4042 r4091  
    4141  /// in parallel.
    4242  /// </summary>
    43   public class HiveEngine : ItemBase, IEngine, IEditable {
     43  public class HiveEngine : HeuristicLab.Core.Engine {
    4444    private const int SNAPSHOT_POLLING_INTERVAL_MS = 1000;
    4545    private const int RESULT_POLLING_INTERVAL_MS = 10000;
     
    6060    }
    6161
    62     #region IEngine Members
    63 
    64     public IOperatorGraph OperatorGraph {
    65       get { return job.Engine.OperatorGraph; }
    66     }
    67 
    68     public IScope GlobalScope {
    69       get { return job.Engine.GlobalScope; }
    70     }
     62    //public OperatorGraph OperatorGraph {
     63    //  get { return job.Engine.OperatorGraph; }
     64    //}
     65
     66    //public IScope GlobalScope {
     67    //  get { return job.Engine.GlobalScope; }
     68    //}
    7169
    7270    public TimeSpan ExecutionTime {
     
    7573
    7674    public ThreadPriority Priority {
    77       get { return job.Engine.Priority; }
    78       set { job.Engine.Priority = value; }
    79     }
    80 
    81     public bool Running {
    82       get { return job.Engine.Running; }
    83     }
    84 
    85     public bool Canceled {
    86       get { return job.Engine.Canceled; }
    87     }
    88 
    89     public bool Terminated {
    90       get { return job.Engine.Terminated; }
    91     }
    92 
    93     public void Execute() {
     75      get { return this.Priority; }
     76      set { this.Priority = value; }
     77    }
     78
     79    //public bool Running {
     80    //  get { return job.Engine.ExecutionState == Core.ExecutionState.Started; }
     81    //}
     82
     83    //public bool Canceled {
     84    //  get { return job.Engine.Canceled; }
     85    //}
     86
     87    //public bool Terminated {
     88    //  get { return job.Engine.Terminated; }
     89    //}
     90
     91    public ExecutionState ExecutionState {
     92      get { return job.Engine.ExecutionState; }
     93    }
     94
     95    protected override void ProcessNextOperation() {
    9496      var jobObj = CreateJobObj();
    9597      IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
     
    9799      List<string> groups = new List<string>();
    98100      if (!String.Empty.Equals(RessourceIds)) {
    99         groups.AddRange(RessourceIds.Split(';'));       
     101        groups.AddRange(RessourceIds.Split(';'));
    100102      }
    101103
     
    110112          }   
    111113        }
    112       } */     
     114      } */
    113115
    114116      int loops = 1;
    115      
     117
    116118      Int32.TryParse(MultiSubmitCount, out loops);
    117       if(loops == 0)
     119      if (loops == 0)
    118120        loops = 1;
    119121
     
    122124        jobId = res.Obj.Id;
    123125      }
    124      
     126
    125127      StartResultPollingThread();
    126128    }
     
    144146            // 4. the result that we get from the server is a snapshot and not the final result
    145147            if (abortRequested) return;
    146               if (response.Success && response.Obj != null && response.StatusMessage != ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) {
     148            if (response.Success && response.Obj != null && response.StatusMessage != ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE) {
    147149              Logger.Debug("HiveEngine: Results-polling - Got result!");
    148               restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobData);
     150              throw new NotImplementedException("TODO[chn]use persistency-3.3");
     151              //restoredJob = (Job)PersistenceManager.RestoreFromGZip(response.Obj.SerializedJobData);
    149152              Logger.Debug("HiveEngine: Results-polling - IsSnapshotResult: " + (restoredJob.Progress < 1.0));
    150153            }
     
    153156
    154157        job = restoredJob;
    155         ControlManager.Manager.ShowControl(job.Engine.CreateView());
    156         OnChanged();
    157         OnFinished();
     158
     159        //ControlManager.Manager.ShowControl(job.Engine.CreateView());
     160        //OnChanged();
     161
     162        //OnFinished();
    158163      });
    159164      Logger.Debug("HiveEngine: Starting results-polling thread");
     
    161166    }
    162167
    163     public void RequestSnapshot() {
    164       IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    165       int retryCount = 0;
    166       ResponseObject<SerializedJob> response;
    167       lock (locker) {
    168         Logger.Debug("HiveEngine: Abort - RequestSnapshot");
    169         Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
    170         if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
    171           // job is finished already
    172           Logger.Debug("HiveEngine: Abort - GetLastResult(false)");
    173           response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
    174           Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
    175         } else {
    176           // server sent snapshot request to client
    177           // poll until snapshot is ready
    178           do {
    179             Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
    180             Logger.Debug("HiveEngine: Abort - GetLastResult(true)");
    181             response = executionEngineFacade.GetLastSerializedResult(jobId, false, true);
    182             Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
    183             retryCount++;
    184             // loop while
    185             // 1. problem with communication with server
    186             // 2. job result not yet ready
    187           } while (
    188             (retryCount < MAX_SNAPSHOT_RETRIES) && (
    189             !response.Success ||
    190             response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
    191             );
    192         }
    193       }
    194       SerializedJob jobResult = response.Obj;
    195       if (jobResult != null) {
    196         Logger.Debug("HiveEngine: Results-polling - Got result!");
    197         job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobData);
    198         ControlManager.Manager.ShowControl(job.Engine.CreateView());
    199       }
    200       //HiveLogger.Debug("HiveEngine: Results-polling - Exception!");
    201       //Exception ex = new Exception(response.Obj.Exception.Message);
    202       //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
    203     }
    204 
    205     public void ExecuteStep() {
    206       throw new NotSupportedException();
    207     }
    208 
    209     public void ExecuteSteps(int steps) {
    210       throw new NotSupportedException();
    211     }
    212 
    213     public void Abort() {
    214       abortRequested = true;
    215       // RequestSnapshot();
    216       IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
    217       executionEngineFacade.AbortJob(jobId);
    218       OnChanged();
    219       OnFinished();
    220     }
    221 
    222     public void Reset() {
    223       abortRequested = false;
    224       job.Engine.Reset();
    225       jobId = Guid.NewGuid();
    226       OnInitialized();
    227     }
    228 
    229168    private HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob CreateJobObj() {
    230169      HeuristicLab.Hive.Contracts.BusinessObjects.JobDto jobObj =
    231170        new HeuristicLab.Hive.Contracts.BusinessObjects.JobDto();
    232171
    233       MemoryStream memStream = new MemoryStream();
    234       GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
    235       XmlDocument document = PersistenceManager.CreateXmlDocument();
    236       Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
    237       XmlNode rootNode = document.CreateElement("Root");
    238       document.AppendChild(rootNode);
    239       rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));
    240       document.Save(stream);
    241       stream.Close();
    242 
    243       HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
    244         new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
    245       executableJobObj.JobInfo = jobObj;
    246       executableJobObj.SerializedJobData = memStream.ToArray();
    247 
    248       List<IPluginDescription> plugins = new List<IPluginDescription>();
    249 
    250       foreach (IStorable storeable in dictionary.Values) {
    251         IPluginDescription pluginInfo = ApplicationManager.Manager.GetDeclaringPlugin(storeable.GetType());
    252         if (!plugins.Contains(pluginInfo)) {
    253           plugins.Add(pluginInfo);
    254           foreach (var dependency in pluginInfo.Dependencies) {
    255             if (!plugins.Contains(dependency)) plugins.Add(dependency);
    256           }
    257         }
    258       }
    259 
    260       List<HivePluginInfoDto> pluginsNeeded =
    261         new List<HivePluginInfoDto>();
    262       foreach (IPluginDescription uniquePlugin in plugins) {
    263         HivePluginInfoDto pluginInfo =
    264           new HivePluginInfoDto();
    265         pluginInfo.Name = uniquePlugin.Name;
    266         pluginInfo.Version = uniquePlugin.Version;
    267         pluginInfo.BuildDate = uniquePlugin.BuildDate;
    268         pluginsNeeded.Add(pluginInfo);
    269       }
    270 
    271       jobObj.CoresNeeded = 1;
    272       jobObj.PluginsNeeded = pluginsNeeded;
    273       jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
    274       return executableJobObj;
    275     }
    276 
    277     public event EventHandler Initialized;
    278     /// <summary>
    279     /// Fires a new <c>Initialized</c> event.
    280     /// </summary>
    281     protected virtual void OnInitialized() {
    282       if (Initialized != null)
    283         Initialized(this, new EventArgs());
    284     }
    285 
    286     public event EventHandler<EventArgs<IOperation>> OperationExecuted;
    287     /// <summary>
    288     /// Fires a new <c>OperationExecuted</c> event.
    289     /// </summary>
    290     /// <param name="operation">The operation that has been executed.</param>
    291     protected virtual void OnOperationExecuted(IOperation operation) {
    292       if (OperationExecuted != null)
    293         OperationExecuted(this, new EventArgs<IOperation>(operation));
    294     }
    295 
    296     public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
    297     /// <summary>
    298     /// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
    299     /// </summary>
    300     /// <param name="exception">The exception that was thrown.</param>
    301     protected virtual void OnExceptionOccurred(Exception exception) {
    302       Abort();
    303       if (ExceptionOccurred != null)
    304         ExceptionOccurred(this, new EventArgs<Exception>(exception));
    305     }
    306 
    307     public event EventHandler ExecutionTimeChanged;
    308     /// <summary>
    309     /// Fires a new <c>ExecutionTimeChanged</c> event.
    310     /// </summary>
    311     protected virtual void OnExecutionTimeChanged() {
    312       if (ExecutionTimeChanged != null)
    313         ExecutionTimeChanged(this, new EventArgs());
    314     }
    315 
    316     public event EventHandler Finished;
    317     /// <summary>
    318     /// Fires a new <c>Finished</c> event.
    319     /// </summary>
    320     protected virtual void OnFinished() {
    321       if (Finished != null)
    322         Finished(this, new EventArgs());
    323     }
    324 
    325     #endregion
    326 
    327     public override IView CreateView() {
    328       return new HiveEngineEditor(this);
    329     }
    330 
    331     #region IEditable Members
    332 
    333     public IEditor CreateEditor() {
    334       return new HiveEngineEditor(this);
    335     }
    336     #endregion
    337 
    338     public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
    339       XmlNode node = base.GetXmlNode(name, document, persistedObjects);
    340       XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
    341       attr.Value = HiveServerUrl;
    342       node.Attributes.Append(attr);
    343       node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
    344       return node;
    345     }
    346 
    347     public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
    348       base.Populate(node, restoredObjects);
    349       HiveServerUrl = node.Attributes["HiveServerUrl"].Value;
    350       job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
    351     }
     172
     173      throw new NotImplementedException("TODO[chn]use persistency-3.3");
     174
     175      //MemoryStream memStream = new MemoryStream();
     176      //GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
     177      //XmlDocument document = PersistenceManager.CreateXmlDocument();
     178      //Dictionary<Guid, IStorable> dictionary = new Dictionary<Guid, IStorable>();
     179      //XmlNode rootNode = document.CreateElement("Root");
     180      //document.AppendChild(rootNode);
     181      //rootNode.AppendChild(PersistenceManager.Persist(job, document, dictionary));
     182      //document.Save(stream);
     183      //stream.Close();
     184
     185      //HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob executableJobObj =
     186      //  new HeuristicLab.Hive.Contracts.BusinessObjects.SerializedJob();
     187      //executableJobObj.JobInfo = jobObj;
     188      //executableJobObj.SerializedJobData = memStream.ToArray();
     189
     190      //List<IPluginDescription> plugins = new List<IPluginDescription>();
     191
     192      //foreach (IStorable storeable in dictionary.Values) {
     193      //  IPluginDescription pluginInfo = ApplicationManager.Manager.GetDeclaringPlugin(storeable.GetType());
     194      //  if (!plugins.Contains(pluginInfo)) {
     195      //    plugins.Add(pluginInfo);
     196      //    foreach (var dependency in pluginInfo.Dependencies) {
     197      //      if (!plugins.Contains(dependency)) plugins.Add(dependency);
     198      //    }
     199      //  }
     200      //}
     201
     202      //List<HivePluginInfoDto> pluginsNeeded =
     203      //  new List<HivePluginInfoDto>();
     204      //foreach (IPluginDescription uniquePlugin in plugins) {
     205      //  HivePluginInfoDto pluginInfo =
     206      //    new HivePluginInfoDto();
     207      //  pluginInfo.Name = uniquePlugin.Name;
     208      //  pluginInfo.Version = uniquePlugin.Version;
     209      //  pluginInfo.BuildDate = uniquePlugin.BuildDate;
     210      //  pluginsNeeded.Add(pluginInfo);
     211      //}
     212
     213      //jobObj.CoresNeeded = 1;
     214      //jobObj.PluginsNeeded = pluginsNeeded;
     215      //jobObj.State = HeuristicLab.Hive.Contracts.BusinessObjects.State.offline;
     216      //return executableJobObj;
     217    }
     218
     219    //public event EventHandler Initialized;
     220    ///// <summary>
     221    ///// Fires a new <c>Initialized</c> event.
     222    ///// </summary>
     223    //protected virtual void OnInitialized() {
     224    //  if (Initialized != null)
     225    //    Initialized(this, new EventArgs());
     226    //}
     227
     228    //public event EventHandler<EventArgs<IOperation>> OperationExecuted;
     229    ///// <summary>
     230    ///// Fires a new <c>OperationExecuted</c> event.
     231    ///// </summary>
     232    ///// <param name="operation">The operation that has been executed.</param>
     233    //protected virtual void OnOperationExecuted(IOperation operation) {
     234    //  if (OperationExecuted != null)
     235    //    OperationExecuted(this, new EventArgs<IOperation>(operation));
     236    //}
     237
     238    //public event EventHandler<EventArgs<Exception>> ExceptionOccurred;
     239    ///// <summary>
     240    ///// Aborts the execution and fires a new <c>ExceptionOccurred</c> event.
     241    ///// </summary>
     242    ///// <param name="exception">The exception that was thrown.</param>
     243    //protected virtual void OnExceptionOccurred(Exception exception) {
     244    //  Abort();
     245    //  if (ExceptionOccurred != null)
     246    //    ExceptionOccurred(this, new EventArgs<Exception>(exception));
     247    //}
     248
     249    //public event EventHandler ExecutionTimeChanged;
     250    ///// <summary>
     251    ///// Fires a new <c>ExecutionTimeChanged</c> event.
     252    ///// </summary>
     253    //protected virtual void OnExecutionTimeChanged() {
     254    //  if (ExecutionTimeChanged != null)
     255    //    ExecutionTimeChanged(this, new EventArgs());
     256    //}
     257
     258    //public event EventHandler Finished;
     259    ///// <summary>
     260    ///// Fires a new <c>Finished</c> event.
     261    ///// </summary>
     262    //protected virtual void OnFinished() {
     263    //  if (Finished != null)
     264    //    Finished(this, new EventArgs());
     265    //}
     266
     267    //public void Abort() {
     268    //  abortRequested = true;
     269    //  // RequestSnapshot();
     270    //  IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
     271    //  executionEngineFacade.AbortJob(jobId);
     272    //  //OnChanged();
     273    //  OnFinished();
     274    //}
     275
     276    //public void Reset() {
     277    //  abortRequested = false;
     278
     279    //  throw new NotImplementedException("[chn] how to reset Engine?");
     280    //  //job.Engine.Reset();
     281
     282    //  jobId = Guid.NewGuid();
     283    //  OnInitialized();
     284    //}
     285
     286    //public void RequestSnapshot() {
     287    //  IExecutionEngineFacade executionEngineFacade = ServiceLocator.CreateExecutionEngineFacade(HiveServerUrl);
     288    //  int retryCount = 0;
     289    //  ResponseObject<SerializedJob> response;
     290    //  lock (locker) {
     291    //    Logger.Debug("HiveEngine: Abort - RequestSnapshot");
     292    //    Response snapShotResponse = executionEngineFacade.RequestSnapshot(jobId);
     293    //    if (snapShotResponse.StatusMessage == ApplicationConstants.RESPONSE_JOB_IS_NOT_BEEING_CALCULATED) {
     294    //      // job is finished already
     295    //      Logger.Debug("HiveEngine: Abort - GetLastResult(false)");
     296    //      response = executionEngineFacade.GetLastSerializedResult(jobId, false, false);
     297    //      Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
     298    //    } else {
     299    //      // server sent snapshot request to client
     300    //      // poll until snapshot is ready
     301    //      do {
     302    //        Thread.Sleep(SNAPSHOT_POLLING_INTERVAL_MS);
     303    //        Logger.Debug("HiveEngine: Abort - GetLastResult(true)");
     304    //        response = executionEngineFacade.GetLastSerializedResult(jobId, false, true);
     305    //        Logger.Debug("HiveEngine: Abort - Server: " + response.StatusMessage + " success: " + response.Success);
     306    //        retryCount++;
     307    //        // loop while
     308    //        // 1. problem with communication with server
     309    //        // 2. job result not yet ready
     310    //      } while (
     311    //        (retryCount < MAX_SNAPSHOT_RETRIES) && (
     312    //        !response.Success ||
     313    //        response.StatusMessage == ApplicationConstants.RESPONSE_JOB_RESULT_NOT_YET_HERE)
     314    //        );
     315    //    }
     316    //  }
     317    //  SerializedJob jobResult = response.Obj;
     318    //  if (jobResult != null) {
     319    //    Logger.Debug("HiveEngine: Results-polling - Got result!");
     320
     321    //    //job = (Job)PersistenceManager.RestoreFromGZip(jobResult.SerializedJobData);
     322    //    //ControlManager.Manager.ShowControl(job.Engine.CreateView());
     323    //    throw new NotImplementedException("TODO[chn]use persistency-3.3");
     324    //  }
     325    //  //HiveLogger.Debug("HiveEngine: Results-polling - Exception!");
     326    //  //Exception ex = new Exception(response.Obj.Exception.Message);
     327    //  //ThreadPool.QueueUserWorkItem(delegate(object state) { OnExceptionOccurred(ex); });
     328    //}
     329
     330    //public override System.Xml.XmlNode GetXmlNode(string name, System.Xml.XmlDocument document, IDictionary<Guid, IStorable> persistedObjects) {
     331    //  XmlNode node = base.GetXmlNode(name, document, persistedObjects);
     332    //  XmlAttribute attr = document.CreateAttribute("HiveServerUrl");
     333    //  attr.Value = HiveServerUrl;
     334    //  node.Attributes.Append(attr);
     335    //  node.AppendChild(PersistenceManager.Persist("Job", job, document, persistedObjects));
     336    //  return node;
     337    //}
     338
     339    //public override void Populate(System.Xml.XmlNode node, IDictionary<Guid, IStorable> restoredObjects) {
     340    //  base.Populate(node, restoredObjects);
     341    //  HiveServerUrl = node.Attributes["HiveServerUrl"].Value;
     342    //  job = (Job)PersistenceManager.Restore(node.SelectSingleNode("Job"), restoredObjects);
     343    //}
     344
     345
    352346  }
    353347}
Note: See TracChangeset for help on using the changeset viewer.