[Add] semaphore for blocking aerospike queries#813
[Add] semaphore for blocking aerospike queries#813teracoder wants to merge 4 commits intobsv-blockchain:mainfrom
Conversation
|
🤖 Claude Code Review Status: Complete Current Review: No new issues found. All previously reported issues have been fixed: History:
Summary: This PR implements a query semaphore to prevent long-running Aerospike scans from exhausting the connection pool. The implementation is well-designed with strong concurrency safety, comprehensive test coverage (442 lines of new tests), and clear documentation. All critical issues from the previous review have been addressed. Key strengths:
|
| // long-running partition scans (QueryPartitions) are rate-limited and | ||
| // cannot monopolise the connection pool, starving point operations. | ||
| // Uses the default of 25% of ConnectionQueueSize. | ||
| s.client.EnableQuerySemaphore(0) |
There was a problem hiding this comment.
[Major] Shared client mutation — multiple service instances may conflict ✅ FIXED
Current implementation (lines 354-366): EnableQuerySemaphore now uses CompareAndSwap to make initialization idempotent. Only the first caller wins; subsequent calls are safely ignored. Documentation explicitly states: "EnableQuerySemaphore is idempotent: subsequent calls are ignored... It is safe to call concurrently with QueryPartitions/Query."
Test coverage:
TestEnableQuerySemaphore_Idempotent: Verifies second call with different size does not change the channelTestEnableQuerySemaphore_ConcurrentEnable: 32 goroutines race to enable; exactly one channel installed
Issue resolved.
| assert.True(t, elapsed >= minSemaphoreTimeout && elapsed < 200*time.Millisecond, | ||
| "Expected timeout around %v, got %v", minSemaphoreTimeout, elapsed) | ||
| }) | ||
| } |
There was a problem hiding this comment.
[Minor] Missing test coverage for goroutine lifecycle ✅ FIXED
The current test suite now includes comprehensive goroutine lifecycle tests:
-
TestWaitForRecordsetInactive(lines 724-770): Tests the polling helper for both active→inactive transition and already-inactive cases -
TestQuerySemaphore_ReleaseOnInactive(lines 773-820): Critical integration test that:- Acquires a permit
- Starts the release goroutine (same code path as production)
- Verifies semaphore held while active (second acquire fails)
- Marks inactive
- Verifies permit released within 500ms
This directly tests the early-termination scenario that was missing before.
Issue resolved.
Benchmark Comparison ReportBaseline: Current: Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-05-05 14:32 UTC |
The previous query-semaphore wrapper detected recordset completion by draining rs.Results() in a goroutine. Recordset.Results() returns the same channel the caller iterates, so the drain goroutine and the caller competed for records -- roughly half were silently consumed and discarded whenever EnableQuerySemaphore was active. This affected the pruner, unmined iterator, consistency scan, and the Query callers in QueryOldUnminedTransactions and ProcessExpiredPreservations. Replace channel-draining with IsActive() polling so the caller observes every record. While here, also: - Store querySemaphore as an atomic.Pointer to fix the publication race between EnableQuerySemaphore and concurrent QueryPartitions readers, and snapshot the channel inside QueryPartitions so acquire and release always operate on the same instance. - Make EnableQuerySemaphore idempotent via CompareAndSwap so re-entry cannot orphan in-flight permits. - Add Query and QuerySemaphoreWait stats for observability into both semaphore wait time and end-to-end query duration. - Move the EnableQuerySemaphore call in GetPrunerService to after NewService validates the client, preserving the existing nil-client error path.
Adds three unit tests motivated by review feedback that the existing suite verified semaphore acquisition mechanics but not the wrapper's core promised behaviour or its goroutine lifecycle: - TestQuerySemaphore_ConcurrentQueryLimit: holds N permits, asserts an (N+1)th acquire blocks, then verifies it unblocks once a permit is released. Directly exercises the contract the feature exists for. - TestQuerySemaphore_GoroutineCleanup: drives 20 acquire/poll/release cycles using runtime.NumGoroutine() deltas to catch any leak in the release goroutine. - TestQuerySemaphore_RepeatedAcquireRelease: 1000 acquire/release cycles to surface permit drift or double-release bugs. The recordset-coupled scenarios (early termination, full consumption) require an *aerospike.Recordset, which has no public constructor; they belong with the TestContainers-backed integration tests in stores/utxo/aerospike/ rather than as unit tests here.
|



Long running connections that perform large scans on aerospike, like the ones used by the pruner, are blocking as they take a long time. the problem is that these connections share the pool with normal connections. this leads to two problems