Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs @ 405

Last change on this file since 405 was 403, checked in by gkronber, 16 years ago
  • fixed ticket #201 (Fix plugin dependencies for CEDMA plugins)
  • deleted classes DbPersistenceManager because the common code was moved to the PersistenceManager in HeuristicLab.Core.
File size: 4.1 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.CEDMA.DB;
27using HeuristicLab.CEDMA.DB.Interfaces;
28using HeuristicLab.Core;
29using System.Threading;
30using HeuristicLab.Grid;
31using System.Diagnostics;
32using HeuristicLab.Data;
33
34namespace HeuristicLab.CEDMA.Server {
35  public class RunScheduler {
36    private class Job {
37      public long AgentId;
38      public WaitHandle WaitHandle;
39      public AtomicOperation Operation;
40    }
41    private string serverUri;
42    private Database database;
43    private JobManager jobManager;
44    private const int RELEASE_INTERVAL = 5;
45    private object remoteCommLock = new object();
46    private object queueLock = new object();
47    private Queue<Job> jobQueue;
48    private AutoResetEvent runningJobs = new AutoResetEvent(false);
49
50    public RunScheduler(Database database, JobManager jobManager, string serverUri) {
51      this.database = database;
52      this.jobManager = jobManager;
53      this.serverUri = serverUri;
54      jobQueue = new Queue<Job>();
55      Thread resultsGatheringThread = new Thread(GatherResults);
56      resultsGatheringThread.Start();
57    }
58    public void Run() {
59      while(true) {
60        ReleaseWaitingRuns();
61        Thread.Sleep(TimeSpan.FromSeconds(RELEASE_INTERVAL));
62      }
63    }
64    private void ReleaseWaitingRuns() {
65      ICollection<AgentEntry> agents;
66      lock(remoteCommLock) {
67        agents = database.GetAgents(ProcessStatus.Waiting);
68      }
69      foreach(AgentEntry entry in agents) {
70        Scope scope = new Scope();
71        // initialize CEDMA variables for the execution of the agent
72        scope.AddVariable(new Variable("AgentId", new IntData((int)entry.Id)));
73        scope.AddVariable(new Variable("CedmaServerUri", new StringData(serverUri)));
74        IOperatorGraph opGraph = (IOperatorGraph)PersistenceManager.RestoreFromGZip(entry.RawData);
75        AtomicOperation op = new AtomicOperation(opGraph.InitialOperator, scope);
76        WaitHandle wHandle;
77        lock(remoteCommLock) {
78          wHandle = jobManager.BeginExecuteOperation(op.Scope, op);
79          database.UpdateAgent(entry.Id, ProcessStatus.Active);
80        }
81
82        Job job = new Job();
83        job.AgentId = entry.Id;
84        job.Operation = op;
85        job.WaitHandle = wHandle;
86
87        lock(queueLock) {
88          jobQueue.Enqueue(job);
89          runningJobs.Set();
90        }
91      }
92    }
93
94    private void GatherResults() {
95      try {
96        while(true) {
97          Job job = null;
98          lock(queueLock) if(jobQueue.Count > 0) job = jobQueue.Dequeue();
99          if(job == null) runningJobs.WaitOne();
100          else {
101            job.WaitHandle.WaitOne();
102            job.WaitHandle.Close();
103            lock(remoteCommLock) {
104              try {
105                jobManager.EndExecuteOperation(job.Operation);
106                database.UpdateAgent(job.AgentId, ProcessStatus.Finished);
107              } catch(JobExecutionException ex) {
108                database.UpdateAgent(job.AgentId, ProcessStatus.Error);
109              }
110            }
111          }
112        }
113      } finally {
114        Debug.Assert(false); // make sure we are notified when this thread is killed while debugging
115      }
116    }
117  }
118}
Note: See TracBrowser for help on using the repository browser.