Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs @ 885

Last change on this file since 885 was 503, checked in by gkronber, 16 years ago

Improved loading speed of agent and results list by lazily loading and extracting rawdata of operator graphs and result items (ticket #249)

File size: 5.9 KB
Line 
1#region License Information
2/* HeuristicLab
3 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
4 *
5 * This file is part of HeuristicLab.
6 *
7 * HeuristicLab is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * HeuristicLab is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
19 */
20#endregion
21
22using System;
23using System.Collections.Generic;
24using System.Linq;
25using System.Text;
26using HeuristicLab.CEDMA.DB;
27using HeuristicLab.CEDMA.DB.Interfaces;
28using HeuristicLab.Core;
29using System.Threading;
30using HeuristicLab.Grid;
31using System.Diagnostics;
32using HeuristicLab.Data;
33using HeuristicLab.CEDMA.Core;
34using HeuristicLab.Operators;
35
namespace HeuristicLab.CEDMA.Server {
  /// <summary>
  /// Polls the database for agents in state <c>ProcessStatus.Waiting</c>, hands the initial
  /// operator of each agent's operator graph to the job manager and records the outcome of
  /// every started job in the database.
  /// </summary>
  /// <remarks>
  /// Two threads are involved: the caller of <see cref="Run"/> releases waiting agents, while a
  /// thread started by the constructor collects finished jobs. Both threads use the same
  /// database and job manager instances, so every call to either of them is made while holding
  /// <c>remoteCommLock</c>.
  /// </remarks>
  public class RunScheduler {
    /// <summary>
    /// A started job that is waiting for its result to be collected.
    /// </summary>
    private class Job {
      // id of the agent whose status is updated when the job ends
      public long AgentId;
      // handle returned by the job manager when the operation was started
      public WaitHandle WaitHandle;
      // the operation that was handed to the job manager
      public AtomicOperation Operation;
    }

    private readonly string serverUri;
    private readonly Database database;
    private readonly JobManager jobManager;
    // pause (in seconds) between two polls for waiting agents
    private const int RELEASE_INTERVAL = 5;
    // guards every call to the database and the job manager
    private readonly object remoteCommLock = new object();
    // guards jobQueue
    private readonly object queueLock = new object();
    private readonly Queue<Job> jobQueue;
    // signaled whenever a job has been enqueued; wakes up the results gathering thread
    private readonly AutoResetEvent runningJobs = new AutoResetEvent(false);

    /// <summary>
    /// Creates the scheduler and immediately starts the thread that gathers job results.
    /// </summary>
    /// <param name="database">Database that is queried for waiting agents and updated with their status.</param>
    /// <param name="jobManager">Job manager that executes the agents' operations.</param>
    /// <param name="serverUri">Value of the "CedmaServerUri" variable placed into the scope of every released agent.</param>
    public RunScheduler(Database database, JobManager jobManager, string serverUri) {
      this.database = database;
      this.jobManager = jobManager;
      this.serverUri = serverUri;
      jobQueue = new Queue<Job>();
      Thread resultsGatheringThread = new Thread(GatherResults);
      resultsGatheringThread.Start();
    }

    /// <summary>
    /// Releases all waiting agents every <c>RELEASE_INTERVAL</c> seconds.
    /// Loops forever; it does not return normally.
    /// </summary>
    public void Run() {
      while(true) {
        ReleaseWaitingRuns();
        Thread.Sleep(TimeSpan.FromSeconds(RELEASE_INTERVAL));
      }
    }

    /// <summary>
    /// Starts the execution of every agent that is currently in state Waiting, marks it as
    /// Active and enqueues a job so that the results gathering thread picks up the outcome.
    /// </summary>
    private void ReleaseWaitingRuns() {
      ICollection<AgentEntry> agents;
      lock(remoteCommLock) {
        agents = database.GetAgents(ProcessStatus.Waiting);
      }
      foreach(AgentEntry entry in agents) {
        Scope scope = new Scope();
        // initialize CEDMA variables for the execution of the agent
        // NOTE(review): the id is narrowed to int here; a value outside the int range would be
        // truncated silently - confirm the possible range of agent ids
        scope.AddVariable(new Variable("AgentId", new IntData((int)entry.Id)));
        scope.AddVariable(new Variable("CedmaServerUri", new StringData(serverUri)));

        // the results gathering thread uses the same database instance concurrently,
        // so this access takes the lock like every other database call
        byte[] rawData;
        lock(remoteCommLock) {
          rawData = database.GetAgentRawData(entry.Id);
        }
        IOperatorGraph opGraph = (IOperatorGraph)PersistenceManager.RestoreFromGZip(rawData);

        // replace operator links by the operators they refer to before the graph is executed
        PatchLinks(opGraph, new Dictionary<long, IOperator>());

        AtomicOperation operation = new AtomicOperation(opGraph.InitialOperator, scope);
        WaitHandle wHandle;
        lock(remoteCommLock) {
          wHandle = jobManager.BeginExecuteOperation(operation.Scope, operation);
          database.UpdateAgent(entry.Id, ProcessStatus.Active);
        }

        Job job = new Job();
        job.AgentId = entry.Id;
        job.Operation = operation;
        job.WaitHandle = wHandle;

        lock(queueLock) {
          jobQueue.Enqueue(job);
          runningJobs.Set();
        }
      }
    }

    /// <summary>
    /// Replaces every operator of <paramref name="opGraph"/> for which
    /// <see cref="PatchLinks(IOperator, Dictionary{long, IOperator})"/> returns a different
    /// operator by that replacement.
    /// </summary>
    /// <param name="opGraph">Graph that is modified in place.</param>
    /// <param name="patchedOperators">Link targets that have been loaded so far, keyed by link id.</param>
    private void PatchLinks(IOperatorGraph opGraph, Dictionary<long, IOperator> patchedOperators) {
      // first pass: only collect the replacements, the graph is changed in the second pass
      Dictionary<IOperator, IOperator> patchDictionary = new Dictionary<IOperator, IOperator>();
      foreach(IOperator op in opGraph.Operators) {
        IOperator patched = PatchLinks(op, patchedOperators);
        patchDictionary.Add(op, patched);
      }
      // second pass: swap each replaced operator for its patch
      foreach(KeyValuePair<IOperator, IOperator> p in patchDictionary) {
        IOperator original = p.Key;
        IOperator patch = p.Value;
        if(original != patch) {
          // the patch takes over the sub-operators of the operator it replaces
          foreach(IOperator subOperator in original.SubOperators) {
            patch.AddSubOperator(subOperator);
          }
          if(opGraph.InitialOperator == original)
            opGraph.InitialOperator = patch;
          // NOTE(review): operators that hold 'original' as a sub-operator are not re-pointed
          // to 'patch' in this method - confirm that RemoveOperator/AddOperator take care of it
          opGraph.RemoveOperator(original.Guid);
          opGraph.AddOperator(patch);
        }
      }
    }

    /// <summary>
    /// Resolves a single operator: an <c>OperatorLink</c> is replaced by the operator stored in
    /// the database under the link's id, the inner graph of a <c>CombinedOperator</c> is patched
    /// in place, and any other operator is returned unchanged.
    /// </summary>
    /// <param name="op">Operator to resolve.</param>
    /// <param name="patchedOperators">Link targets that have been loaded so far, keyed by link id; extended by this call.</param>
    /// <returns>The link target if <paramref name="op"/> is a link, otherwise <paramref name="op"/> itself.</returns>
    private IOperator PatchLinks(IOperator op, Dictionary<long, IOperator> patchedOperators) {
      OperatorLink link = op as OperatorLink;
      if(link != null) {
        IOperator target;
        if(!patchedOperators.TryGetValue(link.Id, out target)) {
          // the results gathering thread uses the same database instance concurrently,
          // so this access takes the lock like every other database call
          byte[] targetRawData;
          lock(remoteCommLock) {
            targetRawData = database.GetOperator(link.Id).RawData;
          }
          target = (IOperator)PersistenceManager.RestoreFromGZip(targetRawData);
          // register the target before descending into it, so that a link to the same id
          // found further down reuses this instance instead of loading it again
          patchedOperators.Add(link.Id, target);
          PatchLinks(target, patchedOperators);
        }
        return target;
      }

      CombinedOperator combined = op as CombinedOperator;
      if(combined != null) {
        PatchLinks(combined.OperatorGraph, patchedOperators);
      }
      return op;
    }

    /// <summary>
    /// Body of the results gathering thread. Takes jobs from the queue in the order in which
    /// they were started, waits for each one to end and stores Finished or Error as the
    /// agent's status. Sleeps on <c>runningJobs</c> while the queue is empty.
    /// </summary>
    private void GatherResults() {
      try {
        while(true) {
          Job job = null;
          lock(queueLock) {
            if(jobQueue.Count > 0) job = jobQueue.Dequeue();
          }
          if(job == null) {
            // nothing to collect; wait until a job has been enqueued and check the queue again
            runningJobs.WaitOne();
            continue;
          }

          // wait outside of remoteCommLock so that new agents can be released in the meantime
          job.WaitHandle.WaitOne();
          job.WaitHandle.Close();
          lock(remoteCommLock) {
            try {
              jobManager.EndExecuteOperation(job.Operation);
              database.UpdateAgent(job.AgentId, ProcessStatus.Finished);
            } catch(JobExecutionException ex) {
              database.UpdateAgent(job.AgentId, ProcessStatus.Error);
              // keep the failure details; the database only stores the status
              Trace.WriteLine("Execution of agent " + job.AgentId + " failed: " + ex);
            }
          }
        }
      } finally {
        Debug.Assert(false); // make sure we are notified when this thread is killed while debugging
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.