Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/10/11 17:58:59 (13 years ago)
Author:
cneumuel
Message:

#1233

  • added semaphores to ensure an appdomain is never unloaded when the start method has not finished
  • HiveEngine uploading and downloading of jobs works and is displayed in the view
File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/HeuristicLab.Hive-3.4/sources/HeuristicLab.HiveEngine/3.4/HiveEngine.cs

    r6111 r6178  
    77using HeuristicLab.Common;
    88using HeuristicLab.Core;
     9using HeuristicLab.Hive;
    910using HeuristicLab.Persistence.Default.CompositeSerializers.Storable;
    1011using HeuristicLab.PluginInfrastructure;
     
    140141
    141142            IScope[] scopes = ExecuteOnHive(jobs, parentScopeClone, cancellationToken);
    142            
     143            //IScope[] scopes = ExecuteLocally(jobs, parentScopeClone, cancellationToken);
     144
    143145            for (int i = 0; i < coll.Count; i++) {
    144146              if (coll[i] is IAtomicOperation) {
     
    203205
    204206    // testfunction:
    205     //private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
    206     //  IScope[] scopes = new Scope[jobs.Length];
    207     //  for (int i = 0; i < jobs.Length; i++) {
    208     //    var job = (EngineJob)jobs[i].Clone();
    209     //    job.Start();
    210     //    while (job.ExecutionState != ExecutionState.Stopped) {
    211     //      Thread.Sleep(100);
    212     //    }
    213     //    scopes[i] = ((IAtomicOperation)job.InitialOperation).Scope;
    214     //  }
    215     //  return scopes;
    216     //}
     207    private IScope[] ExecuteLocally(EngineJob[] jobs, IScope parentScopeClone, CancellationToken cancellationToken) {
     208      IScope[] scopes = new Scope[jobs.Length];
     209      for (int i = 0; i < jobs.Length; i++) {
     210        var serialized = PersistenceUtil.Serialize(jobs[i]);
     211        var deserialized = PersistenceUtil.Deserialize<IJob>(serialized);
     212        deserialized.Start();
     213        while (deserialized.ExecutionState != ExecutionState.Stopped) {
     214          Thread.Sleep(100);
     215        }
     216        var serialized2 = PersistenceUtil.Serialize(deserialized);
     217        var deserialized2 = PersistenceUtil.Deserialize<EngineJob>(serialized2);
     218        var newScope = ((IAtomicOperation)deserialized2.InitialOperation).Scope;
     219        scopes[i] = newScope;
     220      }
     221      return scopes;
     222    }
    217223
    218224    /// <summary>
     
    230236      try {
    231237        List<Guid> remainingJobIds = new List<Guid>();
    232         List<LightweightJob> lightweightJobs;
    233 
    234         int finishedCount = 0;
    235         int uploadCount = 0;
    236238
    237239        // create hive experiment
    238240        hiveExperiment.Name = "HiveEngine Run " + hiveExperiments.Count;
     241        hiveExperiment.DateCreated = DateTime.Now;
    239242        hiveExperiment.UseLocalPlugins = this.UseLocalPlugins;
    240243        hiveExperiment.ResourceNames = this.ResourceNames;
    241         hiveExperiment.Id = ServiceLocator.Instance.CallHiveService(s => s.AddHiveExperiment(hiveExperiment));
    242244        var refreshableHiveExperiment = new RefreshableHiveExperiment(hiveExperiment);
     245        refreshableHiveExperiment.IsControllable = false;
    243246        hiveExperiments.Add(refreshableHiveExperiment);
    244247
     
    246249        var uploadTasks = new List<Task<Job>>();
    247250        for (int i = 0; i < jobs.Length; i++) {
    248           var job = jobs[i];
     251          hiveExperiment.HiveJobs.Add(new EngineHiveJob(jobs[i], parentScopeClone));
    249252
    250253          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
    251           IRandom random = FindRandomParameter(job.InitialOperation as IExecutionContext);
     254          IRandom random = FindRandomParameter(jobs[i].InitialOperation as IExecutionContext);
    252255          if (random != null)
    253256            random.Reset(random.Next());
    254 
    255           uploadTasks.Add(Task.Factory.StartNew<Job>((keyValuePairObj) => {
    256             return UploadJob(keyValuePairObj, parentScopeClone, cancellationToken, GetResourceIds(), hiveExperiment.Id);
    257           }, new KeyValuePair<int, EngineJob>(i, job), cancellationToken));
    258         }
    259 
    260         Task processUploadedJobsTask = new Task(() => {
    261           // process finished upload-tasks
    262           int uploadTasksCount = uploadTasks.Count;
    263           for (int i = 0; i < uploadTasksCount; i++) {
    264             cancellationToken.ThrowIfCancellationRequested();
    265 
    266             var uploadTasksArray = uploadTasks.ToArray();
    267             var task = uploadTasksArray[Task.WaitAny(uploadTasksArray)];
    268             if (task.Status == TaskStatus.Faulted) {
    269               LogException(task.Exception);
    270               throw task.Exception;
    271             }
    272 
    273             int key = ((KeyValuePair<int, EngineJob>)task.AsyncState).Key;
    274             Job job = task.Result;
    275             lock (locker) {
    276               uploadCount++;
    277               jobIndices.Add(job.Id, key);
    278               remainingJobIds.Add(job.Id);
    279             }
    280             jobs[key] = null; // relax memory
    281             LogMessage(string.Format("Uploaded job #{0}", key + 1, job.Id));
    282             uploadTasks.Remove(task);
    283           }
    284         }, cancellationToken, TaskCreationOptions.PreferFairness);
    285         processUploadedJobsTask.Start();
    286 
    287         refreshableHiveExperiment.RefreshAutomatically = true;
    288        
     257        }
     258        ExperimentManagerClient.StartExperiment((e) => { throw e; }, refreshableHiveExperiment);
     259        // do polling until experiment is finished and all jobs are downloaded
    289260        while (!refreshableHiveExperiment.AllJobsFinished()) {
    290           Thread.Sleep(1000);
    291           // update time
    292           // handle cancellation
    293         }
    294 
    295 
    296 
    297         // poll job-statuses and create tasks for those which are finished
    298         var downloadTasks = new List<Task<EngineJob>>();
    299         var executionTimes = new List<TimeSpan>();
    300         var executionTimeOnHiveBefore = executionTimeOnHive;
    301         while (processUploadedJobsTask.Status != TaskStatus.RanToCompletion || remainingJobIds.Count > 0) {
    302           cancellationToken.ThrowIfCancellationRequested();
    303 
    304           Thread.Sleep(10000);
    305           try {
    306             lightweightJobs = ServiceLocator.Instance.CallHiveService(s => s.GetLightweightJobs(remainingJobIds));
    307 
    308             var jobsFinished = lightweightJobs.Where(j => j.State == JobState.Finished || j.State == JobState.Failed || j.State == JobState.Aborted);
    309             finishedCount += jobsFinished.Count();
    310             if (jobsFinished.Count() > 0) LogMessage(string.Format("Finished: {0}/{1}", finishedCount, jobs.Length));
    311             ExecutionTimeOnHive = executionTimeOnHiveBefore + executionTimes.Sum() + lightweightJobs.Select(x => x.ExecutionTime.HasValue ? x.ExecutionTime.Value : TimeSpan.Zero).Sum();
    312 
    313             foreach (var result in jobsFinished) {
    314               if (result.State == JobState.Finished) {
    315                 downloadTasks.Add(Task.Factory.StartNew<EngineJob>((jobIdObj) => {
    316                   return DownloadJob(jobIndices, jobIdObj, cancellationToken);
    317                 }, result.Id, cancellationToken));
    318               } else if (result.State == JobState.Aborted) {
    319                 LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
    320               } else if (result.State == JobState.Failed) {
    321                 LogMessage(string.Format("Job #{0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.CurrentStateLog != null ? result.CurrentStateLog.Exception : string.Empty));
    322               }
    323               remainingJobIds.Remove(result.Id);
    324               executionTimes.Add(result.ExecutionTime.HasValue ? result.ExecutionTime.Value : TimeSpan.Zero);
    325             }
    326           }
    327           catch (Exception e) {
    328             LogException(e);
    329           }
    330         }
    331 
    332         // process finished download-tasks
    333         int downloadTasksCount = downloadTasks.Count;
    334         for (int i = 0; i < downloadTasksCount; i++) {
    335           cancellationToken.ThrowIfCancellationRequested();
    336 
    337           var downloadTasksArray = downloadTasks.ToArray();
    338           var task = downloadTasksArray[Task.WaitAny(downloadTasksArray)];
    339           var jobId = (Guid)task.AsyncState;
    340           if (task.Status == TaskStatus.Faulted) {
    341             LogException(task.Exception);
    342             throw task.Exception;
    343           }
    344           scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.InitialOperation).Scope;
    345           downloadTasks.Remove(task);
    346         }
    347 
    348         LogMessage(string.Format("All jobs finished (TotalExecutionTime: {0}).", executionTimes.Sum()));
     261          Thread.Sleep(500);
     262          this.ExecutionTimeOnHive = TimeSpan.FromMilliseconds(hiveExperiments.Sum(x => x.HiveExperiment.ExecutionTime.TotalMilliseconds));
     263        }
     264        LogMessage(string.Format("{0} finished (TotalExecutionTime: {1}).", refreshableHiveExperiment.ToString(), refreshableHiveExperiment.HiveExperiment.ExecutionTime));
     265
     266        // get scopes
     267        int j = 0;
     268        foreach (var hiveJob in hiveExperiment.HiveJobs) {
     269          var scope = ((IAtomicOperation) ((EngineJob)hiveJob.ItemJob).InitialOperation).Scope;
     270          scopes[j++] = scope;
     271        }
     272        refreshableHiveExperiment.RefreshAutomatically = false;
    349273        DeleteHiveExperiment(hiveExperiment.Id);
    350 
    351274        return scopes;
    352275      }
     
    505428  }
    506429
    507   public static class ScopeExtensions {
    508     public static void ClearParentScopes(this IScope scope) {
    509       scope.ClearParentScopes(null);
    510     }
    511 
    512     public static void ClearParentScopes(this IScope scope, IScope childScope) {
    513       if (childScope != null) {
    514         scope.SubScopes.Clear();
    515         scope.SubScopes.Add(childScope);
    516       }
    517       if (scope.Parent != null)
    518         scope.Parent.ClearParentScopes(scope);
    519     }
    520   }
    521 
    522430  public static class EnumerableExtensions {
    523431    public static TimeSpan Sum(this IEnumerable<TimeSpan> times) {
Note: See TracChangeset for help on using the changeset viewer.