Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
09/07/10 10:22:27 (14 years ago)
Author:
cneumuel
Message:
  • created HiveClient which shows an overview over all submitted HiveExperiments
  • its possible to download all submitted HiveExperiments including results
  • Experiments are now sent as a whole to the Hive and the Hive-Slaves take care of creating child-jobs (if necessary). The parent job is then paused and will be reactivated when all child-jobs are finished
  • WcfService-Clients are now consistently managed by WcfServicePool which allows to use IDisposable-Pattern and always keeps exactly one proxy-object until all callers disposed them.
  • created ProgressView which is able to lock a View and display progress of an action. It also allows to simulate progress if no progress-information is available so that users don't get too nervous while waiting.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/3.3-Hive/sources/HeuristicLab.Hive/HeuristicLab.Hive.Experiment/3.3/HiveExperiment.cs

    r4342 r4368  
    4545using HeuristicLab.Hive.Experiment.Properties;
    4646using System.ComponentModel;
     47using HeuristicLab.Hive.Experiment.Jobs;
    4748
    4849namespace HeuristicLab.Hive.Experiment {
     
    5152  /// </summary>
    5253  [Item(itemName, itemDescription)]
    53   [Creatable("Testing & Analysis")]
    5454  [StorableClass]
    5555  public class HiveExperiment : NamedItem, IExecutable {
     
    6363    private System.Timers.Timer timer;
    6464    private bool pausePending, stopPending;
    65     private bool sendingJobsFinished = false;
    6665
    6766    // ensure that only 2 threads can fetch jobresults simultaniously
     
    9897    [Storable]
    9998    private DateTime lastUpdateTime;
    100 
    101     /// <summary>
    102     /// Mapping from JobId to an optimizer.
    103     /// Stores all pending optimizers. If an optimizer is finished it is removed from this collection
    104     /// </summary>
    105     [Storable]
    106     private IDictionary<Guid, IOptimizer> pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
    107 
    108     /// <summary>
    109     /// Stores a mapping from the child-optimizer to the parent optimizer.
    110     /// Needed to replace a finished optimizer in the optimizer-tree.
    111     /// Only pending optmizers are stored.
    112     /// </summary>
    113     [Storable]
    114     private IDictionary<IOptimizer, IOptimizer> parentOptimizersByPendingOptimizer = new Dictionary<IOptimizer, IOptimizer>();
    115 
    116     [Storable]
    117     private JobItemList jobItems;
    118     public JobItemList JobItems {
    119       get { return jobItems; }
    120     }
    121 
     99   
    122100    [Storable]
    123101    private string resourceIds;
     
    173151      }
    174152    }
     153
     154    [Storable]
     155    private IJob rootJob;
     156    public IJob RootJob {
     157      get { return rootJob; }
     158      set {
     159        if (rootJob != value) {
     160          rootJob = value;
     161          OnRootJobChanged();
     162        }
     163      }
     164    }
     165
     166    private JobItem rootJobItem;
     167    public JobItem RootJobItem {
     168      get { return rootJobItem; }
     169      set {
     170        if (rootJobItem != null) {
     171          DeregisterRootJobItemEvents();
     172        }
     173        if (rootJobItem != value) {
     174          rootJobItem = value;
     175          RegisterRootJobItemEvents();
     176          OnRootJobItemChanged();
     177        }
     178      }
     179    }
     180
     181    private void RegisterRootJobItemEvents() {
     182      rootJobItem.FinalResultAvailable += new EventHandler(rootJobItem_FinalResultAvailable);
     183    }
     184
     185    private void DeregisterRootJobItemEvents() {
     186      rootJobItem.FinalResultAvailable -= new EventHandler(rootJobItem_FinalResultAvailable);
     187    }
     188
    175189    #endregion
    176190
     
    185199      this.log = new Log();
    186200      pausePending = stopPending = false;
    187       jobItems = new JobItemList();
    188201      isPollingResults = false;
    189       RegisterJobItemListEvents();
    190202      InitTimer();
    191203    }
     
    198210      clone.executionState = this.executionState;
    199211      clone.executionTime = this.executionTime;
    200       clone.pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
    201 
    202       lock (pendingOptimizerMappingsLocker) {
    203         foreach (var pair in this.pendingOptimizersByJobId)
    204           clone.pendingOptimizersByJobId[pair.Key] = (IOptimizer)cloner.Clone(pair.Value);
    205 
    206         foreach (var pair in this.parentOptimizersByPendingOptimizer)
    207           clone.parentOptimizersByPendingOptimizer[(IOptimizer)cloner.Clone(pair.Key)] = (IOptimizer)cloner.Clone(pair.Value);
    208       }
    209212      clone.log = (ILog)cloner.Clone(log);
    210213      clone.stopPending = this.stopPending;
    211214      clone.pausePending = this.pausePending;
    212       clone.jobItems.AddRange((JobItemList)cloner.Clone(jobItems));
    213215      clone.lastUpdateTime = this.lastUpdateTime;
    214216      clone.isPollingResults = this.isPollingResults;
     217      clone.rootJob = (IJob)cloner.Clone(this.rootJob);
    215218      return clone;
    216219    }
     
    221224      this.IsPollingResults = false;
    222225      this.stopResultsPollingPending = false;
    223       RegisterJobItemListEvents();
     226      RegisterEvents();
    224227      LogMessage("I was deserialized.");
     228    }
     229
     230    private void RegisterEvents() {
     231      RootJobChanged += new EventHandler(HiveExperiment_RootJobChanged);
     232    }
     233
     234    private void DeRegisterEvents() {
     235      RootJobChanged -= new EventHandler(HiveExperiment_RootJobChanged);
    225236    }
    226237
     
    247258      if (experiment != null) {
    248259        StopResultPolling();
    249         lock (pendingOptimizerMappingsLocker) {
    250           pendingOptimizersByJobId.Clear();
    251           parentOptimizersByPendingOptimizer.Clear();
    252         }
    253         lock (jobItems) {
    254           jobItems.Clear();
    255         }
    256260        experiment.Prepare();
    257261        this.ExecutionState = Core.ExecutionState.Prepared;
     
    261265
    262266    public void Start() {
    263       sendingJobsFinished = false;
    264267      OnStarted();
    265268      ExecutionTime = new TimeSpan();
    266269      lastUpdateTime = DateTime.Now;
    267270      this.ExecutionState = Core.ExecutionState.Started;
    268       StartResultPolling();
    269271
    270272      Thread t = new Thread(() => {
    271         IClientFacade clientFacade = CreateStreamedClientFacade();
    272 
    273         try {
    274           pendingOptimizersByJobId = new Dictionary<Guid, IOptimizer>();
    275 
    276           LogMessage("Extracting jobs from Experiment");
    277           parentOptimizersByPendingOptimizer = GetOptimizers(true);
    278           LogMessage("Extraction of jobs from Experiment finished");
    279 
    280           IEnumerable<string> groups = ResourceGroups;
    281           lock (pendingOptimizerMappingsLocker) {
    282             foreach (IOptimizer optimizer in parentOptimizersByPendingOptimizer.Keys) {
    283               SerializedJob serializedJob = CreateSerializedJob(optimizer);
    284               ResponseObject<JobDto> response = clientFacade.AddJobWithGroupStrings(serializedJob, groups);
    285               pendingOptimizersByJobId.Add(response.Obj.Id, optimizer);
    286 
    287               JobItem jobItem = new JobItem() {
    288                 JobDto = response.Obj,
    289                 LatestSnapshot = null,
    290                 Optimizer = optimizer
    291               };
    292               lock (jobItems) {
    293                 jobItems.Add(jobItem);
    294               }
    295               LogMessage(jobItem.JobDto.Id, "Job sent to Hive");
     273        using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     274          try {
     275            RootJobItem = ToJobItem(RootJob);
     276
     277            IEnumerable<string> groups = ResourceGroups;
     278            SerializedJob serializedJob = RootJobItem.ToSerializedJob();
     279            ResponseObject<JobDto> response = service.Obj.AddJobWithGroupStrings(serializedJob, groups);
     280
     281            if (response.StatusMessage != ResponseStatus.Ok) {
     282              throw new Exception(response.StatusMessage.ToString());
     283            } else {
     284              RootJobItem.JobDto = response.Obj;
     285              LogMessage(RootJobItem.JobDto.Id, "Job sent to Hive");
     286
     287              StartResultPolling();
    296288            }
    297289          }
    298         }
    299         catch (Exception e) {
    300           LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
    301           this.ExecutionState = Core.ExecutionState.Stopped;
    302           OnStopped();
    303         }
    304         finally {
    305           ServiceLocator.DisposeClientFacade(clientFacade);
    306         }
    307         sendingJobsFinished = true;
     290          catch (Exception e) {
     291            LogMessage("Error: Starting HiveExperiment failed: " + e.Message);
     292            this.ExecutionState = Core.ExecutionState.Stopped;
     293            OnStopped();
     294          }
     295        }
    308296      });
    309297      t.Start();
     
    311299
    312300    public void Stop() {
     301      if (IsPollingResults)
     302        StopResultPolling();
    313303      this.ExecutionState = Core.ExecutionState.Stopped;
    314       foreach (JobItem jobItem in jobItems) {
    315         AbortJob(jobItem.JobDto.Id);
    316       }
    317304      OnStopped();
    318305    }
    319306    #endregion
    320 
    321     #region Optimizier Management
    322     /// <summary>
    323     /// Returns all optimizers in the current Experiment
    324     /// </summary>
    325     /// <param name="flatout">if false only top level optimizers are returned, if true the optimizer-tree is flatted</param>
    326     /// <returns></returns>
    327     private IDictionary<IOptimizer, IOptimizer> GetOptimizers(bool flatout) {
    328       if (!flatout) {
    329         var optimizers = new Dictionary<IOptimizer, IOptimizer>();
    330         foreach (IOptimizer opt in experiment.Optimizers) {
    331           optimizers.Add(experiment, opt);
    332         }
    333         return optimizers;
     307   
     308    #region Job Management
     309    public void RefreshJobTree() {
     310      this.RootJob = CreateJobTree(this.Experiment);
     311    }
     312   
     313    private IJob CreateJobTree(IOptimizer optimizer) {
     314      IJob job = null;
     315      if (optimizer is HeuristicLab.Optimization.Experiment) {
     316        HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)optimizer;
     317        ExperimentJob expJob = new ExperimentJob(exp);
     318        foreach (IOptimizer opt in exp.Optimizers) {
     319          expJob.AddChildJob(CreateJobTree(opt));
     320        }
     321        job = expJob;
     322      } else if (optimizer is BatchRun) {
     323        job = new BatchRunJob(optimizer);
    334324      } else {
    335         return FlatOptimizerTree(null, experiment, "");
    336       }
    337     }
    338 
    339     /// <summary>
    340     /// Recursively iterates all IOptimizers in the optimizer-tree and returns them.
    341     ///
    342     /// [chn] this could be implemented more cleanly if Experiment and BatchRun would implement an interface like:
    343     /// interface IParallelizable {
    344     ///   IEnumerable&lt;IOptimizer&gt; GetOptimizers();
    345     /// }
    346     /// </summary>
    347     /// <returns>a dictionary mapping from the parent optimizer to the child optimizer</returns>
    348     private IDictionary<IOptimizer, IOptimizer> FlatOptimizerTree(IOptimizer parent, IOptimizer optimizer, string prepend) {
    349       IDictionary<IOptimizer, IOptimizer> optimizers = new Dictionary<IOptimizer, IOptimizer>();
    350       if (optimizer is HeuristicLab.Optimization.Experiment) {
    351         HeuristicLab.Optimization.Experiment experiment = optimizer as HeuristicLab.Optimization.Experiment;
    352         if (this.experiment != experiment) {
    353           prepend += experiment.Name + "/"; // don't prepend for top-level optimizers
    354         }
    355         foreach (IOptimizer opt in experiment.Optimizers) {
    356           AddRange(optimizers, FlatOptimizerTree(experiment, opt, prepend));
    357         }
    358       } else if (optimizer is BatchRun) {
    359         BatchRun batchRun = optimizer as BatchRun;
    360         prepend += batchRun.Name + "/";
    361         for (int i = 0; i < batchRun.Repetitions; i++) {
    362           IOptimizer opt = (IOptimizer)batchRun.Algorithm.Clone();
    363           opt.Name += " [" + i + "]";
    364           IDictionary<IOptimizer, IOptimizer> batchOptimizers = FlatOptimizerTree(batchRun, opt, prepend);
    365           AddRange(optimizers, batchOptimizers);
    366         }
    367       } else if (optimizer is EngineAlgorithm) {
    368         optimizer.Name = prepend + optimizer.Name;
    369         optimizers.Add(optimizer, parent);
    370         LogMessage("Optimizer extracted: " + optimizer.Name);
    371       } else {
    372         Logger.Warn("Optimizer of type " + optimizers.GetType().ToString() + " unknown");
    373         optimizer.Name = prepend + optimizer.Name;
    374         optimizers.Add(optimizer, parent);
    375         LogMessage("Optimizer extracted: " + optimizer.Name);
    376       }
    377       return optimizers;
    378     }
    379 
    380     private void ReplaceOptimizer(IOptimizer parentOptimizer, IOptimizer originalOptimizer, IOptimizer newOptimizer) {
    381       lock (locker) {
    382         if (parentOptimizer is HeuristicLab.Optimization.Experiment) {
    383           HeuristicLab.Optimization.Experiment exp = (HeuristicLab.Optimization.Experiment)parentOptimizer;
    384           int originalOptimizerIndex = exp.Optimizers.IndexOf(originalOptimizer);
    385           exp.Optimizers[originalOptimizerIndex] = newOptimizer;
    386         } else if (parentOptimizer is BatchRun) {
    387           BatchRun batchRun = (BatchRun)parentOptimizer;
    388           if (newOptimizer is IAlgorithm) {
    389             batchRun.Runs.Add(new Run(newOptimizer.Name, (IAlgorithm)newOptimizer));
    390           } else {
    391             throw new NotSupportedException("Only IAlgorithm types supported");
    392           }
    393         } else {
    394           throw new NotSupportedException("Invalid parentOptimizer");
    395         }
    396       }
    397     }
    398 
    399     private bool NoMorePendingOptimizers() {
    400       lock (pendingOptimizerMappingsLocker) {
    401         return pendingOptimizersByJobId.Count == 0;
    402       }
    403     }
    404 
    405     /// <summary>
    406     /// Removes optimizers from
    407     ///  - parentOptimizersByPendingOptimizer
    408     ///  - pendingOptimizersByJobId
    409     /// </summary>
    410     /// <param name="jobId"></param>
    411     private void DisposeOptimizerMappings(Guid jobId) {
    412       LogMessage(jobId, "Disposing Optimizer Mappings");
    413       lock (pendingOptimizerMappingsLocker) {
    414         parentOptimizersByPendingOptimizer.Remove(pendingOptimizersByJobId[jobId]);
    415         pendingOptimizersByJobId.Remove(jobId);
    416       }
    417     }
    418 
    419     #endregion
    420 
    421     #region Job Management
     325        job = new OptimizerJob(optimizer);
     326      }
     327      return job;
     328    }
     329
     330    private JobItem ToJobItem(IJob j) {
     331      return new JobItem() {
     332        Job = j,
     333        JobDto = new JobDto() {
     334          State = JobState.Offline,
     335          PluginsNeeded = HivePluginInfoDto.FindPluginsNeeded(j.GetType())
     336        }
     337      };
     338    }
     339
    422340    /// <summary>
    423341    /// Updates all JobItems with the results
     342    /// if one is finished, the serialized job is downloaded and updated
    424343    /// </summary>
    425344    /// <param name="jobResultList"></param>
    426345    private void UpdateJobItems(JobResultList jobResultList) {
    427       // use a Dict to avoid quadratic runtime complexity
    428       IDictionary<Guid, JobResult> jobResultDict = jobResultList.ToDictionary(job => job.JobId);
    429       lock (jobItems) {
    430         foreach (JobItem jobItem in JobItems) {
    431           if (jobResultDict.ContainsKey(jobItem.JobDto.Id)) {
    432             jobItem.JobResult = jobResultDict[jobItem.JobDto.Id];
     346      if (RootJobItem == null) {
     347        var rootResults = jobResultList.Where(res => !res.ParentJobId.HasValue);
     348        if (rootResults.Count() > 0) {
     349          RootJobItem = new JobItem(rootResults.First());
     350        } else {
     351          LogMessage("Error: Could not find JobResult for RootJobItem");
     352          return;
     353        }
     354      }
     355
     356      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     357        foreach (JobResult jobResult in jobResultList) {
     358          JobItem jobItem = FindJobItem(RootJobItem, jobResult.Id);
     359          if (jobItem != null) {
     360            jobItem.UpdateJob(jobResult);
     361            if (jobItem.JobDto.State == JobState.Finished && !jobItem.IsFinalResultAvailable) {
     362              // job is finished but the final result was not yet downloaded
     363              SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobResult.Id).Obj;
     364              jobItem.Job = XmlParser.Deserialize<IJob>(new MemoryStream(serializedJob.SerializedJobData));
     365              UpdateChildJobs(jobItem);
     366              this.Experiment.Runs.AddRange(((OptimizerJob)jobItem.Job).Optimizer.Runs);
     367              //ReplaceOptimizerInExperiment(jobItem.JobDto.Id, jobItem.Job);
     368            }
     369          } else {
     370            // job does not yet exist locally
     371            JobItem parentJobItem = FindJobItem(RootJobItem, jobResult.ParentJobId.Value);
     372            if (parentJobItem != null) {
     373              LogMessage(jobResult.Id, "Creating JobItem");
     374              SerializedJob serializedJob = service.Obj.GetLastSerializedResult(jobResult.Id).Obj;
     375              JobItem newJobItem = new JobItem();
     376              newJobItem.JobDto = serializedJob.JobInfo;
     377              newJobItem.Job = XmlParser.Deserialize<IJob>(new MemoryStream(serializedJob.SerializedJobData));
     378              parentJobItem.AddChildJob(newJobItem);
     379              if (newJobItem.JobDto.State == JobState.Finished && newJobItem.IsFinalResultAvailable) {
     380                this.Experiment.Runs.AddRange(((OptimizerJob)newJobItem.Job).Optimizer.Runs);
     381                UpdateChildJobs(newJobItem);
     382                //ReplaceOptimizerInExperiment(jobItem.JobDto.Id, jobItem.Job);
     383              }
     384            } else {
     385              LogMessage("Error: Could not update JobResult for " + jobResult.Id);
     386            }
    433387          }
    434388        }
    435389      }
     390    }
     391
     392    /// <summary>
     393    /// Updates the ChildJobItems of a JobItem according to the IJob.ChildJobs of JobItem.Job (pretty confusing, right)
     394    /// </summary>
     395    /// <param name="jobItem"></param>
     396    private void UpdateChildJobs(JobItem jobItem) {
     397      List<JobItem> newJobItems = new List<JobItem>();
     398      foreach (IJob job in jobItem.Job.ChildJobs) {
     399        if (!jobItem.ReplaceChildJob(job)) {
     400          newJobItems.Add(ToJobItem(job));
     401        }
     402      }
     403      foreach (JobItem item in newJobItems) {
     404        jobItem.AddChildJob(item);
     405      }
     406    }
     407
     408    private JobItem FindJobItem(JobItem parentJobItem, Guid jobId) {
     409      if (parentJobItem.JobDto.Id == jobId) {
     410        return parentJobItem;
     411      } else {
     412        foreach (JobItem child in parentJobItem.ChildJobItems) {
     413          JobItem result = FindJobItem(child, jobId);
     414          if (result != null)
     415            return result;
     416        }
     417      }
     418      return null;
    436419    }
    437420
     
    441424      Thread t = new Thread(() => {
    442425        try {
    443           if (jobItem.State == JobState.Finished) {
     426          if (jobItem.JobDto.State == JobState.Finished) {
    444427            FetchAndUpdateJob(jobItem.JobDto.Id);
    445             DisposeOptimizerMappings(jobItem.JobDto.Id);
    446           } else if (jobItem.State == JobState.Failed) {
    447             DisposeOptimizerMappings(jobItem.JobDto.Id);
     428          } else if (jobItem.JobDto.State == JobState.Failed) {
     429           
    448430          }
    449 
    450           if (NoMorePendingOptimizers()) {
    451             StopResultPolling();
    452             this.ExecutionState = Core.ExecutionState.Stopped;
    453             OnStopped();
    454           }
    455431        }
    456432        catch (Exception ex) {
    457           Logger.Error("JobItem_JobStateChanged failed badly: " + ex.Message);
    458433          LogMessage("JobItem_JobStateChanged failed badly: " + ex.Message);
    459434        }
     
    469444      LogMessage(jobId, "FetchAndUpdateJob started");
    470445      if (fetchJobSemaphore.WaitOne(new TimeSpan(0, 2, 0))) {
    471         IClientFacade clientFacade = null;
     446
    472447        try {
    473           clientFacade = CreateStreamedClientFacade();
    474           IOptimizer originalOptimizer;
    475           originalOptimizer = pendingOptimizersByJobId[jobId];
    476 
    477           ResponseObject<SerializedJob> jobResponse = clientFacade.GetLastSerializedResult(jobId, false, false);
    478           IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
    479           IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
    480           ReplaceOptimizer(parentOptimizersByPendingOptimizer[originalOptimizer], originalOptimizer, restoredOptimizer);
    481           LogMessage(jobId, "FetchAndUpdateJob ended");
     448          using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     449            ResponseObject<SerializedJob> jobResponse = service.Obj.GetLastSerializedResult(jobId);
     450            IJob restoredObject = XmlParser.Deserialize<IJob>(new MemoryStream(jobResponse.Obj.SerializedJobData));
     451            IOptimizer restoredOptimizer = ((OptimizerJob)restoredObject).Optimizer;
     452            LogMessage(jobId, "FetchAndUpdateJob ended");
     453          }
    482454        }
    483455        catch (Exception e) {
     
    486458        }
    487459        finally {
    488           ServiceLocator.DisposeClientFacade(clientFacade);
    489460          fetchJobSemaphore.Release();
    490461        }
     
    499470    }
    500471
    501     private void UpdateJobItem(JobDto jobDto) {
    502       JobItem jobItem = jobItems.Single(x => x.JobDto.Id == jobDto.Id);
    503       jobItem.JobDto = jobDto;
    504     }
    505 
    506472    public void AbortJob(Guid jobId) {
    507       IClientFacade clientFacade = CreateClientFacade();
    508       Response response = clientFacade.AbortJob(jobId);
    509       LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
    510     }
    511 
    512     private SerializedJob CreateSerializedJob(IOptimizer optimizer) {
    513       IJob job = new OptimizerJob() {
    514         Optimizer = optimizer
    515       };
    516 
    517       // serialize job
    518       MemoryStream memStream = new MemoryStream();
    519       XmlGenerator.Serialize(job, memStream);
    520       byte[] jobByteArray = memStream.ToArray();
    521       memStream.Dispose();
    522 
    523       // find out which which plugins are needed for the given object
    524       List<HivePluginInfoDto> pluginsNeeded = (
    525         from p in GetDeclaringPlugins(job.GetType())
    526         select new HivePluginInfoDto() {
    527           Name = p.Name,
    528           Version = p.Version
    529         }).ToList();
    530 
    531       JobDto jobDto = new JobDto() {
    532         CoresNeeded = 1, // [chn] how to determine real cores needed?
    533         PluginsNeeded = pluginsNeeded,
    534         State = JobState.Offline,
    535         MemoryNeeded = 0
    536       };
    537 
    538       SerializedJob serializedJob = new SerializedJob() {
    539         JobInfo = jobDto,
    540         SerializedJobData = jobByteArray
    541       };
    542 
    543       return serializedJob;
    544     }
    545 
    546     private JobItem GetJobItemById(Guid jobId) {
    547       return jobItems.Single(x => x.JobDto.Id == jobId);
     473      using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
     474        Response response = service.Obj.AbortJob(jobId);
     475        LogMessage(jobId, "Aborting Job: " + response.StatusMessage);
     476      }
    548477    }
    549478    #endregion
     
    553482      this.stopResultsPollingPending = false;
    554483      this.IsPollingResults = true;
    555       resultPollingThread = CreateResultPollingThread();
     484      resultPollingThread = new Thread(RunResultPollingThread);
    556485      if (resultPollingThread.ThreadState != System.Threading.ThreadState.Running)
    557486        resultPollingThread.Start();
     
    560489    public void StopResultPolling() {
    561490      this.stopResultsPollingPending = true;
    562       if (resultPollingThread != null && resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) {
    563         resultPollingThread.Interrupt();
     491      if (resultPollingThread != null) {
     492        if (resultPollingThread.ThreadState == System.Threading.ThreadState.WaitSleepJoin) {
     493          resultPollingThread.Interrupt();
     494        }
     495        resultPollingThread.Join();
    564496      }
    565497      this.stopResultsPollingPending = false;
    566498    }
    567499
    568     private Thread CreateResultPollingThread() {
    569       return new Thread(() => {
    570         try {
    571           do {
    572             IClientFacade clientFacade = CreateStreamedClientFacade();
    573             IEnumerable<Guid> jobIdsToQuery = from job in JobItems
    574                                               where job.State != JobState.Finished &&
    575                                               job.State != JobState.Failed
    576                                               select job.JobDto.Id;
    577             if (jobIdsToQuery.Count() > 0) {
    578               LogMessage("Polling results for " + jobIdsToQuery.Count() + " jobs");
     500    private void RunResultPollingThread() {
     501      try {
     502        do {
     503          if (RootJobItem.JobDto.State != JobState.Finished) {
     504            using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     505              LogMessage("Polling results");
    579506              try {
    580                 ResponseObject<JobResultList> response = clientFacade.GetJobResults(jobIdsToQuery);
     507                ResponseObject<JobResultList> response = service.Obj.GetChildJobResults(RootJobItem.JobDto.Id, true, true);
    581508                if (response.StatusMessage == ResponseStatus.Ok) {
    582509                  JobResultList jobItemList = response.Obj;
     
    591518                LogMessage("Polling results failed: " + e.Message);
    592519              }
    593               finally {
    594                 ServiceLocator.DisposeClientFacade(clientFacade);
    595               }
    596               Thread.Sleep(resultPollingIntervalMs);
    597             } else {
    598               if (sendingJobsFinished) {
    599                 // all the jobs have been sent to hive, but non are to query any more (all finished or failed)
    600                 this.stopResultsPollingPending = true;
    601               }
    602520            }
    603           } while (!this.stopResultsPollingPending);
    604         }
    605         catch (ThreadInterruptedException exception) {
    606           // thread has been interuppted
    607         }
    608         catch (Exception e) {
    609           LogMessage("Result Polling Thread failed badly: " + e.Message);
    610           Logger.Error("Result Polling Thread failed badly: " + e.Message);
    611         }
    612         finally {
    613           this.IsPollingResults = false;
    614         }
    615       });
     521            Thread.Sleep(resultPollingIntervalMs);
     522          } else {
     523            // all the jobs have been sent to hive, but non are to query any more (all finished or failed)
     524            this.stopResultsPollingPending = true;
     525          }
     526        } while (!this.stopResultsPollingPending);
     527      }
     528      catch (ThreadInterruptedException) {
     529        // thread has been interuppted
     530      }
     531      catch (Exception e) {
     532        LogMessage("Result Polling Thread failed badly: " + e.Message);
     533      }
     534      finally {
     535        this.IsPollingResults = false;
     536      }
    616537    }
    617538
     
    620541    #region Snapshots
    621542
    622     private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
    623       JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
    624       jobItem.LatestSnapshot = response;
    625     }
    626 
    627     public void RequestSnapshot(Guid jobId) {
    628       Thread t = new Thread(() => {
    629         IClientFacade clientFacade = null;
    630         try {
    631           clientFacade = CreateStreamedClientFacade();
    632 
    633           ResponseObject<SerializedJob> response;
    634           int retryCount = 0;
    635 
    636           Response snapShotResponse = clientFacade.RequestSnapshot(jobId);
    637           if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
    638             // job already finished
    639             Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
    640             response = clientFacade.GetLastSerializedResult(jobId, false, false);
    641             Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
    642           } else {
    643             // server sent snapshot request to client
    644             // poll until snapshot is ready
    645             do {
    646               Thread.Sleep(snapshotPollingIntervalMs);
    647               Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
    648               response = clientFacade.GetLastSerializedResult(jobId, false, true);
    649               Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
    650               retryCount++;
    651               // loop while
    652               // 1. problem with communication with server
    653               // 2. job result not yet ready
    654             } while (
    655               (retryCount < maxSnapshotRetries) && (
    656               response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere)
    657               );
    658           }
    659           if (response.StatusMessage == ResponseStatus.Ok) {
    660             LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
    661             UpdateSnapshot(response);
    662           } else {
    663             LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
    664           }
    665         }
    666         catch (Exception e) {
    667           LogMessage("RequestSnapshot Thread failed badly: " + e.Message);
    668           Logger.Error("RequestSnapshot Thread failed badly: " + e.Message);
    669         }
    670         finally {
    671           ServiceLocator.DisposeClientFacade(clientFacade);
    672         }
    673       });
    674       t.Start();
    675     }
    676 
    677     void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
    678       JobItem jobItem = (JobItem)sender;
    679       if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) {
    680         RequestSnapshot(jobItem.JobDto.Id);
    681       }
    682     }
    683 
    684     #endregion
    685 
    686     #region Required Plugin Search
    687     /// <summary>
    688     /// Returns a list of plugins in which the type itself and all members
    689     /// of the type are declared. Objectgraph is searched recursively.
    690     /// </summary>
    691     private IEnumerable<IPluginDescription> GetDeclaringPlugins(Type type) {
    692       HashSet<Type> types = new HashSet<Type>();
    693       FindTypes(type, types, "HeuristicLab.");
    694       return GetDeclaringPlugins(types);
    695     }
    696 
    697     /// <summary>
    698     /// Returns the plugins (including dependencies) in which the given types are declared
    699     /// </summary>
    700     private IEnumerable<IPluginDescription> GetDeclaringPlugins(IEnumerable<Type> types) {
    701       HashSet<IPluginDescription> plugins = new HashSet<IPluginDescription>();
    702       foreach (Type t in types) {
    703         FindDeclaringPlugins(ApplicationManager.Manager.GetDeclaringPlugin(t), plugins);
    704       }
    705       return plugins;
    706     }
    707 
    708     /// <summary>
    709     /// Finds the dependencies of the given plugin and adds it to the plugins hashset.
    710     /// Also searches the dependencies recursively.
    711     /// </summary>
    712     private void FindDeclaringPlugins(IPluginDescription plugin, HashSet<IPluginDescription> plugins) {
    713       if (!plugins.Contains(plugin)) {
    714         plugins.Add(plugin);
    715         foreach (IPluginDescription dependency in plugin.Dependencies) {
    716           FindDeclaringPlugins(dependency, plugins);
    717         }
    718       }
    719     }
    720 
    721     /// <summary>
    722     /// Recursively finds all types used in type which are in a namespace which starts with namespaceStart
    723     /// Be aware that search is not performed on attributes
    724     /// </summary>
    725     /// <param name="type">the type to be searched</param>
    726     /// <param name="types">found types will be stored there, needed in order to avoid duplicates</param>
    727     /// <param name="namespaceStart">only types from namespaces which start with this will be searched and added</param>
    728     private void FindTypes(Type type, HashSet<Type> types, string namespaceStart) {
    729       if (!types.Contains(type) && type.Namespace.StartsWith(namespaceStart)) {
    730         types.Add(type);
    731 
    732         // constructors
    733         foreach (ConstructorInfo info in type.GetConstructors()) {
    734           foreach (ParameterInfo paramInfo in info.GetParameters()) {
    735             FindTypes(paramInfo.ParameterType, types, namespaceStart);
    736           }
    737         }
    738 
    739         // interfaces
    740         foreach (Type t in type.GetInterfaces()) {
    741           FindTypes(t, types, namespaceStart);
    742         }
    743 
    744         // events
    745         foreach (EventInfo info in type.GetEvents()) {
    746           FindTypes(info.EventHandlerType, types, namespaceStart);
    747           FindTypes(info.DeclaringType, types, namespaceStart);
    748         }
    749 
    750         // properties
    751         foreach (PropertyInfo info in type.GetProperties()) {
    752           FindTypes(info.PropertyType, types, namespaceStart);
    753         }
    754 
    755         // fields
    756         foreach (FieldInfo info in type.GetFields()) {
    757           FindTypes(info.FieldType, types, namespaceStart);
    758         }
    759 
    760         // methods
    761         foreach (MethodInfo info in type.GetMethods()) {
    762           foreach (ParameterInfo paramInfo in info.GetParameters()) {
    763             FindTypes(paramInfo.ParameterType, types, namespaceStart);
    764           }
    765           FindTypes(info.ReturnType, types, namespaceStart);
    766         }
    767       }
    768     }
     543    //private void UpdateSnapshot(ResponseObject<SerializedJob> response) {
     544    //  JobItem jobItem = jobItems.Single(x => x.JobDto.Id == response.Obj.JobInfo.Id);
     545    //  jobItem.LatestSnapshot = response;
     546    //}
     547
     548    //public void RequestSnapshot(Guid jobId) {
     549    //  Thread t = new Thread(() => {
     550    //    IClientFacade clientFacade = null;
     551    //    try {
     552    //      clientFacade = CreateStreamedClientFacade();
     553
     554    //      ResponseObject<SerializedJob> response;
     555    //      int retryCount = 0;
     556
     557    //      Response snapShotResponse = clientFacade.RequestSnapshot(jobId);
     558    //      if (snapShotResponse.StatusMessage == ResponseStatus.RequestSnapshot_JobIsNotBeeingCalculated) {
     559    //        // job already finished
     560    //        Logger.Debug("HiveExperiment: Abort - GetLastResult(false)");
     561    //        response = clientFacade.GetLastSerializedResult(jobId, false, false);
     562    //        Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
     563    //      } else {
     564    //        // server sent snapshot request to client
     565    //        // poll until snapshot is ready
     566    //        do {
     567    //          Thread.Sleep(snapshotPollingIntervalMs);
     568    //          Logger.Debug("HiveExperiment: Abort - GetLastResult(true)");
     569    //          response = clientFacade.GetLastSerializedResult(jobId, false, true);
     570    //          Logger.Debug("HiveExperiment: Abort - Server: " + response.StatusMessage);
     571    //          retryCount++;
     572    //          // loop while
     573    //          // 1. problem with communication with server
     574    //          // 2. job result not yet ready
     575    //        } while (
     576    //          (retryCount < maxSnapshotRetries) && (
     577    //          response.StatusMessage == ResponseStatus.GetLastSerializedResult_JobResultNotYetThere)
     578    //          );
     579    //      }
     580    //      if (response.StatusMessage == ResponseStatus.Ok) {
     581    //        LogMessage(jobId, "Snapshot polling successfull for job " + jobId);
     582    //        //UpdateSnapshot(response);
     583    //      } else {
     584    //        LogMessage(jobId, "Error: Polling of Snapshot failed for job " + jobId + ": " + response.StatusMessage);
     585    //      }
     586    //    }
     587    //    catch (Exception e) {
     588    //      LogMessage("RequestSnapshot Thread failed badly: " + e.Message);
     589    //      Logger.Error("RequestSnapshot Thread failed badly: " + e.Message);
     590    //    }
     591    //    finally {
     592    //      ServiceLocator.DisposeClientFacade(clientFacade);
     593    //    }
     594    //  });
     595    //  t.Start();
     596    //}
     597
     598    //void JobItem_SnapshotRequestedStateChanged(object sender, EventArgs e) {
     599    //  JobItem jobItem = (JobItem)sender;
     600    //  if (jobItem.SnapshotRequestedState == SnapshotRequestedState.Requested) {
     601    //    RequestSnapshot(jobItem.JobDto.Id);
     602    //  }
     603    //}
     604
    769605    #endregion
    770606
     
    827663      LogMessage("Experiment changed");
    828664      EventHandler handler = ExperimentChanged;
     665      if (handler != null) handler(this, EventArgs.Empty);
     666    }
     667
     668    public event EventHandler RootJobChanged;
     669    private void OnRootJobChanged() {
     670      EventHandler handler = RootJobChanged;
     671      if (handler != null) handler(this, EventArgs.Empty);
     672    }
     673
     674    public event EventHandler RootJobItemChanged;
     675    private void OnRootJobItemChanged() {
     676      EventHandler handler = RootJobItemChanged;
    829677      if (handler != null) handler(this, EventArgs.Empty);
    830678    }
     
    843691    }
    844692
    845     private void RegisterJobItemListEvents() {
    846       jobItems.CollectionReset += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
    847       jobItems.ItemsAdded += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
    848       jobItems.ItemsRemoved += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
    849       jobItems.ItemsReplaced += new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
    850       foreach (JobItem jobItem in jobItems) {
    851         RegisterJobItemEvents(jobItem);
    852       }
    853     }
    854 
    855     private void DeregisterJobItemListEvents() {
    856       jobItems.CollectionReset -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_CollectionReset);
    857       jobItems.ItemsAdded -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsAdded);
    858       jobItems.ItemsRemoved -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsRemoved);
    859       jobItems.ItemsReplaced -= new CollectionItemsChangedEventHandler<IndexedItem<JobItem>>(jobItems_ItemsReplaced);
    860       foreach (JobItem jobItem in jobItems) {
    861         DeregisterJobItemEvents(jobItem);
    862       }
    863     }
    864 
    865     void jobItems_ItemsReplaced(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
    866       UpdateJobItemEvents(e);
    867     }
    868 
    869     private void UpdateJobItemEvents(CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
    870       if (e.OldItems != null) {
    871         foreach (var item in e.OldItems) {
    872           DeregisterJobItemEvents(item.Value);
    873         }
    874       }
    875       if (e.Items != null) {
    876         foreach (var item in e.Items) {
    877           RegisterJobItemEvents(item.Value);
    878         }
    879       }
    880     }
    881 
    882     private void RegisterJobItemEvents(JobItem jobItem) {
    883       jobItem.SnapshotRequestedStateChanged += new EventHandler(JobItem_SnapshotRequestedStateChanged);
    884       jobItem.JobStateChanged += new EventHandler(JobItem_JobStateChanged);
    885     }
    886 
    887     private void DeregisterJobItemEvents(JobItem jobItem) {
    888       jobItem.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
    889       jobItem.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
    890     }
    891 
    892     void jobItems_ItemsRemoved(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
    893       UpdateJobItemEvents(e);
    894     }
    895 
    896     void jobItems_ItemsAdded(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
    897       UpdateJobItemEvents(e);
    898     }
    899 
    900     void jobItems_CollectionReset(object sender, CollectionItemsChangedEventArgs<IndexedItem<JobItem>> e) {
    901       foreach (var item in e.OldItems) {
    902         item.Value.JobStateChanged -= new EventHandler(JobItem_JobStateChanged);
    903         item.Value.SnapshotRequestedStateChanged -= new EventHandler(JobItem_SnapshotRequestedStateChanged);
    904       }
     693    void HiveExperiment_RootJobChanged(object sender, EventArgs e) {
     694      this.Experiment = (Optimization.Experiment)((ExperimentJob)rootJobItem.Job).Optimizer;
     695    }
     696
     697    void rootJobItem_FinalResultAvailable(object sender, EventArgs e) {
     698      this.Experiment = (Optimization.Experiment)((ExperimentJob)rootJobItem.Job).Optimizer;
    905699    }
    906700    #endregion
    907 
    908     #region Helper Functions
    909     private IClientFacade CreateClientFacade() {
    910       IClientFacade clientFacade = null;
    911       do {
    912         try {
    913           //clientFacade = ServiceLocator.CreateClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientServiceName));
    914           clientFacade = ServiceLocator.CreateClientFacade(Settings.Default.HiveServerIp);
    915 
    916         }
    917         catch (EndpointNotFoundException exception) {
    918           LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
    919           Thread.Sleep(resultPollingIntervalMs);
    920         }
    921       } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
    922       return clientFacade;
    923     }
    924 
    925     private IClientFacade CreateStreamedClientFacade() {
    926       IClientFacade clientFacade = null;
    927       do {
    928         try {
    929           //clientFacade = ServiceLocator.CreateStreamedClientFacade(string.Format("http://{0}:{1}/{2}", Settings.Default.HiveServerIp, Settings.Default.HiveServerPort, WcfSettings.ClientStreamedServiceName));
    930           clientFacade = ServiceLocator.CreateStreamedClientFacade(Settings.Default.HiveServerIp);
    931         }
    932         catch (EndpointNotFoundException exception) {
    933           LogMessage("Could not connect to Server: " + exception.Message + ". Will try again in " + (resultPollingIntervalMs / 1000) + " sec.");
    934           Thread.Sleep(resultPollingIntervalMs);
    935         }
    936       } while (clientFacade == null && this.ExecutionState != Core.ExecutionState.Stopped);
    937       return clientFacade;
    938     }
    939 
    940     private void AddRange(IDictionary<IOptimizer, IOptimizer> optimizers, IDictionary<IOptimizer, IOptimizer> childs) {
    941       foreach (KeyValuePair<IOptimizer, IOptimizer> kvp in childs) {
    942         optimizers.Add(kvp);
    943       }
    944     }
    945 
    946     #endregion
    947 
     701   
    948702    #region Logging
    949703    private void LogMessage(string message) {
     
    951705      lock (locker) {
    952706        log.LogMessage(message);
     707        Logger.Debug(message);
    953708      }
    954709    }
    955710
    956711    private void LogMessage(Guid jobId, string message) {
    957       GetJobItemById(jobId).LogMessage(message);
     712      //GetJobItemById(jobId).LogMessage(message);
    958713      LogMessage(message + " (jobId: " + jobId + ")");
    959714    }
    960715
    961716    #endregion
     717
     718    /// <summary>
     719    /// Downloads the root job from hive and sets the experiment, rootJob and rootJobItem
     720    /// </summary>
     721    public void LoadExperimentFromHive() {
     722      using (Disposable<IClientFacade> service = ServiceLocator.Instance.StreamedClientFacadePool.GetService()) {
     723        ResponseObject<SerializedJob> response = service.Obj.GetLastSerializedResult(RootJobItem.JobDto.Id);
     724        IJob job = XmlParser.Deserialize<IJob>(new MemoryStream(response.Obj.SerializedJobData));
     725        RootJob = job;
     726        RootJobItem.JobDto = response.Obj.JobInfo;
     727        this.Experiment = (Optimization.Experiment)((OptimizerJob)job).Optimizer;
     728        if (rootJobItem.JobDto.State == JobState.Finished) {
     729          // set execution time and finish up
     730        } else if (rootJobItem.JobDto.State != JobState.Aborted && rootJobItem.JobDto.State != JobState.Failed) {
     731          this.ExecutionState = Core.ExecutionState.Started;
     732        }
     733      }
     734    }
    962735  }
    963736}
Note: See TracChangeset for help on using the changeset viewer.