Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
07/22/08 19:15:29 (16 years ago)
Author:
gkronber
Message:

unified runs and agents (are actually the same with the minor difference that agents can create new agents (runs)) (ticket #188)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs

    r380 r390  
    3030using HeuristicLab.CEDMA.Core;
    3131using HeuristicLab.Grid;
     32using System.Diagnostics;
    3233
    3334namespace HeuristicLab.CEDMA.Server {
    3435  public class RunScheduler {
     36    private class Job {
     37      public long AgentId;
     38      public WaitHandle WaitHandle;
     39      public AtomicOperation Operation;
     40    }
    3541    private Database database;
    3642    private JobManager jobManager;
    3743    private const int RELEASE_INTERVAL = 5;
    3844    private object remoteCommLock = new object();
    39     private object collectionsLock = new object();
    40     private Queue<WaitHandle> waithandles;
    41     private Dictionary<WaitHandle, AtomicOperation> runningOperations;
    42     private Dictionary<WaitHandle, long> runningEntries;
     45    private object queueLock = new object();
     46    private Queue<Job> jobQueue;
    4347
    4448    public RunScheduler(Database database, JobManager jobManager) {
    4549      this.database = database;
    4650      this.jobManager = jobManager;
    47       runningOperations = new Dictionary<WaitHandle, AtomicOperation>();
    48       runningEntries = new Dictionary<WaitHandle, long>();
    49       waithandles = new Queue<WaitHandle>();
     51      jobQueue = new Queue<Job>();
    5052      Thread resultsGatheringThread = new Thread(GatherResults);
    5153      resultsGatheringThread.Start();
     
    5860    }
    5961    private void ReleaseWaitingRuns() {
    60       ICollection<RunEntry> runs;
     62      IEnumerable<AgentEntry> agents;
    6163      lock(remoteCommLock) {
    62         runs = database.GetRuns(ProcessStatus.Waiting);
     64        agents = database.GetAgents(ProcessStatus.Waiting).Where(a=>!a.ControllerAgent);
    6365      }
    64       foreach(RunEntry entry in runs) {
     66      foreach(AgentEntry entry in agents) {
    6567        IOperatorGraph opGraph = (IOperatorGraph)DbPersistenceManager.Restore(entry.RawData);
    66         Scope scope = new Scope();
    67         AtomicOperation op = new AtomicOperation(opGraph.InitialOperator, scope);
     68        AtomicOperation op = new AtomicOperation(opGraph.InitialOperator, new Scope());
    6869        WaitHandle wHandle;
    6970        lock(remoteCommLock) {
    70           wHandle = jobManager.BeginExecuteOperation(scope, op);
    71           database.UpdateRunStatus(entry.Id, ProcessStatus.Active);
    72           database.UpdateRunStart(entry.Id, DateTime.Now);
     71          wHandle = jobManager.BeginExecuteOperation(op.Scope, op);
     72          database.UpdateAgent(entry.Id, ProcessStatus.Active);
    7373        }
    7474
    75         lock(collectionsLock) {
    76           waithandles.Enqueue(wHandle);
    77           runningOperations[wHandle] = op;
    78           runningEntries[wHandle] = entry.Id;
     75        Job job = new Job();
     76        job.AgentId = entry.Id;
     77        job.Operation = op;
     78        job.WaitHandle = wHandle;
     79
     80        lock(queueLock) {
     81          jobQueue.Enqueue(job);
    7982        }
    8083      }
     
    8285
    8386    private void GatherResults() {
    84       while(true) {
    85         if(waithandles.Count == 0) Thread.Sleep(1000);
    86         else {
    87           WaitHandle w;
    88           lock(collectionsLock) {
    89             w = waithandles.Dequeue();
    90           }
    91           w.WaitOne();
    92           long id;
    93           AtomicOperation op;
    94           lock(collectionsLock) {
    95             id = runningEntries[w];
    96             runningEntries.Remove(w);
    97             op = runningOperations[w];
    98             runningOperations.Remove(w);
    99           }
    100           w.Close();
    101           lock(remoteCommLock) {
    102             jobManager.EndExecuteOperation(op);
    103             database.UpdateRunStatus(id, ProcessStatus.Finished);
    104             database.UpdateRunFinished(id, DateTime.Now);
     87      try {
     88        while(true) {
     89          int runningJobs;
     90          lock(queueLock) runningJobs = jobQueue.Count;
     91          if(runningJobs==0) Thread.Sleep(1000); // TASK: replace with waithandle
     92          else {
     93            Job job;
     94            lock(queueLock) {
     95              job = jobQueue.Dequeue();
     96            }
     97            job.WaitHandle.WaitOne();
     98            job.WaitHandle.Close();
     99            lock(remoteCommLock) {
     100              jobManager.EndExecuteOperation(job.Operation);
     101              database.UpdateAgent(job.AgentId, ProcessStatus.Finished);
     102            }
    105103          }
    106104        }
     105      } finally {
     106        Debug.Assert(false); // make sure we are notified when this thread is killed while debugging
    107107      }
    108108    }
Note: See TracChangeset for help on using the changeset viewer.