Timestamp: 01/10/11 02:29:22 (14 years ago)
Author: cneumuel
Message: #1260
  • clearance of unneeded scopes in HiveEngine


File: 1 edited

Legend: lines prefixed with '-' were removed (r5232), lines prefixed with '+' were added (r5263); unprefixed lines are unchanged.
  • branches/HeuristicLab.Hive-3.3/sources/HeuristicLab.Hive/HeuristicLab.HiveEngine/3.3/HiveEngine.cs

--- r5232
+++ r5263
  public class HiveEngine : Engine {
    private Semaphore maxConcurrentConnections = new Semaphore(4, 4); // avoid too many connections
-   private Semaphore maxConcurrentSerializations = new Semaphore(1, 1); // allow ony ONE concurrent serialization, because the operations share the same ParentScopes and serializing the same objects concurrently causes problems
-   private Semaphore maxSerializedJobsInMemory = new Semaphore(2, 2); // avoid memory problems
+   private Semaphore maxSerializedJobsInMemory = new Semaphore(4, 4); // avoid memory problems
    private CancellationToken cancellationToken;
     
          coll = (OperationCollection)next;
          if (coll.Parallel) {
-           IDictionary<IOperation, OperationJob> jobs = new Dictionary<IOperation, OperationJob>();
-           foreach (IOperation op in coll) {
-             jobs.Add(op, new OperationJob(op));
-           }
-
-           ExecuteOnHive(jobs, cancellationToken);
-
-           foreach (var kvp in jobs) {
-             if (kvp.Key is IAtomicOperation) {
-               ReIntegrateScope(kvp.Value.Operation as IAtomicOperation, kvp.Key as IAtomicOperation);
-             } else if (kvp.Key is OperationCollection) {
+           OperationJob[] jobs = new OperationJob[coll.Count];
+           for (int i = 0; i < coll.Count; i++) {
+             jobs[i] = new OperationJob(coll[i]);
+           }
+
+           IScope[] scopes = ExecuteOnHive(jobs, cancellationToken);
+
+           for (int i = 0; i < coll.Count; i++) {
+             if (coll[i] is IAtomicOperation) {
+               ExchangeScope(scopes[i], ((IAtomicOperation)coll[i]).Scope);
+             } else if (coll[i] is OperationCollection) {
                // todo ??
              }
     
    /// TODO: Cancelation needs to be refined; all tasks currently stay in Semaphore.WaitOne after cancelation
    /// </summary>
-   /// <param name="jobDict"></param>
-   private void ExecuteOnHive(IDictionary<IOperation, OperationJob> jobDict, CancellationToken cancellationToken) {
-     LogMessage(string.Format("Executing {0} operations on the hive.", jobDict.Count));
-
+   /// <param name="jobs"></param>
+   private IScope[] ExecuteOnHive(OperationJob[] jobs, CancellationToken cancellationToken) {
+     LogMessage(string.Format("Executing {0} operations on the hive.", jobs.Length));
+     IScope[] scopes = new Scope[jobs.Length];
      object locker = new object();
+
      try {
-       IDictionary<Guid, IOperation> jobIds = new Dictionary<Guid, IOperation>();
+       IDictionary<Guid, int> jobIndices = new Dictionary<Guid, int>();
        List<Guid> remainingJobIds = new List<Guid>();
-       IDictionary<Guid, int> jobNumbers = new Dictionary<Guid, int>(); // for better readability of log
        JobResultList results;
        var pluginsNeeded = ApplicationManager.Manager.Plugins.Select(x => new HivePluginInfoDto { Name = x.Name, Version = x.Version }).ToList();
     
        // create upload-tasks
        var uploadTasks = new List<Task<JobDto>>();
-       foreach (var kvp in jobDict) {
+       for (int i = 0; i < jobs.Length; i++) {
+         var job = jobs[i];
+
          // shuffle random variable to avoid the same random sequence in each operation; todo: does not yet work (it cannot find the random variable)
-         IRandom random = FindRandomParameter(kvp.Key as IExecutionContext);
+         IRandom random = FindRandomParameter(job.Operation as IExecutionContext);
          if (random != null)
            random.Reset(random.Next());
     
          uploadTasks.Add(Task.Factory.StartNew<JobDto>((keyValuePairObj) => {
            return UploadJob(pluginsNeeded, keyValuePairObj, cancellationToken);
-         }, kvp, cancellationToken));
+         }, new KeyValuePair<int, OperationJob>(i, job), cancellationToken));
        }
     
            }

-           IOperation key = ((KeyValuePair<IOperation, OperationJob>)task.AsyncState).Key;
+           int key = ((KeyValuePair<int, OperationJob>)task.AsyncState).Key;
            JobDto jobDto = task.Result;
            lock (locker) {
              uploadCount++;
-             jobIds.Add(jobDto.Id, key);
+             jobIndices.Add(jobDto.Id, key);
              remainingJobIds.Add(jobDto.Id);
-             jobNumbers.Add(jobDto.Id, uploadCount);
-           }
-           LogMessage(string.Format("Submitted job #{0}", jobNumbers[jobDto.Id], jobDto.Id));
+           }
+           jobs[key] = null; // relax memory
+           LogMessage(string.Format("Submitted job #{0}", key + 1, jobDto.Id));
            uploadTasks.Remove(task);
          }
     
          finishedCount += jobsFinished.Count();
          var totalExecutionTime = TimeSpan.FromMilliseconds(results.Select(j => j.ExecutionTime).Union(executionTimes).Select(e => e.TotalMilliseconds).Sum());
-         LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobDict.Count, totalExecutionTime));
+         LogMessage(string.Format("Results polled. Jobs finished: {0}/{1}, TotalExecutionTime: {2}", finishedCount, jobs.Length, totalExecutionTime));
          foreach (var result in jobsFinished) {
            if (result.State == JobState.Finished) {
              downloadTasks.Add(Task.Factory.StartNew<OperationJob>((jobIdObj) => {
-               return DownloadJob(jobNumbers, jobIdObj, cancellationToken);
+               return DownloadJob(jobIndices, jobIdObj, cancellationToken);
              }, result.Id, cancellationToken));
            } else if (result.State == JobState.Aborted) {
-             LogMessage(string.Format("Job #{0} aborted (id: {1})", jobNumbers[result.Id], result.Id));
+             LogMessage(string.Format("Job #{0} aborted (id: {1})", jobIndices[result.Id] + 1, result.Id));
            } else if (result.State == JobState.Failed) {
-             LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobNumbers[result.Id], result.Id, result.Exception));
+             LogMessage(string.Format("Job {0} failed (id: {1}): {2}", jobIndices[result.Id] + 1, result.Id, result.Exception));
            }
            remainingJobIds.Remove(result.Id);
     
            throw task.Exception;
          }
-         jobDict[jobIds[(Guid)task.AsyncState]] = task.Result;
+         scopes[jobIndices[(Guid)task.AsyncState]] = ((IAtomicOperation)task.Result.Operation).Scope;
          downloadTasks.Remove(task);
        }
     
        // delete jobs
        using (Disposable<IClientFacade> service = ServiceLocator.Instance.ClientFacadePool.GetService()) {
-         foreach (Guid jobId in jobIds.Keys) {
+         foreach (Guid jobId in jobIndices.Keys) {
            service.Obj.DeleteJob(jobId);
          }
        }

-       LogMessage(string.Format("Operations on the hive finished.", jobDict.Count));
+       LogMessage(string.Format("Operations on the hive finished.", jobs.Length));
+       return scopes;
      }
      catch (Exception e) {
     
    }

+   private static object locker = new object();
    private JobDto UploadJob(List<HivePluginInfoDto> pluginsNeeded, object keyValuePairObj, CancellationToken cancellationToken) {
-     var keyValuePair = (KeyValuePair<IOperation, OperationJob>)keyValuePairObj;
+     var keyValuePair = (KeyValuePair<int, OperationJob>)keyValuePairObj;
      var groups = ResourceIds.Split(';');
      maxSerializedJobsInMemory.WaitOne();
     
        }
      }
-     maxConcurrentSerializations.WaitOne();
-     serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
-     maxConcurrentSerializations.Release();
+     // clone operation and remove unnecessary scopes; don't do this earlier to avoid memory problems
+     lock (locker) {
+       keyValuePair.Value.Operation = (IOperation)keyValuePair.Value.Operation.Clone();
+       if (keyValuePair.Value.Operation is IAtomicOperation)
+         ((IAtomicOperation)keyValuePair.Value.Operation).Scope.ClearParentScopes();
+       serializedJob.SerializedJobData = SerializedJob.Serialize(keyValuePair.Value);
+     }
      serializedJob.JobInfo = new JobDto();
      serializedJob.JobInfo.State = JobState.Offline;
     
    }

-   private OperationJob DownloadJob(IDictionary<Guid, int> jobNumbers, object jobIdObj, CancellationToken cancellationToken) {
+   private OperationJob DownloadJob(IDictionary<Guid, int> jobIndices, object jobIdObj, CancellationToken cancellationToken) {
      Guid jobId = (Guid)jobIdObj;
      SerializedJob serializedJob = null;
     
      serializedJob = null;
      maxSerializedJobsInMemory.Release();
-     LogMessage(string.Format("Downloaded job #{0}", jobNumbers[jobId], jobId));
+     LogMessage(string.Format("Downloaded job #{0}", jobIndices[jobId] + 1, jobId));
      return operationJob;
    }
     
    }
  }
+
+ public static class ScopeExtensions {
+   public static void ClearParentScopes(this IScope scope) {
+     scope.ClearParentScopes(null);
+   }
+
+   public static void ClearParentScopes(this IScope scope, IScope childScope) {
+     if (childScope != null) {
+       scope.SubScopes.Clear();
+       scope.SubScopes.Add(childScope);
+     }
+     if (scope.Parent != null)
+       scope.Parent.ClearParentScopes(scope);
+   }
+ }
}
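
The heart of this changeset is the new ScopeExtensions.ClearParentScopes walk used in UploadJob: the operation is cloned, its scope's ancestors are pruned so that every parent keeps only the child on the path to the job's own scope, and only then is the job serialized. Cloning and serializing now happen inside a single static lock instead of the old maxConcurrentSerializations semaphore, because parallel operations can share parent scopes and serializing those shared objects concurrently caused problems. The self-contained sketch below illustrates the pruning effect; the Scope class, names and numbers are simplified stand-ins for illustration only, not the real HeuristicLab.Core types.

using System;
using System.Collections.Generic;

// Simplified stand-in for HeuristicLab's scope tree (illustration only).
class Scope {
  public string Name;
  public Scope Parent;
  public List<Scope> SubScopes = new List<Scope>();

  public Scope(string name) { Name = name; }

  public Scope Add(Scope child) {
    child.Parent = this;
    SubScopes.Add(child);
    return child;
  }

  // Same recursion as ScopeExtensions.ClearParentScopes in this changeset:
  // each ancestor keeps only the child on the path to the starting scope,
  // so sibling subtrees are dropped before the job is serialized.
  public void ClearParentScopes() { ClearParentScopes(null); }
  private void ClearParentScopes(Scope childScope) {
    if (childScope != null) {
      SubScopes.Clear();
      SubScopes.Add(childScope);
    }
    if (Parent != null)
      Parent.ClearParentScopes(this);
  }

  // number of scopes reachable from this scope (including itself)
  public int Count() {
    int n = 1;
    foreach (var s in SubScopes) n += s.Count();
    return n;
  }
}

class ClearParentScopesDemo {
  static void Main() {
    var root = new Scope("global");
    Scope jobScope = null;
    for (int i = 0; i < 100; i++) {
      var child = root.Add(new Scope("solution" + i));
      if (i == 0) jobScope = child;   // the scope the uploaded operation works on
    }
    Console.WriteLine(root.Count());  // 101: root plus 100 solution scopes
    jobScope.ClearParentScopes();     // prune everything not on jobScope's path
    Console.WriteLine(root.Count());  // 2: root plus jobScope
  }
}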