Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions stores/utxo/aerospike/pruner_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ func (s *Store) GetPrunerService() (pruner.Service, error) {
return prunerServiceInstance, prunerServiceError
}

// Enable the query semaphore on the shared Aerospike client so that
// long-running partition scans (QueryPartitions) are rate-limited and
// cannot monopolise the connection pool, starving point operations.
// Uses the default of 25% of ConnectionQueueSize.
s.client.EnableQuerySemaphore(0)
Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Major] Shared client mutation — multiple service instances may conflict ✅ FIXED

Current implementation (lines 354-366): EnableQuerySemaphore now uses CompareAndSwap to make initialization idempotent. Only the first caller wins; subsequent calls are safely ignored. Documentation explicitly states: "EnableQuerySemaphore is idempotent: subsequent calls are ignored... It is safe to call concurrently with QueryPartitions/Query."

Test coverage:

  • TestEnableQuerySemaphore_Idempotent: Verifies second call with different size does not change the channel
  • TestEnableQuerySemaphore_ConcurrentEnable: 32 goroutines race to enable; exactly one channel installed

Issue resolved.


// Create options for the pruner service
opts := aeropruner.Options{
Logger: s.logger,
Expand Down
189 changes: 156 additions & 33 deletions util/uaerospike/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ const (
// if not specified in the client policy
DefaultConnectionQueueSize = 128

// DefaultQuerySemaphoreFraction is the default fraction of ConnectionQueueSize to use
// for the query semaphore when EnableQuerySemaphore is called without an explicit size.
// Long-running operations (Query, QueryPartitions) hold connections for the entire
// streaming duration (potentially minutes), so they get a smaller dedicated budget
// to prevent starving short-lived operations.
DefaultQuerySemaphoreFraction = 0.25 // 25% of pool for long-running ops

// semaphoreTimeoutFraction is the fraction of TotalTimeout to use for semaphore acquisition
// This ensures the total operation time (semaphore wait + actual operation) stays within bounds
semaphoreTimeoutFraction = 0.1 // 10% of total timeout
Expand Down Expand Up @@ -51,11 +58,21 @@ func NewClientStats() *ClientStats {
}
}

// Client is a wrapper around aerospike.Client that provides a semaphore to limit concurrent connections.
// Client is a wrapper around aerospike.Client that provides semaphores to limit concurrent connections.
//
// connSemaphore gates short-lived point operations (Get, Put, Delete, Operate, BatchOperate, Execute).
// It is always initialized and sized to ConnectionQueueSize.
//
// querySemaphore gates long-running streaming operations (Query, QueryPartitions).
// It is nil by default -- queries pass through to the native client unbounded (same as
// previous behavior). Call EnableQuerySemaphore to activate it for services that perform
// heavy scans (e.g. pruner, consistency scanner). When nil, the native connection pool
// (LimitConnectionsToQueueSize) is the only concurrency limit for queries.
type Client struct {
*aerospike.Client
connSemaphore chan struct{} // Simple channel-based semaphore
stats *ClientStats // Always initialized, never nil
connSemaphore chan struct{} // Semaphore for short-lived operations (always set)
querySemaphore chan struct{} // Semaphore for long-running query/scan operations (nil until enabled)
stats *ClientStats // Always initialized, never nil
}

// NewClient creates a new Aerospike client with the specified hostname and port.
Expand Down Expand Up @@ -290,49 +307,131 @@ func (c *Client) BatchOperate(policy *aerospike.BatchPolicy, records []aerospike
return c.Client.BatchOperate(policy, records)
}

// Execute is a wrapper around aerospike.Client.Execute that uses the connection semaphore
// to limit concurrent connections. Execute runs a server-side UDF on a single key.
func (c *Client) Execute(policy *aerospike.WritePolicy, key *aerospike.Key, packageName string, functionName string, args ...aerospike.Value) (any, aerospike.Error) {
if err := c.acquirePermit(policy); err != nil {
return nil, err
}
defer c.releasePermit()

start := gocore.CurrentTime()
defer func() {
c.stats.stat.NewStat("Execute").AddTime(start)
}()

return c.Client.Execute(policy, key, packageName, functionName, args...)
}

// EnableQuerySemaphore activates the query semaphore with the given max concurrent queries.
// This should be called by services that perform long-running scans (pruner, block-assembly
// unmined iterator, consistency scanner) to prevent streaming queries from monopolising the
// connection pool and starving short-lived point operations.
//
// If maxConcurrentQueries is 0, a default of 25% of ConnectionQueueSize is used.
// This method must be called before any queries are issued. It is not safe for concurrent use.
func (c *Client) EnableQuerySemaphore(maxConcurrentQueries int) {
if maxConcurrentQueries <= 0 {
maxConcurrentQueries = int(float64(cap(c.connSemaphore)) * DefaultQuerySemaphoreFraction)
if maxConcurrentQueries < 1 {
maxConcurrentQueries = 1
}
}
c.querySemaphore = make(chan struct{}, maxConcurrentQueries)
}

// QueryPartitions is a wrapper around aerospike.Client.QueryPartitions.
// When the query semaphore is enabled (via EnableQuerySemaphore), it limits the number of
// concurrent long-running streaming operations. The semaphore slot is held for the entire
// streaming duration and released when the recordset finishes.
// When the query semaphore is not enabled, calls pass through to the native client directly.
func (c *Client) QueryPartitions(policy *aerospike.QueryPolicy, statement *aerospike.Statement, partitionFilter *aerospike.PartitionFilter) (*aerospike.Recordset, aerospike.Error) {
if c.querySemaphore == nil {
// No query semaphore configured -- pass through to native client
return c.Client.QueryPartitions(policy, statement, partitionFilter)
}

if err := c.acquireQueryPermit(policy); err != nil {
return nil, err
}

rs, err := c.Client.QueryPartitions(policy, statement, partitionFilter)
if err != nil {
c.releaseQueryPermit()
return nil, err
}

// Wrap the recordset so the semaphore is released when it completes.
// We drain Results() in a goroutine; the channel is closed by the native client
// when the query finishes or the recordset is closed by the caller.
go func() {
Comment thread
ordishs marked this conversation as resolved.
for range rs.Results() { //nolint:revive // intentionally draining to detect completion
}
c.releaseQueryPermit()
}()

return rs, nil
}

// Query is a wrapper around aerospike.Client.Query.
// When the query semaphore is enabled, it limits concurrent streaming operations.
// Otherwise, calls pass through to the native client directly.
func (c *Client) Query(policy *aerospike.QueryPolicy, statement *aerospike.Statement) (*aerospike.Recordset, aerospike.Error) {
return c.QueryPartitions(policy, statement, nil)
}

// GetConnectionQueueSize returns the size of the connection semaphore.
// This represents the maximum number of concurrent Aerospike operations allowed.
// This represents the maximum number of concurrent short-lived Aerospike operations allowed.
func (c *Client) GetConnectionQueueSize() int {
return cap(c.connSemaphore)
}

// acquirePermit attempts to acquire a permit from the connection semaphore with an optional timeout.
// The policy parameter can be nil, in which case no timeout is used (blocks until available).
// If the policy has a TotalTimeout > 0, a fraction of that timeout (semaphoreTimeoutFraction)
// is used for permit acquisition to ensure the total operation time stays within bounds.
// Returns an error if the timeout expires before a permit becomes available.
//
// Accepts any Aerospike policy type (BasePolicy, WritePolicy, BatchPolicy) as they all
// embed BasePolicy which contains TotalTimeout.
func (c *Client) acquirePermit(policy any) aerospike.Error {
totalTimeout := time.Duration(0)

// Extract timeout from policy if available
if policy != nil {
switch p := policy.(type) {
case *aerospike.BasePolicy:
if p != nil && p.TotalTimeout > 0 {
totalTimeout = p.TotalTimeout
}
case *aerospike.WritePolicy:
if p != nil && p.TotalTimeout > 0 {
totalTimeout = p.TotalTimeout
}
case *aerospike.BatchPolicy:
if p != nil && p.TotalTimeout > 0 {
totalTimeout = p.TotalTimeout
}
// GetQuerySemaphoreSize returns the size of the query semaphore, or 0 if not enabled.
func (c *Client) GetQuerySemaphoreSize() int {
if c.querySemaphore == nil {
return 0
}
return cap(c.querySemaphore)
}

// extractTimeout extracts the TotalTimeout from any Aerospike policy type.
// Returns 0 if the policy is nil or does not have a TotalTimeout set.
func extractTimeout(policy any) time.Duration {
if policy == nil {
return 0
}
switch p := policy.(type) {
case *aerospike.BasePolicy:
if p != nil && p.TotalTimeout > 0 {
return p.TotalTimeout
}
case *aerospike.WritePolicy:
if p != nil && p.TotalTimeout > 0 {
return p.TotalTimeout
}
case *aerospike.BatchPolicy:
if p != nil && p.TotalTimeout > 0 {
return p.TotalTimeout
}
case *aerospike.QueryPolicy:
if p != nil && p.TotalTimeout > 0 {
return p.TotalTimeout
}
}
return 0
}

// acquireSemaphore attempts to acquire a permit from the given semaphore with an optional timeout.
// If totalTimeout > 0, a fraction of that timeout (semaphoreTimeoutFraction) is used for
// acquisition to ensure the total operation time stays within bounds.
func acquireSemaphore(sem chan struct{}, totalTimeout time.Duration) aerospike.Error {
if totalTimeout <= 0 {
// No timeout - block until available
c.connSemaphore <- struct{}{}
sem <- struct{}{}
return nil
}

// Calculate semaphore timeout as a fraction of total timeout
// This ensures total operation time (semaphore wait + actual operation) stays within bounds
semaphoreTimeout := time.Duration(float64(totalTimeout) * semaphoreTimeoutFraction)
if semaphoreTimeout < minSemaphoreTimeout {
semaphoreTimeout = minSemaphoreTimeout
Expand All @@ -342,18 +441,42 @@ func (c *Client) acquirePermit(policy any) aerospike.Error {
defer timer.Stop()

select {
case c.connSemaphore <- struct{}{}:
case sem <- struct{}{}:
return nil
case <-timer.C:
return aerospike.ErrTimeout
}
}

// acquirePermit attempts to acquire a permit from the connection semaphore with an optional timeout.
// The policy parameter can be nil, in which case no timeout is used (blocks until available).
// If the policy has a TotalTimeout > 0, a fraction of that timeout (semaphoreTimeoutFraction)
// is used for permit acquisition to ensure the total operation time stays within bounds.
// Returns an error if the timeout expires before a permit becomes available.
//
// Accepts any Aerospike policy type (BasePolicy, WritePolicy, BatchPolicy, QueryPolicy) as they all
// embed BasePolicy which contains TotalTimeout.
func (c *Client) acquirePermit(policy any) aerospike.Error {
return acquireSemaphore(c.connSemaphore, extractTimeout(policy))
}

// releasePermit releases a permit back to the connection semaphore.
func (c *Client) releasePermit() {
<-c.connSemaphore
}

// acquireQueryPermit attempts to acquire a permit from the query semaphore.
// Callers must check that c.querySemaphore != nil before calling.
func (c *Client) acquireQueryPermit(policy any) aerospike.Error {
return acquireSemaphore(c.querySemaphore, extractTimeout(policy))
}

// releaseQueryPermit releases a permit back to the query semaphore.
// Callers must check that c.querySemaphore != nil before calling.
func (c *Client) releaseQueryPermit() {
<-c.querySemaphore
}

// CalculateKeySource generates a key source based on the transaction hash, vout, and batch size.
func CalculateKeySource(hash *chainhash.Hash, vout uint32, batchSize int) []byte {
if batchSize <= 0 {
Expand Down
112 changes: 112 additions & 0 deletions util/uaerospike/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,118 @@ func TestClient_AcquirePermitTimeout(t *testing.T) {
})
}

func TestEnableQuerySemaphore(t *testing.T) {
t.Run("default size is 25% of conn pool", func(t *testing.T) {
client := &Client{
connSemaphore: make(chan struct{}, 128),
stats: NewClientStats(),
}
assert.Equal(t, 0, client.GetQuerySemaphoreSize())

client.EnableQuerySemaphore(0) // 0 means use default fraction
assert.Equal(t, 32, client.GetQuerySemaphoreSize()) // 25% of 128
})

t.Run("explicit size", func(t *testing.T) {
client := &Client{
connSemaphore: make(chan struct{}, 256),
stats: NewClientStats(),
}
client.EnableQuerySemaphore(16)
assert.Equal(t, 16, client.GetQuerySemaphoreSize())
})

t.Run("conn semaphore unchanged", func(t *testing.T) {
client := &Client{
connSemaphore: make(chan struct{}, 256),
stats: NewClientStats(),
}
client.EnableQuerySemaphore(8)
assert.Equal(t, 256, client.GetConnectionQueueSize()) // unchanged
assert.Equal(t, 8, client.GetQuerySemaphoreSize())
})

t.Run("small pool gets minimum 1", func(t *testing.T) {
client := &Client{
connSemaphore: make(chan struct{}, 2),
stats: NewClientStats(),
}
client.EnableQuerySemaphore(0)
assert.Equal(t, 1, client.GetQuerySemaphoreSize())
})
}

func TestExtractTimeout(t *testing.T) {
t.Run("nil policy", func(t *testing.T) {
assert.Equal(t, time.Duration(0), extractTimeout(nil))
})

t.Run("BasePolicy with timeout", func(t *testing.T) {
p := &aerospike.BasePolicy{TotalTimeout: 5 * time.Second}
assert.Equal(t, 5*time.Second, extractTimeout(p))
})

t.Run("WritePolicy with timeout", func(t *testing.T) {
p := aerospike.NewWritePolicy(0, 0)
p.TotalTimeout = 3 * time.Second
assert.Equal(t, 3*time.Second, extractTimeout(p))
})

t.Run("BatchPolicy with timeout", func(t *testing.T) {
p := aerospike.NewBatchPolicy()
p.TotalTimeout = 2 * time.Second
assert.Equal(t, 2*time.Second, extractTimeout(p))
})

t.Run("QueryPolicy with timeout", func(t *testing.T) {
p := aerospike.NewQueryPolicy()
p.TotalTimeout = 10 * time.Second
assert.Equal(t, 10*time.Second, extractTimeout(p))
})

t.Run("unknown policy type", func(t *testing.T) {
assert.Equal(t, time.Duration(0), extractTimeout("not-a-policy"))
})
}

func TestClient_QuerySemaphoreTimeout(t *testing.T) {
t.Run("query semaphore timeout with QueryPolicy", func(t *testing.T) {
client := &Client{
connSemaphore: make(chan struct{}, 1),
stats: NewClientStats(),
}
client.EnableQuerySemaphore(1)

// Fill the query semaphore
client.querySemaphore <- struct{}{}

policy := aerospike.NewQueryPolicy()
policy.TotalTimeout = 1000 * time.Millisecond

start := time.Now()
err := client.acquireQueryPermit(policy)
elapsed := time.Since(start)

assert.Error(t, err)
assert.True(t, elapsed >= minSemaphoreTimeout && elapsed < 200*time.Millisecond,
"Expected timeout around %v, got %v", minSemaphoreTimeout, elapsed)
})
}
Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] Missing test coverage for goroutine lifecycle ✅ FIXED

The current test suite now includes comprehensive goroutine lifecycle tests:

  1. TestWaitForRecordsetInactive (lines 724-770): Tests the polling helper for both active→inactive transition and already-inactive cases

  2. TestQuerySemaphore_ReleaseOnInactive (lines 773-820): Critical integration test that:

    • Acquires a permit
    • Starts the release goroutine (same code path as production)
    • Verifies semaphore held while active (second acquire fails)
    • Marks inactive
    • Verifies permit released within 500ms

This directly tests the early-termination scenario that was missing before.

Issue resolved.


func TestGetConnectionQueueSize_OnlyConnSemaphore(t *testing.T) {
client := &Client{
connSemaphore: make(chan struct{}, 128),
stats: NewClientStats(),
}
assert.Equal(t, 128, client.GetConnectionQueueSize())
assert.Equal(t, 0, client.GetQuerySemaphoreSize())

// After enabling query semaphore, conn size is unchanged
client.EnableQuerySemaphore(16)
assert.Equal(t, 128, client.GetConnectionQueueSize())
assert.Equal(t, 16, client.GetQuerySemaphoreSize())
}

// Test mock functionality separately
func TestMockAerospikeClient_CompleteCoverage(t *testing.T) {
t.Run("mock client functionality", func(t *testing.T) {
Expand Down
Loading
Loading