Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 219

Last change on this file since 219 was 219, checked in by gkronber, 16 years ago
  • extracted communication code out of the DistributedEngine into class JobManager
  • implemented a method to retrieve the JobState for a given job (specified by its guid) in GridServer
  • implemented restarting of jobs in JobManager
  • improved exception handling in JobManager and DistributedEngine

(ticket #136)

File size: 5.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.ServiceModel;
6using HeuristicLab.Grid;
7using System.Threading;
8using HeuristicLab.Core;
9using System.IO;
10using System.IO.Compression;
11using System.Windows.Forms;
12
13namespace HeuristicLab.DistributedEngine {
14  class JobManager {
15    private IGridServer server;
16    private string address;
17    private Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
18    private Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
19    private Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
20    private object locker = new object();
21    private const int MAX_RESTARTS = 5;
22    private Exception exception;
23    public Exception Exception {
24      get { return exception; }
25    }
26
27    public JobManager(string address) {
28      this.address = address;
29    }
30
31    internal void Reset() {
32      lock(locker) {
33        // open a new channel
34        NetTcpBinding binding = new NetTcpBinding();
35        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
36        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
37        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
38        binding.Security.Mode = SecurityMode.None;
39        ChannelFactory<IGridServer> factory = new ChannelFactory<IGridServer>(binding);
40        server = factory.CreateChannel(new EndpointAddress(address));
41
42        foreach(WaitHandle wh in waithandles.Values) wh.Close();
43        waithandles.Clear();
44        engineOperations.Clear();
45        runningEngines.Clear();
46        exception = null;
47      }
48    }
49
50    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
51      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
52      MemoryStream memStream = new MemoryStream();
53      GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true);
54      PersistenceManager.Save(engine, stream); // Careful! Make sure that persistence is thread-safe!
55      stream.Close();
56      Guid currentEngineGuid = server.BeginExecuteEngine(memStream.ToArray());
57      lock(locker) {
58        runningEngines[currentEngineGuid] = memStream.ToArray();
59        engineOperations[currentEngineGuid] = operation;
60        waithandles[currentEngineGuid] = new ManualResetEvent(false);
61        ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
62      }
63      return waithandles[currentEngineGuid];
64    }
65
66    private void TryGetResult(object state) {
67      Guid engineGuid = (Guid)state;
68      int restartCounter = 0;
69      do {
70        try {
71          byte[] resultXml = server.TryEndExecuteEngine(engineGuid, 100);
72          if(resultXml != null) {
73            // restore the engine
74            GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress);
75            ProcessingEngine resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
76
77            // merge the results
78            IScope oldScope = engineOperations[engineGuid].Scope;
79            oldScope.Clear();
80            foreach(IVariable variable in resultEngine.InitialOperation.Scope.Variables) {
81              oldScope.AddVariable(variable);
82            }
83            foreach(IScope subScope in resultEngine.InitialOperation.Scope.SubScopes) {
84              oldScope.AddSubScope(subScope);
85            }
86            foreach(KeyValuePair<string, string> alias in resultEngine.InitialOperation.Scope.Aliases) {
87              oldScope.AddAlias(alias.Key, alias.Value);
88            }
89
90            lock(locker) {
91              // signal the wait handle and clean up then return
92              waithandles[engineGuid].Set();
93              engineOperations.Remove(engineGuid);
94              waithandles.Remove(engineGuid);
95              runningEngines.Remove(engineGuid);
96            }
97            return;
98          } else {
99            // check if the server is still working on the job
100            JobState jobState = server.JobState(engineGuid);
101            if(jobState == JobState.Unkown) {
102              // restart job
103              byte[] packedEngine;
104              lock(locker) {
105                packedEngine = runningEngines[engineGuid];
106              }
107              server.BeginExecuteEngine(packedEngine);
108              restartCounter++;
109            }
110          }
111        } catch(Exception e) {
112          // catch all exceptions set an exception flag, signal the wait-handle and exit the routine
113          this.exception = new Exception("There was a problem with parallel execution", e);
114          waithandles[engineGuid].Set();
115          return;
116        }
117
118        // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem
119        if(restartCounter > MAX_RESTARTS) {
120          this.exception = new Exception("Maximal number of parallel job restarts reached");
121          waithandles[engineGuid].Set();
122          return;
123        }
124
125        Thread.Sleep(TimeSpan.FromSeconds(10.0));
126      } while(true);
127    }
128  }
129}
Note: See TracBrowser for help on using the repository browser.