Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions stores/utxo/aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,33 @@ func (s *Store) GetSet() string {
return s.setName
}

// registerScanBudget reports to the shared uaerospike client how many connections
// the named long-running scan service expects to hold concurrently.
//
// The client answers with a report covering every registered service. Whenever
// that report is flagged as Exceeded, a WARN containing the per-service breakdown
// is logged so an operator can see which configured concurrencies over-subscribe
// the pool. The warning threshold is read from
// settings.Pruner.ConnectionPoolWarningThreshold.
//
// The call is a no-op when the store has no client. Registering the same service
// again is allowed; note that the WARN is logged on every call whose report is
// Exceeded, not only the first one.
func (s *Store) registerScanBudget(service string, budget int) {
	if s.client == nil {
		return
	}

	report := s.client.RegisterConnectionBudget(
		service,
		budget,
		s.settings.Pruner.ConnectionPoolWarningThreshold,
	)
	if !report.Exceeded {
		return
	}

	// Percentages are computed only on the warning path; they exist purely for the log line.
	declaredPct := float64(report.TotalBudget) / float64(report.PoolSize) * 100
	thresholdPct := report.Threshold * 100

	s.logger.Warnf(
		"Aerospike connection budget exceeded: %d/%d declared (%.1f%% of pool, threshold %.1f%%, recommended max %d). "+
			"Breakdown by service: %v. "+
			"Increase ConnectionQueueSize or reduce per-service concurrency to avoid pool starvation.",
		report.TotalBudget, report.PoolSize,
		declaredPct,
		thresholdPct,
		report.Recommended, report.Breakdown,
	)
}

