Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 56 additions & 16 deletions stores/blockchain/sql/GetBlockHeaderIDs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,11 @@ func (s *SQL) GetBlockHeaderIDs(ctx context.Context, blockHashFrom *chainhash.Ha
}
ids := make([]uint32, 0, initialCap)

q := `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT id FROM ChainBlocks
LIMIT $2
`
rows, err := s.db.QueryContext(ctx, q, blockHashFrom[:], numberOfHeaders)
// Try the on_main_chain fast path; fall back to the recursive CTE on fork
// tips, unknown hashes, or while a main-chain rebuild is in flight. Same
// semantics as buildGetBlockHeadersQuery — see comment there.
q, args := s.buildGetBlockHeaderIDsQuery(ctx, blockHashFrom, numberOfHeaders)
rows, err := s.db.QueryContext(ctx, q, args...)

if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand Down Expand Up @@ -170,3 +159,54 @@ func (s *SQL) GetBlockHeaderIDs(ctx context.Context, blockHashFrom *chainhash.Ha

return ids, nil
}

// buildGetBlockHeaderIDsQuery returns the SQL query and args for GetBlockHeaderIDs.
Comment thread
oskarszoon marked this conversation as resolved.
// The fast path uses the on_main_chain partial index when the start hash is on
// the main chain. Otherwise the recursive CTE walks parent_id pointers and is
// authoritative for fork tips and rebuilds.
func (s *SQL) buildGetBlockHeaderIDsQuery(ctx context.Context, blockHashFrom *chainhash.Hash, numberOfHeaders uint64) (string, []interface{}) {
if s.mainChainRebuilding.Load() == 0 {
var (
onMain bool
startHeight uint32
)
// Resolve start-block height in the probe so the main query binds it as
// a literal parameter. This (a) lets the planner pick the
// idx_on_main_chain_height partial index for the height range, and
// (b) eliminates the intra-query race that a same-query subquery
// evaluated twice would have.
if scanErr := s.db.QueryRowContext(ctx,
`SELECT COALESCE(on_main_chain, false), COALESCE(height, 0)
FROM blocks WHERE hash = $1 LIMIT 1`,
blockHashFrom[:],
).Scan(&onMain, &startHeight); scanErr == nil && onMain {
fastPath := `
SELECT b.id
FROM blocks b
WHERE b.on_main_chain = true
AND b.height <= $1
AND b.height > $1 - $2
ORDER BY b.height DESC
LIMIT $2
`
return fastPath, []interface{}{startHeight, numberOfHeaders}
}
}

cte := `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT id FROM ChainBlocks
LIMIT $2
`
return cte, []interface{}{blockHashFrom[:], numberOfHeaders}
}
112 changes: 80 additions & 32 deletions stores/blockchain/sql/GetBlockHeaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,43 @@ func (s *SQL) GetBlockHeaders(ctx context.Context, blockHashFrom *chainhash.Hash
ctx, cancel := context.WithCancel(ctx)
defer cancel()

const q = `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT
// Try the on_main_chain fast path when the start hash is itself on the main
Comment thread
oskarszoon marked this conversation as resolved.
// chain and no rebuild is in flight. The fast path replaces an O(N) recursive
// parent_id walk with a single backward index scan over idx_on_main_chain_height
// — measured ~3-6× faster on small datasets and 10-20× on production-sized DBs.
// Fork tips, unknown hashes, or DB errors fall back to the recursive CTE so the
// CTE remains the authoritative path. Same TOCTOU caveats apply as in
// GetLatestBlockHeaderFromBlockLocator: the guard check and main query are
// non-atomic, but the store's single-writer model bounds staleness to one call.
q, args := s.buildGetBlockHeadersQuery(ctx, blockHashFrom, numberOfHeaders)

rows, err := s.db.QueryContext(ctx, q, args...)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []*model.BlockHeader{}, []*model.BlockHeaderMeta{}, nil
}

return nil, nil, errors.NewStorageError("failed to get headers", err)
}

defer rows.Close()

h, m, err := s.processBlockHeadersRows(rows, numberOfHeaders, false)
if err != nil {
return nil, nil, err
}

cacheOp.Set([2]interface{}{h, m}, cacheTTL)

return h, m, nil
}

// buildGetBlockHeadersQuery returns the SQL query and args for GetBlockHeaders.
// The fast path uses the on_main_chain partial index when the start hash is on
// the main chain. Otherwise the recursive CTE walks parent_id pointers and is
// authoritative for fork tips and rebuilds.
func (s *SQL) buildGetBlockHeadersQuery(ctx context.Context, blockHashFrom *chainhash.Hash, numberOfHeaders uint64) (string, []interface{}) {
const blockColumns = `
b.version
,b.block_time
,b.nonce
Expand All @@ -129,32 +153,56 @@ func (s *SQL) GetBlockHeaders(ctx context.Context, blockHashFrom *chainhash.Hash
,b.subtrees_set
,b.invalid
,b.processed_at
,b.median_time_past
,b.median_time_past`

if s.mainChainRebuilding.Load() == 0 {
var (
onMain bool
startHeight uint32
)
// Resolve start-block height in the probe so the main query binds it as
// a literal parameter. This (a) lets the planner pick the
// idx_on_main_chain_height partial index for the height range, and
// (b) eliminates the intra-query race that a same-query subquery
// evaluated twice would have. Treat any error / missing row / off-main-chain
// as "not eligible" and fall through to the CTE.
if scanErr := s.db.QueryRowContext(ctx,
`SELECT COALESCE(on_main_chain, false), COALESCE(height, 0)
FROM blocks WHERE hash = $1 LIMIT 1`,
blockHashFrom[:],
).Scan(&onMain, &startHeight); scanErr == nil && onMain {
fastPath := `
SELECT` + blockColumns + `
FROM blocks b
JOIN ChainBlocks cb ON b.id = cb.id
WHERE b.on_main_chain = true
AND b.height <= $1
AND b.height > $1 - $2
ORDER BY b.height DESC
LIMIT $2
`

rows, err := s.db.QueryContext(ctx, q, blockHashFrom[:], numberOfHeaders)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []*model.BlockHeader{}, []*model.BlockHeaderMeta{}, nil
return fastPath, []interface{}{startHeight, numberOfHeaders}
}

return nil, nil, errors.NewStorageError("failed to get headers", err)
}

defer rows.Close()

h, m, err := s.processBlockHeadersRows(rows, numberOfHeaders, false)
if err != nil {
return nil, nil, err
}

cacheOp.Set([2]interface{}{h, m}, cacheTTL)

return h, m, nil
cte := `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT` + blockColumns + `
FROM blocks b
JOIN ChainBlocks cb ON b.id = cb.id
ORDER BY b.height DESC
LIMIT $2
`
return cte, []interface{}{blockHashFrom[:], numberOfHeaders}
}

func (s *SQL) processBlockHeadersRows(rows *sql.Rows, numberOfHeaders uint64, hasCoinbaseColumn bool) ([]*model.BlockHeader, []*model.BlockHeaderMeta, error) {
Expand Down
Loading