Free cookie consent management tool by TermsFeed Policy Generator

Changeset 380


Ignore:
Timestamp:
07/10/08 17:22:04 (16 years ago)
Author:
gkronber
Message:

improved contention problem by using ReaderWriterLock in the DB proxy and reducing the number of threads in the RunScheduler (ticket #189)

Location:
trunk/sources
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/sources/HeuristicLab.CEDMA.DB/Database.cs

    r375 r380  
    3030using System.Data.SQLite;
    3131using System.Data.Common;
     32using System.Threading;
    3233
    3334namespace HeuristicLab.CEDMA.DB {
     
    3536  public class Database : IDatabase {
    3637    private string connectionString;
    37     private object bigLock = new object();
     38    private ReaderWriterLockSlim rwLock;
    3839    public Database(string connectionString) {
    3940      this.connectionString = connectionString;
     41      rwLock = new ReaderWriterLockSlim();
    4042    }
    4143
    4244    #region create empty database
    4345    public void CreateNew() {
    44       lock(bigLock) {
     46      rwLock.EnterWriteLock();
     47      try {
    4548        using(DbConnection cnn = new SQLiteConnection(connectionString)) {
    4649          cnn.Open();
     
    6972          }
    7073        }
     74      } finally {
     75        rwLock.ExitWriteLock();
    7176      }
    7277    }
     
    7580    #region insert agent/run/result/sub-result
    7681    public long InsertAgent(string name, byte[] rawData) {
    77       lock(bigLock) {
     82      rwLock.EnterWriteLock();
     83      try {
    7884        using(DbConnection cnn = new SQLiteConnection(connectionString)) {
    7985          cnn.Open();
     
    97103          }
    98104        }
     105      } finally {
     106        rwLock.ExitWriteLock();
    99107      }
    100108    }
    101109
    102110    public long InsertRun(long agentId, byte[] rawData) {
    103       lock(bigLock) {
     111      rwLock.EnterWriteLock();
     112      try {
    104113        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    105114          cnn.Open();
     
    128137          }
    129138        }
     139      } finally {
     140        rwLock.ExitWriteLock();
    130141      }
    131142    }
    132143
    133144    public long InsertResult(long runId, byte[] rawData) {
    134       lock(bigLock) {
     145      rwLock.EnterWriteLock();
     146      try {
    135147        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    136148          cnn.Open();
     
    159171          }
    160172        }
     173      } finally {
     174        rwLock.ExitWriteLock();
    161175      }
    162176    }
    163177
    164178    public long InsertSubResult(long resultId, byte[] rawData) {
    165       lock(bigLock) {
    166 
     179      rwLock.EnterWriteLock();
     180      try {
    167181        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    168182          cnn.Open();
     
    191205          }
    192206        }
     207      } finally {
     208        rwLock.ExitWriteLock();
    193209      }
    194210    }
     
    197213    #region update agent/run
    198214    public void UpdateAgent(long id, string name) {
    199       lock(bigLock) {
     215      rwLock.EnterWriteLock();
     216      try {
    200217        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    201218          cnn.Open();
     
    217234          }
    218235        }
     236      } finally {
     237        rwLock.ExitWriteLock();
    219238      }
    220239    }
    221240
    222241    public void UpdateAgent(long id, ProcessStatus status) {
    223       lock(bigLock) {
     242      rwLock.EnterWriteLock();
     243      try {
    224244        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    225245          cnn.Open();
     
    241261          }
    242262        }
     263      } finally {
     264        rwLock.ExitWriteLock();
    243265      }
    244266    }
    245267
    246268    public void UpdateAgent(long id, byte[] rawData) {
    247       lock(bigLock) {
     269      rwLock.EnterWriteLock();
     270      try {
    248271        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    249272          cnn.Open();
     
    265288          }
    266289        }
     290      } finally {
     291        rwLock.ExitWriteLock();
    267292      }
    268293    }
    269294
    270295    public void UpdateRunStart(long runId, DateTime startTime) {
    271       lock(bigLock) {
     296      rwLock.EnterWriteLock();
     297      try {
    272298        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    273299          cnn.Open();
     
    289315          }
    290316        }
     317      } finally {
     318        rwLock.ExitWriteLock();
    291319      }
    292320    }
    293321
    294322    public void UpdateRunFinished(long runId, DateTime finishedTime) {
    295       lock(bigLock) {
     323      rwLock.EnterWriteLock();
     324      try {
    296325        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    297326          cnn.Open();
     
    313342          }
    314343        }
     344      } finally {
     345        rwLock.ExitWriteLock();
    315346      }
    316347    }
    317348
    318349    public void UpdateRunStatus(long runId, ProcessStatus status) {
    319       lock(bigLock) {
     350      rwLock.EnterWriteLock();
     351      try {
    320352        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    321353          cnn.Open();
     
    337369          }
    338370        }
    339       }
    340     }
    341 
    342 
    343 
     371      } finally {
     372        rwLock.ExitWriteLock();
     373      }
     374    }
    344375    #endregion
    345376
    346 
    347377    #region get agent/run/result/sub-result
    348378
    349379    public ICollection<AgentEntry> GetAgents(ProcessStatus status) {
    350       lock(bigLock) {
    351         List<AgentEntry> agents = new List<AgentEntry>();
     380      rwLock.EnterReadLock();
     381      List<AgentEntry> agents = new List<AgentEntry>();
     382      try {
    352383        using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) {
    353384          cnn.Open();
     
    369400          }
    370401        }
    371         return agents;
    372       }
     402      } finally {
     403        rwLock.ExitReadLock();
     404      }
     405      return agents;
    373406    }
    374407
    375408    public ICollection<AgentEntry> GetAgents() {
    376       lock(bigLock) {
    377         List<AgentEntry> agents = new List<AgentEntry>();
     409      rwLock.EnterReadLock();
     410      List<AgentEntry> agents = new List<AgentEntry>();
     411      try {
    378412        using(DbConnection cnn = new SQLiteConnection(connectionString)) {
    379413          cnn.Open();
     
    392426          }
    393427        }
    394         return agents;
    395       }
    396     }
    397 
    398     //(ID integer primary key autoincrement, AgentId integer, CreationTime DateTime, StartTime DateTime, FinishedTime DateTime, Status text default 'Unknown', RawData Blob)";
     428      } finally {
     429        rwLock.ExitReadLock();
     430      }
     431      return agents;
     432    }
    399433
    400434    public ICollection<RunEntry> GetRuns() {
    401       lock(bigLock) {
    402         List<RunEntry> runs = new List<RunEntry>();
     435      List<RunEntry> runs = new List<RunEntry>();
     436      rwLock.EnterReadLock();
     437      try {
    403438        using(DbConnection cnn = new SQLiteConnection(connectionString)) {
    404439          cnn.Open();
     
    420455          }
    421456        }
    422         return runs;
    423       }
     457      } finally {
     458        rwLock.ExitReadLock();
     459      }
     460      return runs;
    424461    }
    425462
    426463    public ICollection<RunEntry> GetRuns(ProcessStatus status) {
    427       lock(bigLock) {
    428         List<RunEntry> runs = new List<RunEntry>();
     464      List<RunEntry> runs = new List<RunEntry>();
     465      rwLock.EnterReadLock();
     466      try {
    429467        using(DbConnection cnn = new SQLiteConnection(connectionString)) {
    430468          cnn.Open();
     
    442480                run.AgentId = r.GetInt32(1);
    443481                run.CreationTime = r.GetDateTime(2);
    444                 run.StartTime = r.IsDBNull(3)? null : new Nullable<DateTime>(r.GetDateTime(3));
    445                 run.FinishedTime = r.IsDBNull(4) ? null:new Nullable<DateTime>(r.GetDateTime(4));
     482                run.StartTime = r.IsDBNull(3) ? null : new Nullable<DateTime>(r.GetDateTime(3));
     483                run.FinishedTime = r.IsDBNull(4) ? null : new Nullable<DateTime>(r.GetDateTime(4));
    446484                run.Status = (ProcessStatus)Enum.Parse(typeof(ProcessStatus), r.GetString(5));
    447485                run.RawData = (byte[])r.GetValue(6);
     
    451489          }
    452490        }
    453         return runs;
    454       }
    455     }
    456 
    457     // (ID integer primary key autoincrement, RunId integer, ResultId integer, CreationTime DateTime, RawData Blob)";
     491      } finally {
     492        rwLock.ExitReadLock();
     493      }
     494      return runs;
     495    }
     496
    458497    public ICollection<ResultEntry> GetResults(long runId) {
    459       lock(bigLock) {
    460         List<ResultEntry> results = new List<ResultEntry>();
     498      List<ResultEntry> results = new List<ResultEntry>();
     499      rwLock.EnterReadLock();
     500      try {
    461501        using(DbConnection cnn = new SQLiteConnection(connectionString)) {
    462502          cnn.Open();
     
    475515          }
    476516        }
    477         return results;
    478       }
     517      } finally {
     518        rwLock.ExitReadLock();
     519      }
     520      return results;
    479521    }
    480522
  • trunk/sources/HeuristicLab.CEDMA.Server/AgentScheduler.cs

    r377 r380  
    4646        ClearFinishedEngines();
    4747        CreateNewEngines();
    48         StepAllEngines();
     48        if(engines.Count == 0) Thread.Sleep(10000);
     49        else StepAllEngines();
    4950      }
    5051    }
  • trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs

    r378 r380  
    3737    private const int RELEASE_INTERVAL = 5;
    3838    private object remoteCommLock = new object();
     39    private object collectionsLock = new object();
     40    private Queue<WaitHandle> waithandles;
     41    private Dictionary<WaitHandle, AtomicOperation> runningOperations;
     42    private Dictionary<WaitHandle, long> runningEntries;
    3943
    4044    public RunScheduler(Database database, JobManager jobManager) {
    4145      this.database = database;
    4246      this.jobManager = jobManager;
     47      runningOperations = new Dictionary<WaitHandle, AtomicOperation>();
     48      runningEntries = new Dictionary<WaitHandle, long>();
     49      waithandles = new Queue<WaitHandle>();
     50      Thread resultsGatheringThread = new Thread(GatherResults);
     51      resultsGatheringThread.Start();
    4352    }
    4453    public void Run() {
     
    6473        }
    6574
    66         ThreadPool.QueueUserWorkItem(WaitForFinishedRun, new object[] {wHandle, op, entry});
     75        lock(collectionsLock) {
     76          waithandles.Enqueue(wHandle);
     77          runningOperations[wHandle] = op;
     78          runningEntries[wHandle] = entry.Id;
     79        }
    6780      }
    6881    }
    6982
    70     private void WaitForFinishedRun(object state) {
    71       object[] param = (object[])state;
    72       WaitHandle wHandle = (WaitHandle)param[0];
    73       AtomicOperation op = (AtomicOperation)param[1];
    74       RunEntry entry = (RunEntry)param[2];
    75       wHandle.WaitOne();
    76       wHandle.Close();
    77       lock(remoteCommLock) {
    78         jobManager.EndExecuteOperation(op);
    79         database.UpdateRunStatus(entry.Id, ProcessStatus.Finished);
    80         database.UpdateRunFinished(entry.Id, DateTime.Now);
     83    private void GatherResults() {
     84      while(true) {
     85        if(waithandles.Count == 0) Thread.Sleep(1000);
     86        else {
     87          WaitHandle w;
     88          lock(collectionsLock) {
     89            w = waithandles.Dequeue();
     90          }
     91          w.WaitOne();
     92          long id;
     93          AtomicOperation op;
     94          lock(collectionsLock) {
     95            id = runningEntries[w];
     96            runningEntries.Remove(w);
     97            op = runningOperations[w];
     98            runningOperations.Remove(w);
     99          }
     100          w.Close();
     101          lock(remoteCommLock) {
     102            jobManager.EndExecuteOperation(op);
     103            database.UpdateRunStatus(id, ProcessStatus.Finished);
     104            database.UpdateRunFinished(id, DateTime.Now);
     105          }
     106        }
    81107      }
    82108    }
  • trunk/sources/HeuristicLab.CEDMA.Server/ServerForm.cs

    r378 r380  
    5656    private void InitAgentScheduler() {
    5757      AgentScheduler scheduler = new AgentScheduler(database);
    58       ThreadPool.QueueUserWorkItem(delegate(object status) { scheduler.Run(); });
     58      Thread agentSchedulerThread = new Thread(scheduler.Run);
     59      agentSchedulerThread.Start();
    5960    }
    6061
     
    6364      jobManager.Reset();
    6465      RunScheduler scheduler = new RunScheduler(database, jobManager);
    65       ThreadPool.QueueUserWorkItem(delegate(object status) { scheduler.Run(); });
     66      Thread runSchedulerThread = new Thread(scheduler.Run);
     67      runSchedulerThread.Start();
    6668    }
    6769
Note: See TracChangeset for help on using the changeset viewer.