#region License Information
/* HeuristicLab
* Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
*
* This file is part of HeuristicLab.
*
* HeuristicLab is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* HeuristicLab is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with HeuristicLab. If not, see .
*/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using HeuristicLab.CEDMA.DB;
using HeuristicLab.CEDMA.DB.Interfaces;
using HeuristicLab.Core;
using System.Threading;
using HeuristicLab.Grid;
using System.Diagnostics;
using HeuristicLab.Data;
using HeuristicLab.CEDMA.Core;
using HeuristicLab.Operators;
namespace HeuristicLab.CEDMA.Server {
public class RunScheduler {
private class Job {
public long AgentId;
public WaitHandle WaitHandle;
public AtomicOperation Operation;
}
private string serverUri;
private Database database;
private JobManager jobManager;
private const int RELEASE_INTERVAL = 5;
private object remoteCommLock = new object();
private object queueLock = new object();
private Queue jobQueue;
private AutoResetEvent runningJobs = new AutoResetEvent(false);
public RunScheduler(Database database, JobManager jobManager, string serverUri) {
this.database = database;
this.jobManager = jobManager;
this.serverUri = serverUri;
jobQueue = new Queue();
Thread resultsGatheringThread = new Thread(GatherResults);
resultsGatheringThread.Start();
}
public void Run() {
while(true) {
ReleaseWaitingRuns();
Thread.Sleep(TimeSpan.FromSeconds(RELEASE_INTERVAL));
}
}
private void ReleaseWaitingRuns() {
ICollection agents;
lock(remoteCommLock) {
agents = database.GetAgents(ProcessStatus.Waiting);
}
foreach(AgentEntry entry in agents) {
Scope scope = new Scope();
// initialize CEDMA variables for the execution of the agent
scope.AddVariable(new Variable("AgentId", new IntData((int)entry.Id)));
scope.AddVariable(new Variable("CedmaServerUri", new StringData(serverUri)));
byte[] rawData = database.GetAgentRawData(entry.Id);
IOperatorGraph opGraph = (IOperatorGraph)PersistenceManager.RestoreFromGZip(rawData);
PatchLinks(opGraph, new Dictionary());
AtomicOperation operation = new AtomicOperation(opGraph.InitialOperator, scope);
WaitHandle wHandle;
lock(remoteCommLock) {
wHandle = jobManager.BeginExecuteOperation(operation.Scope, operation);
database.UpdateAgent(entry.Id, ProcessStatus.Active);
}
Job job = new Job();
job.AgentId = entry.Id;
job.Operation = operation;
job.WaitHandle = wHandle;
lock(queueLock) {
jobQueue.Enqueue(job);
runningJobs.Set();
}
}
}
private void PatchLinks(IOperatorGraph opGraph, Dictionary patchedOperators) {
Dictionary patchDictionary = new Dictionary();
foreach(IOperator op in opGraph.Operators) {
IOperator patched = PatchLinks(op, patchedOperators);
patchDictionary.Add(op, patched);
}
foreach(KeyValuePair p in patchDictionary) {
IOperator original = p.Key;
IOperator patch = p.Value;
if(original != patch) {
foreach(IOperator subOperator in original.SubOperators) {
patch.AddSubOperator(subOperator);
}
if(opGraph.InitialOperator == original)
opGraph.InitialOperator = patch;
opGraph.RemoveOperator(original.Guid);
opGraph.AddOperator(patch);
}
}
}
private IOperator PatchLinks(IOperator op, Dictionary patchedOperators) {
if(op is OperatorLink) {
OperatorLink link = op as OperatorLink;
if(patchedOperators.ContainsKey(link.Id)) {
return patchedOperators[link.Id];
} else {
OperatorEntry targetEntry = database.GetOperator(link.Id);
IOperator target = (IOperator)PersistenceManager.RestoreFromGZip(targetEntry.RawData);
patchedOperators.Add(link.Id, target);
PatchLinks(target, patchedOperators);
return target;
}
} else if(op is CombinedOperator) {
PatchLinks(((CombinedOperator)op).OperatorGraph, patchedOperators);
return op;
}
return op;
}
private void GatherResults() {
try {
while(true) {
Job job = null;
lock(queueLock) if(jobQueue.Count > 0) job = jobQueue.Dequeue();
if(job == null) runningJobs.WaitOne();
else {
job.WaitHandle.WaitOne();
job.WaitHandle.Close();
lock(remoteCommLock) {
try {
jobManager.EndExecuteOperation(job.Operation);
database.UpdateAgent(job.AgentId, ProcessStatus.Finished);
} catch(JobExecutionException ex) {
database.UpdateAgent(job.AgentId, ProcessStatus.Error);
}
}
}
}
} finally {
Debug.Assert(false); // make sure we are notified when this thread is killed while debugging
}
}
}
}