Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 47 additions & 16 deletions stores/blockchain/sql/GetBlockHeaderIDs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,11 @@ func (s *SQL) GetBlockHeaderIDs(ctx context.Context, blockHashFrom *chainhash.Ha
}
ids := make([]uint32, 0, initialCap)

q := `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT id FROM ChainBlocks
LIMIT $2
`
rows, err := s.db.QueryContext(ctx, q, blockHashFrom[:], numberOfHeaders)
// Try the on_main_chain fast path; fall back to the recursive CTE on fork
// tips, unknown hashes, or while a main-chain rebuild is in flight. Same
// semantics as buildGetBlockHeadersQuery — see comment there.
q, args := s.buildGetBlockHeaderIDsQuery(ctx, blockHashFrom, numberOfHeaders)
rows, err := s.db.QueryContext(ctx, q, args...)

if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand Down Expand Up @@ -170,3 +159,45 @@ func (s *SQL) GetBlockHeaderIDs(ctx context.Context, blockHashFrom *chainhash.Ha

return ids, nil
}

// buildGetBlockHeaderIDsQuery returns the SQL query and args for GetBlockHeaderIDs.
Comment thread
oskarszoon marked this conversation as resolved.
// The fast path uses the on_main_chain partial index when the start hash is on
// the main chain. Otherwise the recursive CTE walks parent_id pointers and is
// authoritative for fork tips and rebuilds.
func (s *SQL) buildGetBlockHeaderIDsQuery(ctx context.Context, blockHashFrom *chainhash.Hash, numberOfHeaders uint64) (string, []interface{}) {
if s.mainChainRebuilding.Load() == 0 {
var onMain bool
if scanErr := s.db.QueryRowContext(ctx,
`SELECT COALESCE((SELECT on_main_chain FROM blocks WHERE hash = $1 LIMIT 1), false)`,
blockHashFrom[:],
).Scan(&onMain); scanErr == nil && onMain {
fastPath := `
SELECT b.id
FROM blocks b
WHERE b.on_main_chain = true
AND b.height <= (SELECT height FROM blocks WHERE hash = $1 LIMIT 1)
Comment thread
oskarszoon marked this conversation as resolved.
Outdated
AND b.height > (SELECT height FROM blocks WHERE hash = $1 LIMIT 1) - $2
ORDER BY b.height DESC
LIMIT $2
`
return fastPath, []interface{}{blockHashFrom[:], numberOfHeaders}
}
}

cte := `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT id FROM ChainBlocks
LIMIT $2
`
return cte, []interface{}{blockHashFrom[:], numberOfHeaders}
}
104 changes: 72 additions & 32 deletions stores/blockchain/sql/GetBlockHeaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,43 @@ func (s *SQL) GetBlockHeaders(ctx context.Context, blockHashFrom *chainhash.Hash
ctx, cancel := context.WithCancel(ctx)
defer cancel()

const q = `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT
// Try the on_main_chain fast path when the start hash is itself on the main
Comment thread
oskarszoon marked this conversation as resolved.
// chain and no rebuild is in flight. The fast path replaces an O(N) recursive
// parent_id walk with a single backward index scan over idx_on_main_chain_height
// — measured ~3-6× faster on small datasets and 10-20× on production-sized DBs.
// Fork tips, unknown hashes, or DB errors fall back to the recursive CTE so the
// CTE remains the authoritative path. Same TOCTOU caveats apply as in
// GetLatestBlockHeaderFromBlockLocator: the guard check and main query are
// non-atomic, but the store's single-writer model bounds staleness to one call.
q, args := s.buildGetBlockHeadersQuery(ctx, blockHashFrom, numberOfHeaders)

rows, err := s.db.QueryContext(ctx, q, args...)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []*model.BlockHeader{}, []*model.BlockHeaderMeta{}, nil
}

return nil, nil, errors.NewStorageError("failed to get headers", err)
}

defer rows.Close()

h, m, err := s.processBlockHeadersRows(rows, numberOfHeaders, false)
if err != nil {
return nil, nil, err
}

cacheOp.Set([2]interface{}{h, m}, cacheTTL)

return h, m, nil
}

// buildGetBlockHeadersQuery returns the SQL query and args for GetBlockHeaders.
// The fast path uses the on_main_chain partial index when the start hash is on
// the main chain. Otherwise the recursive CTE walks parent_id pointers and is
// authoritative for fork tips and rebuilds.
func (s *SQL) buildGetBlockHeadersQuery(ctx context.Context, blockHashFrom *chainhash.Hash, numberOfHeaders uint64) (string, []interface{}) {
const blockColumns = `
b.version
,b.block_time
,b.nonce
Expand All @@ -129,32 +153,48 @@ func (s *SQL) GetBlockHeaders(ctx context.Context, blockHashFrom *chainhash.Hash
,b.subtrees_set
,b.invalid
,b.processed_at
,b.median_time_past
,b.median_time_past`

if s.mainChainRebuilding.Load() == 0 {
var onMain bool
// Treat any error or missing row as "not on main chain"; the CTE fallback
// will surface the same condition (or no rows) to the caller.
if scanErr := s.db.QueryRowContext(ctx,
`SELECT COALESCE((SELECT on_main_chain FROM blocks WHERE hash = $1 LIMIT 1), false)`,
blockHashFrom[:],
).Scan(&onMain); scanErr == nil && onMain {
fastPath := `
SELECT` + blockColumns + `
FROM blocks b
JOIN ChainBlocks cb ON b.id = cb.id
WHERE b.on_main_chain = true
AND b.height <= (SELECT height FROM blocks WHERE hash = $1 LIMIT 1)
Comment thread
oskarszoon marked this conversation as resolved.
Outdated
AND b.height > (SELECT height FROM blocks WHERE hash = $1 LIMIT 1) - $2
ORDER BY b.height DESC
LIMIT $2
`

rows, err := s.db.QueryContext(ctx, q, blockHashFrom[:], numberOfHeaders)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []*model.BlockHeader{}, []*model.BlockHeaderMeta{}, nil
return fastPath, []interface{}{blockHashFrom[:], numberOfHeaders}
}

return nil, nil, errors.NewStorageError("failed to get headers", err)
}

defer rows.Close()

h, m, err := s.processBlockHeadersRows(rows, numberOfHeaders, false)
if err != nil {
return nil, nil, err
}

cacheOp.Set([2]interface{}{h, m}, cacheTTL)

return h, m, nil
cte := `
WITH RECURSIVE ChainBlocks AS (
SELECT id, parent_id, 1 AS depth
FROM blocks
WHERE hash = $1
UNION ALL
SELECT bb.id, bb.parent_id, cb.depth + 1
FROM blocks bb
JOIN ChainBlocks cb ON bb.id = cb.parent_id
WHERE bb.id != cb.id
AND cb.depth < $2
)
SELECT` + blockColumns + `
FROM blocks b
JOIN ChainBlocks cb ON b.id = cb.id
ORDER BY b.height DESC
LIMIT $2
`
return cte, []interface{}{blockHashFrom[:], numberOfHeaders}
}

func (s *SQL) processBlockHeadersRows(rows *sql.Rows, numberOfHeaders uint64, hasCoinbaseColumn bool) ([]*model.BlockHeader, []*model.BlockHeaderMeta, error) {
Expand Down
Loading