38 changes: 38 additions & 0 deletions services/blockassembly/BlockAssembler.go
@@ -153,6 +153,14 @@ type BlockAssembler struct {
// unminedTransactionsLoading indicates if unmined transactions are currently being loaded
unminedTransactionsLoading atomic.Bool

// unminedDropHashes accumulates hashes that should be dropped from the
// input queue at the end of loadUnminedTransactions. Populated by
// markAsConflicting via the cascade returned from MarkConflictingRecursively.
// Read once by Start / postProcessFn after loadUnminedTransactions returns,
// then handed to subtreeProcessor.DrainQueue. Serialized by
// unminedTransactionsLoading; must not be touched concurrently.
unminedDropHashes map[chainhash.Hash]struct{}

// wg tracks background goroutines for clean shutdown
wg sync.WaitGroup
}
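
The field's lifecycle is: reset at the top of loadUnminedTransactions, populated by the cascade while the load runs, read once and drained by the caller, then cleared. A minimal sketch of that handoff, with hypothetical simplified types standing in for BlockAssembler, chainhash.Hash, and the DrainQueue signature:

package main

import "fmt"

type hash [32]byte

// assembler stands in for BlockAssembler; only the drop-set handoff is modeled.
type assembler struct {
	unminedDropHashes map[hash]struct{}
}

// loadUnmined mirrors loadUnminedTransactions: reset the accumulator, then
// record every hash the conflict cascade flags during the load.
func (a *assembler) loadUnmined(cascaded []hash) {
	a.unminedDropHashes = make(map[hash]struct{})
	for _, h := range cascaded {
		a.unminedDropHashes[h] = struct{}{}
	}
}

// afterLoad mirrors Start and the Reset postProcess closure: read the set
// once, hand it to the queue drain, then clear it so the next load starts fresh.
func (a *assembler) afterLoad(drainQueue func(map[hash]struct{})) {
	if drop := a.unminedDropHashes; len(drop) > 0 {
		drainQueue(drop)
	}
	a.unminedDropHashes = nil
}

func main() {
	a := &assembler{}
	a.loadUnmined([]hash{{1}, {2}})
	a.afterLoad(func(drop map[hash]struct{}) {
		fmt.Printf("draining %d cascaded hashes from the input queue\n", len(drop))
	})
}

The single-reader discipline noted in the field comment (serialized by unminedTransactionsLoading) is what makes the plain, unsynchronized map safe.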
@@ -559,6 +567,14 @@ func (b *BlockAssembler) reset(ctx context.Context, validateInputs ...bool) error {
return errors.NewProcessingError("[Reset] error loading unmined transactions", err)
}

// Drop any in-flight children of cascaded conflicting parents from
// the input queue before the existing drain that follows postProcess
// runs and before the default-case dequeue resumes.
if drop := b.unminedDropHashes; len(drop) > 0 {
b.subtreeProcessor.DrainQueue(drop)
}
b.unminedDropHashes = nil

return nil
}

@@ -841,6 +857,16 @@ func (b *BlockAssembler) Start(ctx context.Context) (err error) {
return errors.NewStorageError("[BlockAssembler] failed to load un-mined transactions: %v", err)
}

// AddTx is already enqueueing on the gRPC side. If loadUnminedTransactions
// flagged any tx as conflicting (and cascaded its descendants), drain the
// input queue with that set as a drop filter before the event-loop
// goroutine starts — otherwise in-flight children whose parent was just
// flagged would be admitted to the next mining candidate.
if drop := b.unminedDropHashes; len(drop) > 0 {
b.subtreeProcessor.DrainQueue(drop)
}
b.unminedDropHashes = nil

// Start SubtreeProcessor goroutine after loading unmined transactions to avoid race conditions
b.subtreeProcessor.Start(ctx)

@@ -2043,6 +2069,10 @@ func (b *BlockAssembler) loadUnminedTransactions(ctx context.Context, validateIn
b.logger.Infof("[loadUnminedTransactions] unmined transaction loading completed")
}()

// Reset the accumulator: any cascade fired during this load goes here so
// the caller can drain the input queue with this set as a drop filter.
b.unminedDropHashes = make(map[chainhash.Hash]struct{})

if b.utxoStore == nil {
return errors.NewServiceError("[BlockAssembler] no utxostore")
}
@@ -2509,6 +2539,14 @@ func (b *BlockAssembler) markAsConflicting(ctx context.Context, txHash chainhash
return
}

// Stash cascade hashes for the post-load DrainQueue call. Safe because
// loadUnminedTransactions is serialized by unminedTransactionsLoading.
if b.unminedDropHashes != nil {
for _, h := range cascadedHashes {
b.unminedDropHashes[h] = struct{}{}
}
}

for _, h := range cascadedHashes {
if removeErr := b.subtreeProcessor.Remove(ctx, h); removeErr != nil {
b.logger.Warnf("[validateUnminedTxInputs][%s] failed to evict cascaded tx from subtree processor: %v", h.String(), removeErr)
@@ -93,7 +93,7 @@ func TestProcessConflictingTransactions(t *testing.T) {
// This is a complex method that would require extensive mocking, so we'll test it separately

// Call the method under test
result, err := stp.processConflictingTransactions(context.Background(), block, conflictingNodes, map[chainhash.Hash]bool{})
result, _, err := stp.processConflictingTransactions(context.Background(), block, conflictingNodes, map[chainhash.Hash]bool{})

// Verify results
require.NoError(t, err)
113 changes: 97 additions & 16 deletions services/blockassembly/subtreeprocessor/SubtreeProcessor.go
@@ -151,6 +151,12 @@ type RemainderTransactionParams struct {
CurrentSubtree *subtreepkg.Subtree
TransactionMap *SplitSwissMap
LosingTxHashesMap txmap.TxMap
// ConflictingHashes is the transient set of every tx hash flagged
// Conflicting=true by the cascade that triggered this drain (immediate
// losers + every descendant returned by MarkConflictingRecursively).
// Scoped to a single drain — the dequeue filter checks self-hash and
// every TxInpoints.ParentTxHashes entry, then the set is discarded.
ConflictingHashes map[chainhash.Hash]struct{}
CurrentTxMap TxInpointsMap
SkipDequeue bool
SkipNotification bool
@@ -1274,7 +1280,12 @@ func (stp *SubtreeProcessor) reset(blockHeader *model.BlockHeader, moveBackBlock
block.Height = blockHeaderMeta.Height
}

losingTxHashesMap, err := utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap)
// The Reset path replays moveForwardBlocks and discards the entire
// queue at the end (see the validUntilMillis drain after
// postProcess). Any in-flight tx whose parent gets cascaded here
// is dropped by that drain regardless of cascade discovery, so
// the per-block transient set is not needed here.
losingTxHashesMap, _, err := utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap)
if err != nil {
return errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions in Reset()", block.String(), err)
}
@@ -2641,7 +2652,7 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []*
}

// dequeue all transactions that are in the queue during block movement
if err = stp.dequeueDuringBlockMovement(nil, nil, true); err != nil {
if err = stp.dequeueDuringBlockMovement(nil, nil, nil, true); err != nil {
return errors.NewProcessingError("[reorgBlocks] error dequeueing transactions during block movement", err)
}

@@ -3257,10 +3268,19 @@ func (stp *SubtreeProcessor) createTransactionMapIfNeeded(ctx context.Context, b
return transactionMap, conflictingNodes, nil
}

// processConflictingTransactions handles conflicting transactions and returns losing transaction hashes
// processConflictingTransactions handles conflicting transactions and returns
// losing transaction hashes plus the transient conflicting-hash set populated
// from the BFS cascade. Callers feed the set into the immediately-following
// dequeueDuringBlockMovement so any queue-resident children of cascaded
// parents are rejected before the default-case dequeue picks them up. The
// set is scoped to this single block-movement event and not persisted on the
// processor.
func (stp *SubtreeProcessor) processConflictingTransactions(ctx context.Context, block *model.Block,
conflictingNodes []chainhash.Hash, processedConflictingHashesMap map[chainhash.Hash]bool) (txmap.TxMap, error) {
var losingTxHashesMap txmap.TxMap
conflictingNodes []chainhash.Hash, processedConflictingHashesMap map[chainhash.Hash]bool) (txmap.TxMap, map[chainhash.Hash]struct{}, error) {
var (
losingTxHashesMap txmap.TxMap
conflictingSet map[chainhash.Hash]struct{}
)

// process conflicting txs
if len(conflictingNodes) > 0 {
@@ -3276,32 +3296,40 @@ func (stp *SubtreeProcessor) processConflictingTransactions(ctx context.Context,
// we can then process the conflicting transactions
_, err := stp.waitForBlockBeingMined(ctx, block.Header.Hash())
if err != nil {
return nil, errors.NewProcessingError("[moveForwardBlock][%s] error waiting for block to be mined", block.String(), err)
return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error waiting for block to be mined", block.String(), err)
}

if block.Height == 0 {
// get the block height from the blockchain client
_, blockHeaderMeta, err := stp.blockchainClient.GetBlockHeader(ctx, block.Header.Hash())
if err != nil {
return nil, errors.NewProcessingError("[moveForwardBlock][%s] error getting block header for genesis block", block.String(), err)
return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error getting block header for genesis block", block.String(), err)
}

block.Height = blockHeaderMeta.Height
}

if losingTxHashesMap, err = utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap); err != nil {
return nil, errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions", block.String(), err)
var allMarkedConflicting []chainhash.Hash
if losingTxHashesMap, allMarkedConflicting, err = utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap); err != nil {
return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions", block.String(), err)
}

if len(allMarkedConflicting) > 0 {
conflictingSet = make(map[chainhash.Hash]struct{}, len(allMarkedConflicting))
for _, h := range allMarkedConflicting {
conflictingSet[h] = struct{}{}
}
}

if losingTxHashesMap.Length() > 0 {
// mark all the losing txs in the subtrees in the blocks they were mined into as conflicting
if err = stp.markConflictingTxsInSubtrees(ctx, losingTxHashesMap); err != nil {
return nil, errors.NewProcessingError("[moveForwardBlock][%s] error marking conflicting transactions", block.String(), err)
return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error marking conflicting transactions", block.String(), err)
}
}
}

return losingTxHashesMap, nil
return losingTxHashesMap, conflictingSet, nil
}

// resetSubtreeState resets the current subtree state and returns the old state
@@ -3382,7 +3410,7 @@ func (stp *SubtreeProcessor) processRemainderTransactionsAndDequeue(ctx context.
stp.logger.Debugf("[moveForwardBlock][%s] processing queue while moveForwardBlock: %d", params.Block.String(), stp.queue.length())

if !params.SkipDequeue {
if err := stp.dequeueDuringBlockMovement(params.TransactionMap, params.LosingTxHashesMap, params.SkipNotification); err != nil {
if err := stp.dequeueDuringBlockMovement(params.TransactionMap, params.LosingTxHashesMap, params.ConflictingHashes, params.SkipNotification); err != nil {
return errors.NewProcessingError("[moveForwardBlock][%s] error moving up block deQueue", params.Block.String(), err)
}
}
@@ -3569,7 +3597,8 @@ func (stp *SubtreeProcessor) moveForwardBlock(ctx context.Context, block *model.
}

// Process conflicting transactions
losingTxHashesMap, err = stp.processConflictingTransactions(ctx, block, conflictingNodes, processedConflictingHashesMap)
var conflictingHashes map[chainhash.Hash]struct{}
losingTxHashesMap, conflictingHashes, err = stp.processConflictingTransactions(ctx, block, conflictingNodes, processedConflictingHashesMap)
if err != nil {
return nil, nil, err
}
Expand All @@ -3589,6 +3618,7 @@ func (stp *SubtreeProcessor) moveForwardBlock(ctx context.Context, block *model.
CurrentSubtree: originalCurrentSubtree,
TransactionMap: transactionMap,
LosingTxHashesMap: losingTxHashesMap,
ConflictingHashes: conflictingHashes,
CurrentTxMap: originalCurrentTxMap,
SkipDequeue: skipDequeue,
SkipNotification: skipNotification,
@@ -3703,16 +3733,43 @@ func (stp *SubtreeProcessor) WaitForPendingBlocks(ctx context.Context) error {
return err
}

// DrainQueue is the public entry point for BlockAssembler to flush in-flight
// children of conflicting parents from the input queue before the event-loop
// goroutine starts (BA.Start path) or before the existing post-postProcess
// drain runs (Reset path's postProcess closure).
//
// Internally calls dequeueDuringBlockMovement with the supplied transient
// drop set as the filter. Non-conflicting txs in the queue are added to the
// current subtree as part of the same drain — same routing as default-case
// dequeue would have used. skipNotification=true so this is safe to invoke
// before subtree-announcement listeners are wired up.
func (stp *SubtreeProcessor) DrainQueue(dropHashes map[chainhash.Hash]struct{}) {
if len(dropHashes) == 0 {
return
}
if err := stp.dequeueDuringBlockMovement(nil, nil, dropHashes, true); err != nil {
stp.logger.Errorf("[SubtreeProcessor][DrainQueue] error: %v", err)
}
}

// dequeueDuringBlockMovement processes the transaction queue during block movement.
//
// Parameters:
// - transactionMap: Map of transactions that were in the block and need to be removed
// - losingTxHashesMap: Map of transactions that were conflicting and need to be removed
// - conflictingHashes: transient set of every tx hash flagged Conflicting=true
// by the cascade that triggered this drain (immediate losers + every
// descendant returned by MarkConflictingRecursively). May be nil/empty.
// A queued tx is rejected if its own hash is in the set OR any hash in
// its TxInpoints.ParentTxHashes is in the set; on rejection by parent
// match the tx's own hash is added to the set so any later-in-batch
// descendants are also caught. The set is scoped to this single drain
// and discarded by the caller.
// - skipNotification: Whether to skip notification of new subtrees
//
// Returns:
// - error: Any error encountered during processing
func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwissMap, losingTxHashesMap txmap.TxMap, skipNotification bool) (err error) {
func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwissMap, losingTxHashesMap txmap.TxMap, conflictingHashes map[chainhash.Hash]struct{}, skipNotification bool) (err error) {
queueLength := stp.queue.length()
if queueLength > 0 {
nrBatchesProcessed := int64(0)
Expand All @@ -3728,9 +3785,33 @@ func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwi
for i, node := range batch.nodes {
txInpoints := batch.txInpoints[i]

if (transactionMap == nil || !transactionMap.Exists(node.Hash)) && (losingTxHashesMap == nil || !losingTxHashesMap.Exists(node.Hash)) {
_ = stp.addNode(node, txInpoints, skipNotification)
if transactionMap != nil && transactionMap.Exists(node.Hash) {
continue
}
if losingTxHashesMap != nil && losingTxHashesMap.Exists(node.Hash) {
continue
}

if len(conflictingHashes) > 0 {
if _, ok := conflictingHashes[node.Hash]; ok {
continue
}
matched := false
for _, parent := range txInpoints.ParentTxHashes {
if _, ok := conflictingHashes[parent]; ok {
matched = true
break
}
}
if matched {
// Record the dropped child's own hash so descendants later in
// this drain are caught as well.
conflictingHashes[node.Hash] = struct{}{}
continue
}
}

_ = stp.addNode(node, txInpoints, skipNotification)
}

prometheusSubtreeProcessorDequeuedTxs.Add(float64(len(batch.nodes)))
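
Transitive catching depends on dequeue order: a parent must come off the queue before its children, so that a rejected child's hash is already in the set when a grandchild is examined. A minimal, self-contained sketch of the filter in isolation, using hypothetical string hashes in place of chainhash.Hash and TxInpoints:

package main

import "fmt"

type tx struct {
	hash    string
	parents []string
}

// filterConflicting returns the txs that survive the drain. The set is
// mutated in place: a tx dropped on a parent match contributes its own
// hash so its descendants later in the queue are dropped too.
func filterConflicting(queue []tx, conflicting map[string]struct{}) []tx {
	var kept []tx
outer:
	for _, t := range queue {
		if _, ok := conflicting[t.hash]; ok {
			continue // flagged directly by the cascade
		}
		for _, p := range t.parents {
			if _, ok := conflicting[p]; ok {
				conflicting[t.hash] = struct{}{} // catch later descendants
				continue outer
			}
		}
		kept = append(kept, t)
	}
	return kept
}

func main() {
	conflicting := map[string]struct{}{"P": {}}
	queue := []tx{
		{hash: "C1", parents: []string{"P"}},  // child of the cascaded parent
		{hash: "C2", parents: []string{"C1"}}, // grandchild, caught transitively
		{hash: "X", parents: []string{"Y"}},   // unrelated, kept
	}
	for _, t := range filterConflicting(queue, conflicting) {
		fmt.Println("kept:", t.hash) // prints only X
	}
}

The sketch also makes the ordering caveat visible: a descendant dequeued before its rejected ancestor would pass this filter on its own, so the single pass only closes over descendants that arrive after their parents, matching the "later-in-batch descendants" note in the doc comment above.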