#region License Information /* HeuristicLab * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL) * * This file is part of HeuristicLab. * * HeuristicLab is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * HeuristicLab is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with HeuristicLab. If not, see . */ #endregion using System; using System.Collections.Generic; using System.Linq; using System.Text; using HeuristicLab.CEDMA.DB; using HeuristicLab.CEDMA.DB.Interfaces; using HeuristicLab.Core; using System.Threading; using HeuristicLab.CEDMA.Core; using HeuristicLab.Grid; namespace HeuristicLab.CEDMA.Server { public class RunScheduler { private Database database; private JobManager jobManager; private const int RELEASE_INTERVAL = 5; private object remoteCommLock = new object(); private object collectionsLock = new object(); private Queue waithandles; private Dictionary runningOperations; private Dictionary runningEntries; public RunScheduler(Database database, JobManager jobManager) { this.database = database; this.jobManager = jobManager; runningOperations = new Dictionary(); runningEntries = new Dictionary(); waithandles = new Queue(); Thread resultsGatheringThread = new Thread(GatherResults); resultsGatheringThread.Start(); } public void Run() { while(true) { ReleaseWaitingRuns(); Thread.Sleep(TimeSpan.FromSeconds(RELEASE_INTERVAL)); } } private void ReleaseWaitingRuns() { ICollection runs; lock(remoteCommLock) { runs = database.GetRuns(ProcessStatus.Waiting); } foreach(RunEntry entry in runs) { IOperatorGraph opGraph = (IOperatorGraph)DbPersistenceManager.Restore(entry.RawData); Scope scope = new Scope(); AtomicOperation op = new AtomicOperation(opGraph.InitialOperator, scope); WaitHandle wHandle; lock(remoteCommLock) { wHandle = jobManager.BeginExecuteOperation(scope, op); database.UpdateRunStatus(entry.Id, ProcessStatus.Active); database.UpdateRunStart(entry.Id, DateTime.Now); } lock(collectionsLock) { waithandles.Enqueue(wHandle); runningOperations[wHandle] = op; runningEntries[wHandle] = entry.Id; } } } private void GatherResults() { while(true) { if(waithandles.Count == 0) Thread.Sleep(1000); else { WaitHandle w; lock(collectionsLock) { w = waithandles.Dequeue(); } w.WaitOne(); long id; AtomicOperation op; lock(collectionsLock) { id = runningEntries[w]; runningEntries.Remove(w); op = runningOperations[w]; runningOperations.Remove(w); } w.Close(); lock(remoteCommLock) { jobManager.EndExecuteOperation(op); database.UpdateRunStatus(id, ProcessStatus.Finished); database.UpdateRunFinished(id, DateTime.Now); } } } } } }