Free cookie consent management tool by TermsFeed Policy Generator

source: trunk/sources/HeuristicLab.DistributedEngine/JobManager.cs @ 238

Last change on this file since 238 was 228, checked in by gkronber, 17 years ago

bug fixing in DistributedEngine and Grid-Infrastructure

File size: 5.4 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.ServiceModel;
6using HeuristicLab.Grid;
7using System.Threading;
8using HeuristicLab.Core;
9using System.IO;
10using System.IO.Compression;
11using System.Windows.Forms;
12
namespace HeuristicLab.DistributedEngine {
  /// <summary>
  /// Sends operations to a grid server as packed <c>ProcessingEngine</c> jobs and polls the
  /// server on thread-pool threads until each job has a result or has failed.
  /// </summary>
  /// <remarks>
  /// All mutable state (the three job dictionaries, the connection and the error flag) is
  /// guarded by <c>locker</c>. Callers wait on the handle returned by
  /// <see cref="BeginExecuteOperation"/> and afterwards inspect <see cref="Exception"/>.
  /// </remarks>
  class JobManager {
    private IGridServer server;
    private readonly string address;
    // All three dictionaries are keyed by the Guid returned by the first submission of a job.
    private readonly Dictionary<Guid, AtomicOperation> engineOperations = new Dictionary<Guid, AtomicOperation>();
    private readonly Dictionary<Guid, byte[]> runningEngines = new Dictionary<Guid, byte[]>();
    private readonly Dictionary<Guid, ManualResetEvent> waithandles = new Dictionary<Guid, ManualResetEvent>();
    private readonly object locker = new object();
    private const int MAX_RESTARTS = 5;
    private Exception exception;
    private ChannelFactory<IGridServer> factory;

    /// <summary>
    /// The most recent error recorded by a polling thread, or null if none was recorded
    /// since construction or the last <see cref="Reset"/>.
    /// </summary>
    public Exception Exception {
      get { return exception; }
    }

    /// <summary>Creates a job manager for the grid server at the given endpoint address.</summary>
    /// <param name="address">Endpoint address handed to the WCF channel factory.</param>
    public JobManager(string address) {
      this.address = address;
    }

    /// <summary>
    /// Opens a fresh connection, closes and forgets all outstanding wait handles and jobs,
    /// and clears the error flag.
    /// </summary>
    internal void Reset() {
      lock(locker) {
        ResetConnection();
        foreach(WaitHandle wh in waithandles.Values) wh.Close();
        waithandles.Clear();
        engineOperations.Clear();
        runningEngines.Clear();
        exception = null;
      }
    }

    /// <summary>
    /// Replaces the channel factory and server proxy with new ones and aborts the factory
    /// that was in use before, so that it is not leaked.
    /// </summary>
    private void ResetConnection() {
      // Monitor locks are reentrant, so this is safe when called from Reset() as well.
      lock(locker) {
        ChannelFactory<IGridServer> oldFactory = factory;

        // open a new channel
        NetTcpBinding binding = new NetTcpBinding();
        binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
        binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
        binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
        binding.Security.Mode = SecurityMode.None;
        factory = new ChannelFactory<IGridServer>(binding);
        server = factory.CreateChannel(new EndpointAddress(address));

        // Abort (rather than Close) because the old factory may be faulted or still in use
        // by polling threads of jobs that have been discarded.
        if(oldFactory != null) oldFactory.Abort();
      }
    }

    /// <summary>
    /// Returns the current server proxy, (re)creating the connection first when there is
    /// none yet or the factory is not in the Opened state.
    /// </summary>
    private IGridServer GetServer() {
      lock(locker) {
        if(factory == null || factory.State != CommunicationState.Opened)
          ResetConnection();
        return server;
      }
    }

    /// <summary>
    /// Packs the operation into a gzip-compressed <c>ProcessingEngine</c>, submits it to the
    /// grid server and starts a background poll for its result.
    /// </summary>
    /// <param name="operatorGraph">Operator graph passed to the engine.</param>
    /// <param name="globalScope">Global scope passed to the engine.</param>
    /// <param name="operation">Operation to execute; its scope receives the merged result.</param>
    /// <returns>
    /// A handle that is signalled when the job has finished or failed. Check
    /// <see cref="Exception"/> afterwards to distinguish the two.
    /// </returns>
    public WaitHandle BeginExecuteOperation(IOperatorGraph operatorGraph, IScope globalScope, AtomicOperation operation) {
      ProcessingEngine engine = new ProcessingEngine(operatorGraph, globalScope, operation); // OperatorGraph not needed?
      byte[] packedEngine;
      using(MemoryStream memStream = new MemoryStream()) {
        // leaveOpen = true: the memory stream must survive closing the gzip stream,
        // which is what flushes the compressed trailer.
        using(GZipStream stream = new GZipStream(memStream, CompressionMode.Compress, true)) {
          PersistenceManager.Save(engine, stream);
        }
        packedEngine = memStream.ToArray();
      }

      Guid currentEngineGuid = GetServer().BeginExecuteEngine(packedEngine);

      // Keep a local reference: the polling thread may finish and remove the dictionary
      // entry before this method returns.
      ManualResetEvent waitHandle = new ManualResetEvent(false);
      lock(locker) {
        runningEngines[currentEngineGuid] = packedEngine;
        engineOperations[currentEngineGuid] = operation;
        waithandles[currentEngineGuid] = waitHandle;
      }
      ThreadPool.QueueUserWorkItem(new WaitCallback(TryGetResult), currentEngineGuid);
      return waitHandle;
    }

