Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs @ 392

Last change on this file since 392 was 392, checked in by gkronber, 16 years ago

worked on unification of agents and runs (ticket #188)

File size: 3.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.CEDMA.DB;
27using HeuristicLab.CEDMA.DB.Interfaces;
28using HeuristicLab.Core;
29using System.Threading;
30using HeuristicLab.CEDMA.Core;
31using HeuristicLab.Grid;
32using System.Diagnostics;
33
34namespace HeuristicLab.CEDMA.Server {
35  public class RunScheduler {
36    private class Job {
37      public long AgentId;
38      public WaitHandle WaitHandle;
39      public AtomicOperation Operation;
40    }
41    private Database database;
42    private JobManager jobManager;
43    private const int RELEASE_INTERVAL = 5;
44    private object remoteCommLock = new object();
45    private object queueLock = new object();
46    private Queue<Job> jobQueue;
47    private AutoResetEvent runningJobs = new AutoResetEvent(false);
48
49    public RunScheduler(Database database, JobManager jobManager) {
50      this.database = database;
51      this.jobManager = jobManager;
52      jobQueue = new Queue<Job>();
53      Thread resultsGatheringThread = new Thread(GatherResults);
54      resultsGatheringThread.Start();
55    }
56    public void Run() {
57      while(true) {
58        ReleaseWaitingRuns();
59        Thread.Sleep(TimeSpan.FromSeconds(RELEASE_INTERVAL));
60      }
61    }
62    private void ReleaseWaitingRuns() {
63      IEnumerable<AgentEntry> agents;
64      lock(remoteCommLock) {
65        agents = database.GetAgents(ProcessStatus.Waiting).Where(a=>!a.ControllerAgent);
66      }
67      foreach(AgentEntry entry in agents) {
68        IOperatorGraph opGraph = (IOperatorGraph)DbPersistenceManager.Restore(entry.RawData);
69        AtomicOperation op = new AtomicOperation(opGraph.InitialOperator, new Scope());
70        WaitHandle wHandle;
71        lock(remoteCommLock) {
72          wHandle = jobManager.BeginExecuteOperation(op.Scope, op);
73          database.UpdateAgent(entry.Id, ProcessStatus.Active);
74        }
75
76        Job job = new Job();
77        job.AgentId = entry.Id;
78        job.Operation = op;
79        job.WaitHandle = wHandle;
80
81        lock(queueLock) {
82          jobQueue.Enqueue(job);
83          runningJobs.Set();
84        }
85      }
86    }
87
88    private void GatherResults() {
89      try {
90        while(true) {
91          Job job = null;
92          lock(queueLock) if(jobQueue.Count > 0) job = jobQueue.Dequeue();
93          if(job == null) runningJobs.WaitOne();
94          else {
95            job.WaitHandle.WaitOne();
96            job.WaitHandle.Close();
97            lock(remoteCommLock) {
98              try {
99                jobManager.EndExecuteOperation(job.Operation);
100                database.UpdateAgent(job.AgentId, ProcessStatus.Finished);
101              } catch(JobExecutionException ex) {
102                database.UpdateAgent(job.AgentId, ProcessStatus.Error);
103              }
104            }
105          }
106        }
107      } finally {
108        Debug.Assert(false); // make sure we are notified when this thread is killed while debugging
109      }
110    }
111  }
112}
Note: See TracBrowser for help on using the repository browser.