Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs @ 405

Last change on this file since 405 was 403, checked in by gkronber, 16 years ago
  • fixed ticket #201 (Fix plugin dependencies for CEDMA plugins)
  • deleted classes DbPersistenceManager because the common code was moved to the PersistenceManager in HeuristicLab.Core.
File size: 4.1 KB
Rev Line
[375]1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.CEDMA.DB;
27using HeuristicLab.CEDMA.DB.Interfaces;
28using HeuristicLab.Core;
29using System.Threading;
30using HeuristicLab.Grid;
[390]31using System.Diagnostics;
[393]32using HeuristicLab.Data;
[375]33
namespace HeuristicLab.CEDMA.Server {
  /// <summary>
  /// Periodically fetches all agents with status <c>ProcessStatus.Waiting</c> from the database,
  /// starts their execution through the <c>JobManager</c> and, on a separate thread, waits for
  /// the started jobs to complete and writes the outcome (Finished or Error) back to the database.
  /// </summary>
  public class RunScheduler {
    /// <summary>
    /// Bookkeeping record for one started agent execution.
    /// </summary>
    private class Job {
      // Id of the agent entry in the database.
      public long AgentId;
      // Handle returned by JobManager.BeginExecuteOperation; the gathering thread waits on it.
      public WaitHandle WaitHandle;
      // The operation that was handed to the job manager; passed to EndExecuteOperation later.
      public AtomicOperation Operation;
    }

    // Pause between two polls of the database, in seconds (see Run).
    private const int RELEASE_INTERVAL = 5;

    private readonly string serverUri;
    private readonly Database database;
    private readonly JobManager jobManager;
    // Held around every call to the database and the job manager in this class.
    private readonly object remoteCommLock = new object();
    // Guards jobQueue.
    private readonly object queueLock = new object();
    private readonly Queue<Job> jobQueue;
    // Signaled whenever a job is enqueued; the gathering thread waits on it while the queue is empty.
    private readonly AutoResetEvent runningJobs = new AutoResetEvent(false);

    /// <summary>
    /// Creates the scheduler and immediately starts the thread that gathers job results.
    /// </summary>
    /// <param name="database">Database used to query waiting agents and to update their status.</param>
    /// <param name="jobManager">Job manager used to begin and end the execution of operations.</param>
    /// <param name="serverUri">Value stored in the "CedmaServerUri" variable of every agent scope.</param>
    public RunScheduler(Database database, JobManager jobManager, string serverUri) {
      this.database = database;
      this.jobManager = jobManager;
      this.serverUri = serverUri;
      jobQueue = new Queue<Job>();
      Thread resultsGatheringThread = new Thread(GatherResults);
      resultsGatheringThread.Start();
    }

    /// <summary>
    /// Releases waiting agents every <c>RELEASE_INTERVAL</c> seconds. The loop has no exit
    /// condition, so this method only returns by throwing; it blocks the calling thread.
    /// </summary>
    public void Run() {
      while(true) {
        ReleaseWaitingRuns();
        Thread.Sleep(TimeSpan.FromSeconds(RELEASE_INTERVAL));
      }
    }

    /// <summary>
    /// Starts the execution of every agent that currently has status Waiting, marks it as
    /// Active in the database and enqueues it for the results gathering thread.
    /// </summary>
    private void ReleaseWaitingRuns() {
      ICollection<AgentEntry> agents;
      lock(remoteCommLock) {
        agents = database.GetAgents(ProcessStatus.Waiting);
      }
      foreach(AgentEntry entry in agents) {
        AtomicOperation op = CreateOperation(entry);
        WaitHandle wHandle;
        lock(remoteCommLock) {
          wHandle = jobManager.BeginExecuteOperation(op.Scope, op);
          database.UpdateAgent(entry.Id, ProcessStatus.Active);
        }

        Job job = new Job();
        job.AgentId = entry.Id;
        job.Operation = op;
        job.WaitHandle = wHandle;

        lock(queueLock) {
          jobQueue.Enqueue(job);
          // wake up the gathering thread in case it is waiting on an empty queue
          runningJobs.Set();
        }
      }
    }

    /// <summary>
    /// Builds the operation for one agent: a fresh scope holding the CEDMA variables and the
    /// initial operator of the operator graph restored from the agent's raw data.
    /// </summary>
    /// <param name="entry">Database entry of the agent to execute.</param>
    /// <returns>The operation to hand to the job manager.</returns>
    private AtomicOperation CreateOperation(AgentEntry entry) {
      Scope scope = new Scope();
      // initialize CEDMA variables for the execution of the agent
      // NOTE(review): the id is narrowed to int here while Job.AgentId keeps it as long;
      // in an unchecked context an id above int.MaxValue would silently wrap - confirm the id range.
      scope.AddVariable(new Variable("AgentId", new IntData((int)entry.Id)));
      scope.AddVariable(new Variable("CedmaServerUri", new StringData(serverUri)));
      IOperatorGraph opGraph = (IOperatorGraph)PersistenceManager.RestoreFromGZip(entry.RawData);
      return new AtomicOperation(opGraph.InitialOperator, scope);
    }

    /// <summary>
    /// Removes the next job from the queue.
    /// </summary>
    /// <returns>The dequeued job, or <c>null</c> if the queue is empty.</returns>
    private Job TryDequeueJob() {
      lock(queueLock) {
        if(jobQueue.Count > 0) {
          return jobQueue.Dequeue();
        }
        return null;
      }
    }

    /// <summary>
    /// Thread body of the results gathering thread. Takes jobs from the queue in the order they
    /// were enqueued, blocks until each one has completed and stores the resulting status.
    /// </summary>
    private void GatherResults() {
      try {
        while(true) {
          Job job = TryDequeueJob();
          if(job == null) {
            // nothing to do until ReleaseWaitingRuns enqueues a job and signals the event
            runningJobs.WaitOne();
            continue;
          }

          job.WaitHandle.WaitOne();
          job.WaitHandle.Close();
          lock(remoteCommLock) {
            try {
              jobManager.EndExecuteOperation(job.Operation);
              database.UpdateAgent(job.AgentId, ProcessStatus.Finished);
            } catch(JobExecutionException ex) {
              // update the status first so that a failing trace listener cannot prevent it
              database.UpdateAgent(job.AgentId, ProcessStatus.Error);
              // record the reason instead of silently discarding the exception
              Trace.TraceError("Execution of agent {0} failed: {1}", job.AgentId, ex);
            }
          }
        }
      } finally {
        // the loop above never exits normally, so this is only reached through an exception
        Debug.Assert(false); // make sure we are notified when this thread is killed while debugging
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.