    /// <summary>
    /// Thread-pool callback that polls the server for the result of one job, merges the
    /// result into the scope of the original operation and signals the job's wait handle.
    /// Resubmits the job when the server reports it as unknown, up to MAX_RESTARTS times.
    /// </summary>
    /// <param name="state">The boxed Guid under which the job was registered.</param>
    private void TryGetResult(object state) {
      Guid engineGuid = (Guid)state;  // key of the job in the local dictionaries (never changes)
      // Id used when talking to the server. BeginExecuteEngine returns an id for every
      // submission, so after a restart the returned id is used instead of the original one.
      // NOTE(review): presumably the server issues a fresh id per submission - confirm against IGridServer.
      Guid serverGuid = engineGuid;
      int restartCounter = 0;
      do {
        try {
          AtomicOperation operation;
          lock(locker) {
            // The job is gone if Reset() discarded it; stop polling quietly in that case.
            if(!engineOperations.TryGetValue(engineGuid, out operation)) return;
          }

          IGridServer currentServer = GetServer();
          byte[] resultXml = currentServer.TryEndExecuteEngine(serverGuid, 100);
          if(resultXml != null) {
            // restore the engine
            ProcessingEngine resultEngine;
            using(GZipStream stream = new GZipStream(new MemoryStream(resultXml), CompressionMode.Decompress)) {
              resultEngine = (ProcessingEngine)PersistenceManager.Load(stream);
            }

            // merge the results
            IScope resultScope = resultEngine.InitialOperation.Scope;
            IScope oldScope = operation.Scope;
            oldScope.Clear();
            foreach(IVariable variable in resultScope.Variables) {
              oldScope.AddVariable(variable);
            }
            foreach(IScope subScope in resultScope.SubScopes) {
              oldScope.AddSubScope(subScope);
            }
            foreach(KeyValuePair<string, string> alias in resultScope.Aliases) {
              oldScope.AddAlias(alias.Key, alias.Value);
            }

            // signal the wait handle and clean up then return
            FinishJob(engineGuid, null);
            return;
          } else {
            // check if the server is still working on the job
            JobState jobState = currentServer.JobState(serverGuid);
            if(jobState == JobState.Unkown) {
              // restart job
              byte[] packedEngine;
              lock(locker) {
                if(!runningEngines.TryGetValue(engineGuid, out packedEngine)) return;
              }
              serverGuid = currentServer.BeginExecuteEngine(packedEngine);
              restartCounter++;
            }
          }
        } catch(Exception e) {
          // catch all exceptions set an exception flag, signal the wait-handle and exit the routine
          FinishJob(engineGuid, new Exception("There was a problem with parallel execution", e));
          return;
        }

        // when we reach a maximum amount of restarts => signal the wait-handle and set a flag that there was a problem
        if(restartCounter > MAX_RESTARTS) {
          FinishJob(engineGuid, new Exception("Maximal number of parallel job restarts reached"));
          return;
        }

        Thread.Sleep(TimeSpan.FromSeconds(10.0));
      } while(true);
    }

    /// <summary>
    /// Removes a job from all dictionaries, records the error (if any) and signals the
    /// job's wait handle. Does nothing when the job is no longer registered, so a polling
    /// thread that outlives a Reset() can neither throw here nor overwrite the error flag.
    /// </summary>
    /// <param name="engineGuid">Key under which the job was registered.</param>
    /// <param name="error">Error to publish through <see cref="Exception"/>, or null on success.</param>
    private void FinishJob(Guid engineGuid, Exception error) {
      lock(locker) {
        ManualResetEvent waitHandle;
        if(!waithandles.TryGetValue(engineGuid, out waitHandle)) return;
        if(error != null) exception = error;
        engineOperations.Remove(engineGuid);
        waithandles.Remove(engineGuid);
        runningEngines.Remove(engineGuid);
        // Set last, so a waiter that wakes up sees the error flag and the cleaned-up state.
        waitHandle.Set();
      }
    }
  }
}
Note: See TracBrowser for help on using the repository browser.