Free cookie consent management tool by TermsFeed Policy Generator

Ignore:
Timestamp:
05/30/08 13:41:27 (16 years ago)
Author:
gkronber
Message:

merged changesets r219:r228, r240, r241:258, r263:265, r267,r268, r269 from trunk into the HL3 stable branch

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/3.0/sources/HeuristicLab.Grid/EngineStore.cs

    r35 r279  
    3434    private Dictionary<Guid, ManualResetEvent> waitHandles;
    3535    private Dictionary<Guid, byte[]> results;
    36     private Dictionary<Guid, string> runningClients;
     36    private Dictionary<Guid, DateTime> resultDate;
     37    private Dictionary<Guid, DateTime> runningEngineDate;
     38    private const int RESULT_EXPIRY_TIME_MIN = 10;
     39    private const int RUNNING_JOB_EXPIRY_TIME_MIN = 10;
    3740    private object bigLock;
    38     private ChannelFactory<IClient> clientChannelFactory;
    3941    public int WaitingJobs {
    4042      get {
     
    5961      waitingEngines = new Dictionary<Guid, byte[]>();
    6062      runningEngines = new Dictionary<Guid, byte[]>();
    61       runningClients = new Dictionary<Guid, string>();
    6263      waitHandles = new Dictionary<Guid, ManualResetEvent>();
    6364      results = new Dictionary<Guid, byte[]>();
     65      resultDate = new Dictionary<Guid, DateTime>();
     66      runningEngineDate = new Dictionary<Guid, DateTime>();
    6467      bigLock = new object();
    65 
    66       NetTcpBinding binding = new NetTcpBinding();
    67       binding.MaxReceivedMessageSize = 100000000; // 100Mbytes
    68       binding.ReaderQuotas.MaxStringContentLength = 100000000; // also 100M chars
    69       binding.ReaderQuotas.MaxArrayLength = 100000000; // also 100M elements;
    70       binding.Security.Mode = SecurityMode.None;
    71 
    72       clientChannelFactory = new ChannelFactory<IClient>(binding);
    7368    }
    7469
    75     public bool TryTakeEngine(string clientUrl, out Guid guid, out byte[] engine) {
     70    public bool TryTakeEngine(out Guid guid, out byte[] engine) {
    7671      lock(bigLock) {
    7772        if(engineList.Count == 0) {
     
    8580          waitingEngines.Remove(guid);
    8681          runningEngines[guid] = engine;
    87           runningClients[guid] = clientUrl;
     82          runningEngineDate[guid] = DateTime.Now;
    8883          return true;
    8984        }
     
    9388    public void StoreResult(Guid guid, byte[] result) {
    9489      lock(bigLock) {
    95         if(!runningEngines.ContainsKey(guid)) return; // ignore result when the engine is not known to be running
    96 
     90        // clear old results
     91        List<Guid> expiredResults = FindExpiredResults(DateTime.Now.AddMinutes(-RESULT_EXPIRY_TIME_MIN));
     92        foreach(Guid expiredGuid in expiredResults) {
     93          results.Remove(expiredGuid);
     94          waitHandles.Remove(expiredGuid);
     95          resultDate.Remove(expiredGuid);
     96        }
     97        // add the new result
    9798        runningEngines.Remove(guid);
    98         runningClients.Remove(guid);
     99        runningEngineDate.Remove(guid);
    99100        results[guid] = result;
     101        resultDate[guid] = DateTime.Now;
    100102        waitHandles[guid].Set();
    101103      }
     104    }
     105
     106    private List<Guid> FindExpiredResults(DateTime expirationDate) {
     107      List<Guid> expiredResults = new List<Guid>();
     108      foreach(Guid guid in results.Keys) {
     109        if(resultDate[guid] < expirationDate) {
     110          expiredResults.Add(guid);
     111        }
     112      }
     113      return expiredResults;
     114    }
     115    private List<Guid> FindExpiredJobs(DateTime expirationDate) {
     116      List<Guid> expiredJobs = new List<Guid>();
     117      foreach(Guid guid in runningEngines.Keys) {
     118        if(runningEngineDate[guid] < expirationDate) {
     119          expiredJobs.Add(guid);
     120        }
     121      }
     122      return expiredJobs;
    102123    }
    103124
     
    113134      return GetResult(guid, System.Threading.Timeout.Infinite);
    114135    }
     136
    115137    internal byte[] GetResult(Guid guid, int timeout) {
    116138      lock(bigLock) {
    117         if(waitHandles.ContainsKey(guid)) {
     139        // result already available
     140        if(results.ContainsKey(guid)) {
     141          // if the wait-handle for this result is still alive then close and remove it
     142          if(waitHandles.ContainsKey(guid)) {
     143            ManualResetEvent waitHandle = waitHandles[guid];
     144            waitHandle.Close();
     145            waitHandles.Remove(guid);
     146          }
     147          byte[] result = results[guid];
     148          results.Remove(guid);
     149          return result;
     150        } else {
     151          // result not yet available, if there is also no wait-handle for that result then we will never have a result and can return null
     152          if(!waitHandles.ContainsKey(guid)) return null;
     153          // otherwise we have a wait-handle and can wait for the result
    118154          ManualResetEvent waitHandle = waitHandles[guid];
     155          // wait
    119156          if(waitHandle.WaitOne(timeout, true)) {
     157            // ok got the result in within the wait time => close and remove the wait-hande and return the result
    120158            waitHandle.Close();
    121159            waitHandles.Remove(guid);
    122160            byte[] result = results[guid];
    123             results.Remove(guid);
    124161            return result;
    125162          } else {
     163            // no result yet, check for which jobs we waited too long and requeue those jobs
     164            List<Guid> expiredJobs = FindExpiredJobs(DateTime.Now.AddMinutes(-RUNNING_JOB_EXPIRY_TIME_MIN));
     165            foreach(Guid expiredGuid in expiredJobs) {
     166              engineList.Insert(0, expiredGuid);
     167              waitingEngines[expiredGuid] = runningEngines[expiredGuid];
     168              runningEngines.Remove(expiredGuid);
     169              runningEngineDate.Remove(expiredGuid);
     170            }
    126171            return null;
    127172          }
    128         } else {
    129           return null;
    130173        }
    131174      }
     
    133176
    134177    internal void AbortEngine(Guid guid) {
    135       string clientUrl = "";
     178      throw new NotImplementedException();
     179    }
     180
     181    internal JobState JobState(Guid guid) {
    136182      lock(bigLock) {
    137         if(runningClients.ContainsKey(guid)) {
    138           clientUrl = runningClients[guid];
    139           IClient client = clientChannelFactory.CreateChannel(new EndpointAddress(clientUrl));
    140           client.Abort(guid);
    141         } else if(waitingEngines.ContainsKey(guid)) {
    142           byte[] engine = waitingEngines[guid];
    143           waitingEngines.Remove(guid);
    144           engineList.Remove(guid);
    145           waitHandles[guid].Set();
    146           results.Add(guid, engine);
    147         }
     183        if(waitingEngines.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Waiting;
     184        else if(waitHandles.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Busy;
     185        else if(results.ContainsKey(guid)) return HeuristicLab.Grid.JobState.Finished;
     186        else return HeuristicLab.Grid.JobState.Unkown;
    148187      }
    149188    }
Note: See TracChangeset for help on using the changeset viewer.