- Timestamp:
- 07/10/08 17:22:04 (16 years ago)
- Location:
- trunk/sources
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/sources/HeuristicLab.CEDMA.DB/Database.cs
r375 r380 30 30 using System.Data.SQLite; 31 31 using System.Data.Common; 32 using System.Threading; 32 33 33 34 namespace HeuristicLab.CEDMA.DB { … … 35 36 public class Database : IDatabase { 36 37 private string connectionString; 37 private object bigLock = new object();38 private ReaderWriterLockSlim rwLock; 38 39 public Database(string connectionString) { 39 40 this.connectionString = connectionString; 41 rwLock = new ReaderWriterLockSlim(); 40 42 } 41 43 42 44 #region create empty database 43 45 public void CreateNew() { 44 lock(bigLock) { 46 rwLock.EnterWriteLock(); 47 try { 45 48 using(DbConnection cnn = new SQLiteConnection(connectionString)) { 46 49 cnn.Open(); … … 69 72 } 70 73 } 74 } finally { 75 rwLock.ExitWriteLock(); 71 76 } 72 77 } … … 75 80 #region insert agent/run/result/sub-result 76 81 public long InsertAgent(string name, byte[] rawData) { 77 lock(bigLock) { 82 rwLock.EnterWriteLock(); 83 try { 78 84 using(DbConnection cnn = new SQLiteConnection(connectionString)) { 79 85 cnn.Open(); … … 97 103 } 98 104 } 105 } finally { 106 rwLock.ExitWriteLock(); 99 107 } 100 108 } 101 109 102 110 public long InsertRun(long agentId, byte[] rawData) { 103 lock(bigLock) { 111 rwLock.EnterWriteLock(); 112 try { 104 113 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 105 114 cnn.Open(); … … 128 137 } 129 138 } 139 } finally { 140 rwLock.ExitWriteLock(); 130 141 } 131 142 } 132 143 133 144 public long InsertResult(long runId, byte[] rawData) { 134 lock(bigLock) { 145 rwLock.EnterWriteLock(); 146 try { 135 147 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 136 148 cnn.Open(); … … 159 171 } 160 172 } 173 } finally { 174 rwLock.ExitWriteLock(); 161 175 } 162 176 } 163 177 164 178 public long InsertSubResult(long resultId, byte[] rawData) { 165 lock(bigLock) {166 179 rwLock.EnterWriteLock(); 180 try { 167 181 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 168 182 cnn.Open(); … … 191 205 } 192 206 } 207 } finally { 208 rwLock.ExitWriteLock(); 193 209 } 194 210 } … … 197 213 #region update agent/run 198 214 public void UpdateAgent(long id, string name) { 199 lock(bigLock) { 215 rwLock.EnterWriteLock(); 216 try { 200 217 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 201 218 cnn.Open(); … … 217 234 } 218 235 } 236 } finally { 237 rwLock.ExitWriteLock(); 219 238 } 220 239 } 221 240 222 241 public void UpdateAgent(long id, ProcessStatus status) { 223 lock(bigLock) { 242 rwLock.EnterWriteLock(); 243 try { 224 244 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 225 245 cnn.Open(); … … 241 261 } 242 262 } 263 } finally { 264 rwLock.ExitWriteLock(); 243 265 } 244 266 } 245 267 246 268 public void UpdateAgent(long id, byte[] rawData) { 247 lock(bigLock) { 269 rwLock.EnterWriteLock(); 270 try { 248 271 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 249 272 cnn.Open(); … … 265 288 } 266 289 } 290 } finally { 291 rwLock.ExitWriteLock(); 267 292 } 268 293 } 269 294 270 295 public void UpdateRunStart(long runId, DateTime startTime) { 271 lock(bigLock) { 296 rwLock.EnterWriteLock(); 297 try { 272 298 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 273 299 cnn.Open(); … … 289 315 } 290 316 } 317 } finally { 318 rwLock.ExitWriteLock(); 291 319 } 292 320 } 293 321 294 322 public void UpdateRunFinished(long runId, DateTime finishedTime) { 295 lock(bigLock) { 323 rwLock.EnterWriteLock(); 324 try { 296 325 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 297 326 cnn.Open(); … … 313 342 } 314 343 } 344 } finally { 345 rwLock.ExitWriteLock(); 315 346 } 316 347 } 317 348 318 349 public void UpdateRunStatus(long runId, ProcessStatus status) { 319 lock(bigLock) { 350 rwLock.EnterWriteLock(); 351 try { 320 352 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 321 353 cnn.Open(); … … 337 369 } 338 370 } 339 } 340 } 341 342 343 371 } finally { 372 rwLock.ExitWriteLock(); 373 } 374 } 344 375 #endregion 345 376 346 347 377 #region get agent/run/result/sub-result 348 378 349 379 public ICollection<AgentEntry> GetAgents(ProcessStatus status) { 350 lock(bigLock) { 351 List<AgentEntry> agents = new List<AgentEntry>(); 380 rwLock.EnterReadLock(); 381 List<AgentEntry> agents = new List<AgentEntry>(); 382 try { 352 383 using(SQLiteConnection cnn = new SQLiteConnection(connectionString)) { 353 384 cnn.Open(); … … 369 400 } 370 401 } 371 return agents; 372 } 402 } finally { 403 rwLock.ExitReadLock(); 404 } 405 return agents; 373 406 } 374 407 375 408 public ICollection<AgentEntry> GetAgents() { 376 lock(bigLock) { 377 List<AgentEntry> agents = new List<AgentEntry>(); 409 rwLock.EnterReadLock(); 410 List<AgentEntry> agents = new List<AgentEntry>(); 411 try { 378 412 using(DbConnection cnn = new SQLiteConnection(connectionString)) { 379 413 cnn.Open(); … … 392 426 } 393 427 } 394 return agents;395 }396 }397 398 //(ID integer primary key autoincrement, AgentId integer, CreationTime DateTime, StartTime DateTime, FinishedTime DateTime, Status text default 'Unknown', RawData Blob)";428 } finally { 429 rwLock.ExitReadLock(); 430 } 431 return agents; 432 } 399 433 400 434 public ICollection<RunEntry> GetRuns() { 401 lock(bigLock) { 402 List<RunEntry> runs = new List<RunEntry>(); 435 List<RunEntry> runs = new List<RunEntry>(); 436 rwLock.EnterReadLock(); 437 try { 403 438 using(DbConnection cnn = new SQLiteConnection(connectionString)) { 404 439 cnn.Open(); … … 420 455 } 421 456 } 422 return runs; 423 } 457 } finally { 458 rwLock.ExitReadLock(); 459 } 460 return runs; 424 461 } 425 462 426 463 public ICollection<RunEntry> GetRuns(ProcessStatus status) { 427 lock(bigLock) { 428 List<RunEntry> runs = new List<RunEntry>(); 464 List<RunEntry> runs = new List<RunEntry>(); 465 rwLock.EnterReadLock(); 466 try { 429 467 using(DbConnection cnn = new SQLiteConnection(connectionString)) { 430 468 cnn.Open(); … … 442 480 run.AgentId = r.GetInt32(1); 443 481 run.CreationTime = r.GetDateTime(2); 444 run.StartTime = r.IsDBNull(3) ? null : new Nullable<DateTime>(r.GetDateTime(3));445 run.FinishedTime = r.IsDBNull(4) ? null :new Nullable<DateTime>(r.GetDateTime(4));482 run.StartTime = r.IsDBNull(3) ? null : new Nullable<DateTime>(r.GetDateTime(3)); 483 run.FinishedTime = r.IsDBNull(4) ? null : new Nullable<DateTime>(r.GetDateTime(4)); 446 484 run.Status = (ProcessStatus)Enum.Parse(typeof(ProcessStatus), r.GetString(5)); 447 485 run.RawData = (byte[])r.GetValue(6); … … 451 489 } 452 490 } 453 return runs; 454 } 455 } 456 457 // (ID integer primary key autoincrement, RunId integer, ResultId integer, CreationTime DateTime, RawData Blob)"; 491 } finally { 492 rwLock.ExitReadLock(); 493 } 494 return runs; 495 } 496 458 497 public ICollection<ResultEntry> GetResults(long runId) { 459 lock(bigLock) { 460 List<ResultEntry> results = new List<ResultEntry>(); 498 List<ResultEntry> results = new List<ResultEntry>(); 499 rwLock.EnterReadLock(); 500 try { 461 501 using(DbConnection cnn = new SQLiteConnection(connectionString)) { 462 502 cnn.Open(); … … 475 515 } 476 516 } 477 return results; 478 } 517 } finally { 518 rwLock.ExitReadLock(); 519 } 520 return results; 479 521 } 480 522 -
trunk/sources/HeuristicLab.CEDMA.Server/AgentScheduler.cs
r377 r380 46 46 ClearFinishedEngines(); 47 47 CreateNewEngines(); 48 StepAllEngines(); 48 if(engines.Count == 0) Thread.Sleep(10000); 49 else StepAllEngines(); 49 50 } 50 51 } -
trunk/sources/HeuristicLab.CEDMA.Server/RunScheduler.cs
r378 r380 37 37 private const int RELEASE_INTERVAL = 5; 38 38 private object remoteCommLock = new object(); 39 private object collectionsLock = new object(); 40 private Queue<WaitHandle> waithandles; 41 private Dictionary<WaitHandle, AtomicOperation> runningOperations; 42 private Dictionary<WaitHandle, long> runningEntries; 39 43 40 44 public RunScheduler(Database database, JobManager jobManager) { 41 45 this.database = database; 42 46 this.jobManager = jobManager; 47 runningOperations = new Dictionary<WaitHandle, AtomicOperation>(); 48 runningEntries = new Dictionary<WaitHandle, long>(); 49 waithandles = new Queue<WaitHandle>(); 50 Thread resultsGatheringThread = new Thread(GatherResults); 51 resultsGatheringThread.Start(); 43 52 } 44 53 public void Run() { … … 64 73 } 65 74 66 ThreadPool.QueueUserWorkItem(WaitForFinishedRun, new object[] {wHandle, op, entry}); 75 lock(collectionsLock) { 76 waithandles.Enqueue(wHandle); 77 runningOperations[wHandle] = op; 78 runningEntries[wHandle] = entry.Id; 79 } 67 80 } 68 81 } 69 82 70 private void WaitForFinishedRun(object state) { 71 object[] param = (object[])state; 72 WaitHandle wHandle = (WaitHandle)param[0]; 73 AtomicOperation op = (AtomicOperation)param[1]; 74 RunEntry entry = (RunEntry)param[2]; 75 wHandle.WaitOne(); 76 wHandle.Close(); 77 lock(remoteCommLock) { 78 jobManager.EndExecuteOperation(op); 79 database.UpdateRunStatus(entry.Id, ProcessStatus.Finished); 80 database.UpdateRunFinished(entry.Id, DateTime.Now); 83 private void GatherResults() { 84 while(true) { 85 if(waithandles.Count == 0) Thread.Sleep(1000); 86 else { 87 WaitHandle w; 88 lock(collectionsLock) { 89 w = waithandles.Dequeue(); 90 } 91 w.WaitOne(); 92 long id; 93 AtomicOperation op; 94 lock(collectionsLock) { 95 id = runningEntries[w]; 96 runningEntries.Remove(w); 97 op = runningOperations[w]; 98 runningOperations.Remove(w); 99 } 100 w.Close(); 101 lock(remoteCommLock) { 102 jobManager.EndExecuteOperation(op); 103 database.UpdateRunStatus(id, ProcessStatus.Finished); 104 database.UpdateRunFinished(id, DateTime.Now); 105 } 106 } 81 107 } 82 108 } -
trunk/sources/HeuristicLab.CEDMA.Server/ServerForm.cs
r378 r380 56 56 private void InitAgentScheduler() { 57 57 AgentScheduler scheduler = new AgentScheduler(database); 58 ThreadPool.QueueUserWorkItem(delegate(object status) { scheduler.Run(); }); 58 Thread agentSchedulerThread = new Thread(scheduler.Run); 59 agentSchedulerThread.Start(); 59 60 } 60 61 … … 63 64 jobManager.Reset(); 64 65 RunScheduler scheduler = new RunScheduler(database, jobManager); 65 ThreadPool.QueueUserWorkItem(delegate(object status) { scheduler.Run(); }); 66 Thread runSchedulerThread = new Thread(scheduler.Run); 67 runSchedulerThread.Start(); 66 68 } 67 69
Note: See TracChangeset
for help on using the changeset viewer.