func (s *Store) SetBlockHeight(blockHeight uint32) error {
if blockHeight == 0 {
return errors.NewInvalidArgumentError("block height cannot be zero")
Expand Down
4 changes: 4 additions & 0 deletions stores/utxo/aerospike/consistency_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func launchConsistencyScan(store *Store, numPartitionQueries int, workerFunc fun
partitionsPerQuery := totalPartitions / numPartitionQueries
remainingPartitions := totalPartitions % numPartitionQueries

// Declare this scan's connection use so the shared client can sum across services
// and warn when configured concurrency over-subscribes the pool.
store.registerScanBudget("consistencyScan", numPartitionQueries)

workerCtx, cancel := context.WithCancel(context.Background())

it := &consistencyScanIterator{
Expand Down
32 changes: 32 additions & 0 deletions stores/utxo/aerospike/pruner/pruner_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ func (s *Service) getConnectionQueueSize() int {
// validateConnectionPoolSettings validates that pruner concurrency settings won't exceed
// the Aerospike connection pool. If they would, automatically adjusts chunkGroupLimit
// to prevent connection pool exhaustion and logs a WARNING.
//
// After auto-adjustment the (possibly reduced) pruner budget is registered with the
// shared uaerospike client so other long-running query consumers (unmined iterator,
// consistency scanner) can be summed against the pool and produce a single WARN log
// when the total over-subscribes ConnectionQueueSize × ConnectionPoolWarningThreshold.
func (s *Service) validateConnectionPoolSettings() {
// Get Aerospike ConnectionQueueSize from client
connectionQueueSize := s.getConnectionQueueSize()
Expand All @@ -425,6 +430,7 @@ func (s *Service) validateConnectionPoolSettings() {
s.chunkGroupLimit, adjusted,
)
s.chunkGroupLimit = adjusted
maxPrunerConnections = (numWorkers * s.chunkGroupLimit) + numWorkers
} else {
s.logger.Infof(
"Pruner connection pool validation passed. Max pruner connections: %d, "+
Expand All @@ -433,6 +439,32 @@ func (s *Service) validateConnectionPoolSettings() {
float64(maxPrunerConnections)/float64(connectionQueueSize)*100,
)
}

s.registerConnectionBudget("pruner", maxPrunerConnections)
}

// registerConnectionBudget reports to the shared uaerospike client how many
// connections the named service expects to hold concurrently, using this
// service's connectionPoolWarningThreshold as the warning threshold.
//
// The client answers with a report summed over every registered service. When
// that report is flagged as Exceeded, a WARN with the per-service breakdown is
// logged. validateConnectionPoolSettings calls this with the pruner's own
// (possibly auto-adjusted) connection count, so the warning reflects the shared
// total rather than the pruner alone.
//
// The call is a no-op when the service has no client.
func (s *Service) registerConnectionBudget(service string, budget int) {
	if s.client == nil {
		return
	}

	report := s.client.RegisterConnectionBudget(service, budget, s.connectionPoolWarningThreshold)
	if !report.Exceeded {
		return
	}

	// Percentages are computed only on the warning path; they exist purely for the log line.
	declaredPct := float64(report.TotalBudget) / float64(report.PoolSize) * 100
	thresholdPct := report.Threshold * 100

	s.logger.Warnf(
		"Aerospike connection budget exceeded: %d/%d declared (%.1f%% of pool, threshold %.1f%%, recommended max %d). "+
			"Breakdown by service: %v. "+
			"Increase ConnectionQueueSize or reduce per-service concurrency to avoid pool starvation.",
		report.TotalBudget, report.PoolSize,
		declaredPct,
		thresholdPct,
		report.Recommended, report.Breakdown,
	)
}

// partitionWorker processes a range of Aerospike partitions and returns counts
Expand Down
2 changes: 1 addition & 1 deletion stores/utxo/aerospike/pruner_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *Store) GetPrunerService() (pruner.Service, error) {
LuaPackage: LuaPackage,
}

// Create a new pruner service
// Create a new pruner service.
prunerService, err := aeropruner.NewService(s.settings, opts)
if err != nil {
prunerServiceError = err
Expand Down
4 changes: 4 additions & 0 deletions stores/utxo/aerospike/unmined_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func launchPartitionIterator(store *Store, numPartitionQueries int, prunerMode b
partitionsPerQuery := totalPartitions / numPartitionQueries
remainingPartitions := totalPartitions % numPartitionQueries

// Declare this scan's connection use so the shared client can sum across services
// and warn when configured concurrency over-subscribes the pool.
store.registerScanBudget("unminedIterator", numPartitionQueries)

policy := util.GetAerospikeQueryPolicy(store.settings)
policy.IncludeBinData = true
policy.RecordQueueSize = 512
Expand Down
174 changes: 142 additions & 32 deletions util/uaerospike/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"sort"
"strings"
"sync"
"time"

"github.com/aerospike/aerospike-client-go/v8"
Expand Down Expand Up @@ -51,11 +52,34 @@ func NewClientStats() *ClientStats {
}
}

// Client is a wrapper around aerospike.Client that provides a semaphore to limit concurrent connections.
// ConnectionBudgetReport summarises declared connection use across services so
// operators can see when configured concurrency over-subscribes the pool.
//
// Recommended is computed as int(PoolSize * Threshold). Exceeded is true when
// TotalBudget > Recommended. The breakdown is a snapshot of all currently
// registered service budgets, safe to read or mutate by the caller.
type ConnectionBudgetReport struct {
// TotalBudget is the sum of every registered service budget.
TotalBudget int
// PoolSize is the capacity of the client's connection semaphore.
PoolSize int
// Threshold is the fraction supplied by the caller that produced this report.
Threshold float64
// Recommended is int(PoolSize * Threshold), truncated toward zero.
Recommended int
// Exceeded reports whether TotalBudget is strictly greater than Recommended.
Exceeded bool
// Breakdown maps service name to its registered budget; it is a copy made
// when the report was built, not the registry's own map.
Breakdown map[string]int
}

// Client is a wrapper around aerospike.Client that limits concurrent connections
// via a single connSemaphore sized to ConnectionQueueSize and exposes a
// connection-budget registry so each long-running query consumer can declare
// its max concurrent use. The registry is diagnostic only -- it does not
// throttle; it lets operators see when configured concurrency would
// over-subscribe the pool (see RegisterConnectionBudget).
//
// A Client contains a sync.Mutex and must not be copied after first use; pass
// it around as *Client.
type Client struct {
	*aerospike.Client

	// connSemaphore is a simple channel-based semaphore; its capacity is the
	// maximum number of concurrent operations.
	connSemaphore chan struct{}
	// stats collects per-operation timing. Always initialized, never nil.
	stats *ClientStats

	// budgetMu guards budgets.
	budgetMu sync.Mutex
	// budgets maps a service name to its declared max concurrent connection use.
	budgets map[string]int
}

// NewClient creates a new Aerospike client with the specified hostname and port.
Expand All @@ -73,6 +97,7 @@ func NewClient(hostname string, port int) (*Client, error) {
Client: client,
connSemaphore: make(chan struct{}, queueSize),
stats: NewClientStats(),
budgets: make(map[string]int),
}, nil
}

Expand Down Expand Up @@ -132,6 +157,7 @@ func NewClientWithPolicyAndHost(policy *aerospike.ClientPolicy, hosts ...*aerosp
Client: client,
connSemaphore: make(chan struct{}, queueSize),
stats: NewClientStats(),
budgets: make(map[string]int),
}, nil
}

Expand Down Expand Up @@ -290,49 +316,121 @@ func (c *Client) BatchOperate(policy *aerospike.BatchPolicy, records []aerospike
return c.Client.BatchOperate(policy, records)
}

// Execute runs a server-side UDF on a single key by delegating to
// aerospike.Client.Execute, holding one permit from the connection semaphore
// for the duration of the call.
//
// If a permit cannot be acquired, the acquisition error is returned with a nil
// result and the underlying client is never called. Otherwise the elapsed time
// is recorded under the "Execute" stat and the permit is released on return.
func (c *Client) Execute(policy *aerospike.WritePolicy, key *aerospike.Key, packageName string, functionName string, args ...aerospike.Value) (any, aerospike.Error) {
	permitErr := c.acquirePermit(policy)
	if permitErr != nil {
		return nil, permitErr
	}
	defer c.releasePermit()

	// Timing starts after the permit is held, so semaphore wait is not counted.
	begin := gocore.CurrentTime()
	defer func() { c.stats.stat.NewStat("Execute").AddTime(begin) }()

	return c.Client.Execute(policy, key, packageName, functionName, args...)
}

// RegisterConnectionBudget stores the given service's expected max concurrent
// connection use and returns a report computed over every service registered
// at that moment. The registry is diagnostic only and never throttles; callers
// inspect the report (typically Exceeded) to decide whether to log.
//
// Registering a service that is already present overwrites its previous
// budget. A budget of 0 or less removes the service from the registry.
//
// The registry is guarded by a mutex, so concurrent calls are safe. threshold
// only influences the returned report; it is not stored.
func (c *Client) RegisterConnectionBudget(service string, budget int, threshold float64) ConnectionBudgetReport {
	c.budgetMu.Lock()
	defer c.budgetMu.Unlock()

	// Lazily create the registry so a Client built without it still works.
	if c.budgets == nil {
		c.budgets = make(map[string]int)
	}

	if budget > 0 {
		c.budgets[service] = budget
	} else {
		delete(c.budgets, service)
	}

	return c.connectionBudgetReportLocked(threshold)
}

// ConnectionBudget builds the cumulative budget report for the given threshold
// without touching any registration. The registry mutex is held while the
// report is computed. Intended for periodic diagnostics and tests.
func (c *Client) ConnectionBudget(threshold float64) ConnectionBudgetReport {
	c.budgetMu.Lock()
	defer c.budgetMu.Unlock()

	report := c.connectionBudgetReportLocked(threshold)

	return report
}

// connectionBudgetReportLocked assembles a ConnectionBudgetReport from the
// current registry. It does no locking itself; RegisterConnectionBudget and
// ConnectionBudget call it while holding budgetMu.
//
// PoolSize is the capacity of connSemaphore, TotalBudget is the sum of all
// registered budgets, Recommended is int(PoolSize * threshold), and Exceeded
// is set when TotalBudget is strictly greater than Recommended. Breakdown is a
// fresh copy of the registry, so callers may keep or modify it freely.
func (c *Client) connectionBudgetReportLocked(threshold float64) ConnectionBudgetReport {
	report := ConnectionBudgetReport{
		PoolSize:  cap(c.connSemaphore),
		Threshold: threshold,
		Breakdown: make(map[string]int, len(c.budgets)),
	}

	for service, budget := range c.budgets {
		report.Breakdown[service] = budget
		report.TotalBudget += budget
	}

	report.Recommended = int(float64(report.PoolSize) * threshold)
	report.Exceeded = report.TotalBudget > report.Recommended

	return report
}

// GetConnectionQueueSize reports the capacity of the connection semaphore,
// i.e. the maximum number of Aerospike operations this client lets run
// concurrently.
func (c *Client) GetConnectionQueueSize() int {
	size := cap(c.connSemaphore)
	return size
}

// acquirePermit attempts to acquire a permit from the connection semaphore with an optional timeout.
// The policy parameter can be nil, in which case no timeout is used (blocks until available).
// If the policy has a TotalTimeout > 0, a fraction of that timeout (semaphoreTimeoutFraction)
// is used for permit acquisition to ensure the total operation time stays within bounds.
// Returns an error if the timeout expires before a permit becomes available.
//
// Accepts any Aerospike policy type (BasePolicy, WritePolicy, BatchPolicy) as they all
// embed BasePolicy which contains TotalTimeout.
func (c *Client) acquirePermit(policy any) aerospike.Error {
totalTimeout := time.Duration(0)

// Extract timeout from policy if available
if policy != nil {
switch p := policy.(type) {
case *aerospike.BasePolicy:
if p != nil && p.TotalTimeout > 0 {
totalTimeout = p.TotalTimeout
}
case *aerospike.WritePolicy:
if p != nil && p.TotalTimeout > 0 {
totalTimeout = p.TotalTimeout
}
case *aerospike.BatchPolicy:
if p != nil && p.TotalTimeout > 0 {
totalTimeout = p.TotalTimeout
}
// extractTimeout returns the TotalTimeout carried by an Aerospike policy.
//
// Recognised policy types are *aerospike.BasePolicy, *aerospike.WritePolicy,
// *aerospike.BatchPolicy and *aerospike.QueryPolicy. The result is 0 when
// policy is nil, is a nil pointer of one of those types, is of any other type,
// or has a TotalTimeout that is not positive.
func extractTimeout(policy any) time.Duration {
	var timeout time.Duration

	// A nil interface matches no case below and leaves timeout at 0.
	switch p := policy.(type) {
	case *aerospike.BasePolicy:
		if p != nil {
			timeout = p.TotalTimeout
		}
	case *aerospike.WritePolicy:
		if p != nil {
			timeout = p.TotalTimeout
		}
	case *aerospike.BatchPolicy:
		if p != nil {
			timeout = p.TotalTimeout
		}
	case *aerospike.QueryPolicy:
		if p != nil {
			timeout = p.TotalTimeout
		}
	}

	if timeout > 0 {
		return timeout
	}
	return 0
}

// acquireSemaphore attempts to acquire a permit from the given semaphore with an optional timeout.
// If totalTimeout > 0, a fraction of that timeout (semaphoreTimeoutFraction) is used for
// acquisition to ensure the total operation time stays within bounds.
func acquireSemaphore(sem chan struct{}, totalTimeout time.Duration) aerospike.Error {
if totalTimeout <= 0 {
// No timeout - block until available
c.connSemaphore <- struct{}{}
sem <- struct{}{}
return nil
}

// Calculate semaphore timeout as a fraction of total timeout
// This ensures total operation time (semaphore wait + actual operation) stays within bounds
semaphoreTimeout := time.Duration(float64(totalTimeout) * semaphoreTimeoutFraction)
if semaphoreTimeout < minSemaphoreTimeout {
semaphoreTimeout = minSemaphoreTimeout
Expand All @@ -342,13 +440,25 @@ func (c *Client) acquirePermit(policy any) aerospike.Error {
defer timer.Stop()

select {
case c.connSemaphore <- struct{}{}:
case sem <- struct{}{}:
return nil
case <-timer.C:
return aerospike.ErrTimeout
}
}

// acquirePermit takes one permit from the client's connection semaphore.
//
// The wait is bounded by the policy's TotalTimeout as read by extractTimeout:
// with no policy, or a policy without a positive TotalTimeout, the call blocks
// until a permit is free; otherwise acquireSemaphore waits only for a fraction
// of that timeout and returns an error if no permit becomes available in time.
//
// policy may be any Aerospike policy type understood by extractTimeout
// (BasePolicy, WritePolicy, BatchPolicy, QueryPolicy) or nil.
func (c *Client) acquirePermit(policy any) aerospike.Error {
	totalTimeout := extractTimeout(policy)
	return acquireSemaphore(c.connSemaphore, totalTimeout)
}

// releasePermit releases a permit back to the connection semaphore.
func (c *Client) releasePermit() {
<-c.connSemaphore
Expand Down
Loading
Loading