Changeset 390 for trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs
- Timestamp:
- 07/22/08 19:15:29 (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs
r380 r390 30 30 using HeuristicLab.CEDMA.Core; 31 31 using HeuristicLab.Grid; 32 using System.Diagnostics; 32 33 33 34 namespace HeuristicLab.CEDMA.Server { 34 35 public class RunScheduler { 36 private class Job { 37 public long AgentId; 38 public WaitHandle WaitHandle; 39 public AtomicOperation Operation; 40 } 35 41 private Database database; 36 42 private JobManager jobManager; 37 43 private const int RELEASE_INTERVAL = 5; 38 44 private object remoteCommLock = new object(); 39 private object collectionsLock = new object(); 40 private Queue<WaitHandle> waithandles; 41 private Dictionary<WaitHandle, AtomicOperation> runningOperations; 42 private Dictionary<WaitHandle, long> runningEntries; 45 private object queueLock = new object(); 46 private Queue<Job> jobQueue; 43 47 44 48 public RunScheduler(Database database, JobManager jobManager) { 45 49 this.database = database; 46 50 this.jobManager = jobManager; 47 runningOperations = new Dictionary<WaitHandle, AtomicOperation>(); 48 runningEntries = new Dictionary<WaitHandle, long>(); 49 waithandles = new Queue<WaitHandle>(); 51 jobQueue = new Queue<Job>(); 50 52 Thread resultsGatheringThread = new Thread(GatherResults); 51 53 resultsGatheringThread.Start(); … … 58 60 } 59 61 private void ReleaseWaitingRuns() { 60 I Collection<RunEntry> runs;62 IEnumerable<AgentEntry> agents; 61 63 lock(remoteCommLock) { 62 runs = database.GetRuns(ProcessStatus.Waiting);64 agents = database.GetAgents(ProcessStatus.Waiting).Where(a=>!a.ControllerAgent); 63 65 } 64 foreach( RunEntry entry in runs) {66 foreach(AgentEntry entry in agents) { 65 67 IOperatorGraph opGraph = (IOperatorGraph)DbPersistenceManager.Restore(entry.RawData); 66 Scope scope = new Scope(); 67 AtomicOperation op = new AtomicOperation(opGraph.InitialOperator, scope); 68 AtomicOperation op = new AtomicOperation(opGraph.InitialOperator, new Scope()); 68 69 WaitHandle wHandle; 69 70 lock(remoteCommLock) { 70 wHandle = jobManager.BeginExecuteOperation(scope, op); 71 database.UpdateRunStatus(entry.Id, ProcessStatus.Active); 72 database.UpdateRunStart(entry.Id, DateTime.Now); 71 wHandle = jobManager.BeginExecuteOperation(op.Scope, op); 72 database.UpdateAgent(entry.Id, ProcessStatus.Active); 73 73 } 74 74 75 lock(collectionsLock) { 76 waithandles.Enqueue(wHandle); 77 runningOperations[wHandle] = op; 78 runningEntries[wHandle] = entry.Id; 75 Job job = new Job(); 76 job.AgentId = entry.Id; 77 job.Operation = op; 78 job.WaitHandle = wHandle; 79 80 lock(queueLock) { 81 jobQueue.Enqueue(job); 79 82 } 80 83 } … … 82 85 83 86 private void GatherResults() { 84 while(true) { 85 if(waithandles.Count == 0) Thread.Sleep(1000); 86 else { 87 WaitHandle w; 88 lock(collectionsLock) { 89 w = waithandles.Dequeue(); 90 } 91 w.WaitOne(); 92 long id; 93 AtomicOperation op; 94 lock(collectionsLock) { 95 id = runningEntries[w]; 96 runningEntries.Remove(w); 97 op = runningOperations[w]; 98 runningOperations.Remove(w); 99 } 100 w.Close(); 101 lock(remoteCommLock) { 102 jobManager.EndExecuteOperation(op); 103 database.UpdateRunStatus(id, ProcessStatus.Finished); 104 database.UpdateRunFinished(id, DateTime.Now); 87 try { 88 while(true) { 89 int runningJobs; 90 lock(queueLock) runningJobs = jobQueue.Count; 91 if(runningJobs==0) Thread.Sleep(1000); // TASK: replace with waithandle 92 else { 93 Job job; 94 lock(queueLock) { 95 job = jobQueue.Dequeue(); 96 } 97 job.WaitHandle.WaitOne(); 98 job.WaitHandle.Close(); 99 lock(remoteCommLock) { 100 jobManager.EndExecuteOperation(job.Operation); 101 database.UpdateAgent(job.AgentId, ProcessStatus.Finished); 102 } 105 103 } 106 104 } 105 } finally { 106 Debug.Assert(false); // make sure we are notified when this thread is killed while debugging 107 107 } 108 108 }
Note: See TracChangeset
for help on using the changeset viewer.