Free cookie consent management tool by TermsFeed Policy Generator

source: branches/CEDMA-Refactoring-Ticket419/HeuristicLab.CEDMA.Server/RunScheduler.cs @ 1115

Last change on this file since 1115 was 988, checked in by gkronber, 15 years ago

worked on #419 (Refactor CEDMA plugins)

File size: 5.8 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.CEDMA.DB;
27using HeuristicLab.CEDMA.DB.Interfaces;
28using HeuristicLab.Core;
29using System.Threading;
30using HeuristicLab.Grid;
31using System.Diagnostics;
32using HeuristicLab.Data;
33using HeuristicLab.CEDMA.Core;
34using HeuristicLab.Operators;
35
36namespace HeuristicLab.CEDMA.Server {
37  public class RunScheduler {
38    private class Job {
39      public long AgentId;
40      public WaitHandle WaitHandle;
41      public AtomicOperation Operation;
42    }
43    private string serverUri;
44    private JobManager jobManager;
45    private const int RELEASE_INTERVAL = 5;
46    private object remoteCommLock = new object();
47    private object queueLock = new object();
48    private Queue<Job> jobQueue;
49    private AutoResetEvent runningJobs = new AutoResetEvent(false);
50
51    public RunScheduler(JobManager jobManager, string serverUri) {
52      this.jobManager = jobManager;
53      this.serverUri = serverUri;
54      jobQueue = new Queue<Job>();
55      Thread resultsGatheringThread = new Thread(GatherResults);
56      resultsGatheringThread.Start();
57    }
58    public void Run() {
59      while(true) {
60        ReleaseWaitingRuns();
61        Thread.Sleep(TimeSpan.FromSeconds(RELEASE_INTERVAL));
62      }
63    }
64    private void ReleaseWaitingRuns() {
65      ICollection<AgentEntry> agents;
66      lock(remoteCommLock) {
67        agents = database.GetAgents(ProcessStatus.Waiting);
68      }
69      foreach(AgentEntry entry in agents) {
70        Scope scope = new Scope();
71        // initialize CEDMA variables for the execution of the agent
72        scope.AddVariable(new Variable("AgentId", new IntData((int)entry.Id)));
73        scope.AddVariable(new Variable("CedmaServerUri", new StringData(serverUri)));
74
75        byte[] rawData = database.GetAgentRawData(entry.Id);
76        IOperatorGraph opGraph = (IOperatorGraph)PersistenceManager.RestoreFromGZip(rawData);
77
78        PatchLinks(opGraph, new Dictionary<long, IOperator>());
79
80        AtomicOperation operation = new AtomicOperation(opGraph.InitialOperator, scope);
81        WaitHandle wHandle;
82        lock(remoteCommLock) {
83          wHandle = jobManager.BeginExecuteOperation(operation.Scope, operation);
84          database.UpdateAgent(entry.Id, ProcessStatus.Active);
85        }
86
87        Job job = new Job();
88        job.AgentId = entry.Id;
89        job.Operation = operation;
90        job.WaitHandle = wHandle;
91
92        lock(queueLock) {
93          jobQueue.Enqueue(job);
94          runningJobs.Set();
95        }
96      }
97    }
98
99    private void PatchLinks(IOperatorGraph opGraph, Dictionary<long, IOperator> patchedOperators) {
100      Dictionary<IOperator, IOperator> patchDictionary = new Dictionary<IOperator, IOperator>();
101      foreach(IOperator op in opGraph.Operators) {
102        IOperator patched = PatchLinks(op, patchedOperators);
103        patchDictionary.Add(op, patched);
104      }
105      foreach(KeyValuePair<IOperator, IOperator> p in patchDictionary) {
106        IOperator original = p.Key;
107        IOperator patch = p.Value;
108        if(original != patch) {
109          foreach(IOperator subOperator in original.SubOperators) {
110            patch.AddSubOperator(subOperator);
111          }
112          if(opGraph.InitialOperator == original)
113            opGraph.InitialOperator = patch;
114          opGraph.RemoveOperator(original.Guid);
115          opGraph.AddOperator(patch);
116        }
117      }
118    }
119
120    private IOperator PatchLinks(IOperator op, Dictionary<long, IOperator> patchedOperators) {
121      if(op is OperatorLink) {
122        OperatorLink link = op as OperatorLink;
123        if(patchedOperators.ContainsKey(link.Id)) {
124          return patchedOperators[link.Id];
125        } else {
126          OperatorEntry targetEntry = database.GetOperator(link.Id);
127          IOperator target = (IOperator)PersistenceManager.RestoreFromGZip(targetEntry.RawData);
128          patchedOperators.Add(link.Id, target);
129          PatchLinks(target, patchedOperators);
130          return target;
131        }
132      } else if(op is CombinedOperator) {
133        PatchLinks(((CombinedOperator)op).OperatorGraph, patchedOperators);
134        return op;
135      }
136      return op;
137    }
138
139    private void GatherResults() {
140      try {
141        while(true) {
142          Job job = null;
143          lock(queueLock) if(jobQueue.Count > 0) job = jobQueue.Dequeue();
144          if(job == null) runningJobs.WaitOne();
145          else {
146            job.WaitHandle.WaitOne();
147            job.WaitHandle.Close();
148            lock(remoteCommLock) {
149              try {
150                jobManager.EndExecuteOperation(job.Operation);
151                database.UpdateAgent(job.AgentId, ProcessStatus.Finished);
152              } catch(JobExecutionException ex) {
153                database.UpdateAgent(job.AgentId, ProcessStatus.Error);
154              }
155            }
156          }
157        }
158      } finally {
159        Debug.Assert(false); // make sure we are notified when this thread is killed while debugging
160      }
161    }
162  }
163}
Note: See TracBrowser for help on using the repository browser.