From 4a94dbfd31e17632c6c1bf807bcfa98bf64f40d8 Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Mon, 4 May 2026 15:15:45 +0200 Subject: [PATCH 1/4] fix(blockassembly): close queue race for children of conflicting parents Production observed parent.Conflicting=true with child.Conflicting=false on teranode-mainnet-eu-1 (v0.15.0-beta-3), producing mining candidates rejected as bad-txns-inputs-missingorspent. The cascade in ProcessConflicting / MarkConflictingRecursively only walks recorded spenders, so a child whose Spend has not yet been committed when the parent flips conflicting slips past. The dequeue paths (Phase 2 default-case filter and dequeueDuringBlockMovement) had no Conflicting check on self-hash or on parent inpoints, so the in-flight child landed in the next mining candidate and the block was rejected by ValidateBlock with "parent transaction X of tx Y has no block IDs". Fix: - ProcessConflicting now returns the BFS marked-order slice from MarkConflictingRecursively (previously discarded). - SubtreeProcessor gains a conflictingMap (separate from removeMap), populated by processConflictingTransactions and by BlockAssembler.markAsConflicting via a new MarkConflicting/ GetConflictingMap pair on the Interface. - Both dequeue filters consult conflictingMap on self-hash and on every TxInpoints.ParentTxHashes entry. When a child is rejected because of a conflicting parent, its own hash is added to the map so any later-arriving descendant is also caught without a store round-trip. - Reset clears conflictingMap, mirroring removeMap, so it does not leak across resets. Adds a regression test that reproduces the queue-race shape end-to-end: fails before the fix, passes after. --- services/blockassembly/BlockAssembler.go | 7 + .../blockassembly/mark_as_conflicting_test.go | 1 + .../subtreeprocessor/SubtreeProcessor.go | 134 +++++++++++++++++- .../conflicting_queue_race_test.go | 113 +++++++++++++++ .../subtreeprocessor/interface.go | 17 +++ .../blockassembly/subtreeprocessor/mock.go | 9 ++ stores/utxo/process_conflicting.go | 34 +++-- stores/utxo/process_conflicting_test.go | 14 +- 8 files changed, 307 insertions(+), 22 deletions(-) create mode 100644 services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go diff --git a/services/blockassembly/BlockAssembler.go b/services/blockassembly/BlockAssembler.go index 59e3cb26c4..1e57f68d4d 100644 --- a/services/blockassembly/BlockAssembler.go +++ b/services/blockassembly/BlockAssembler.go @@ -2420,6 +2420,13 @@ func (b *BlockAssembler) markAsConflicting(ctx context.Context, txHash chainhash return } + // Record every cascaded hash in the subtree processor's conflicting map so + // the queue→subtree dequeue path rejects in-flight children whose spend + // link to the parent has not yet been recorded in the UTXO store. Without + // this, the cascade alone (which walks recorded spenders only) misses + // queue-resident descendants and they land in the next mining candidate. + b.subtreeProcessor.MarkConflicting(cascadedHashes) + for _, h := range cascadedHashes { if removeErr := b.subtreeProcessor.Remove(ctx, h); removeErr != nil { b.logger.Warnf("[validateUnminedTxInputs][%s] failed to evict cascaded tx from subtree processor: %v", h.String(), removeErr) diff --git a/services/blockassembly/mark_as_conflicting_test.go b/services/blockassembly/mark_as_conflicting_test.go index 5b5cca61df..672f183962 100644 --- a/services/blockassembly/mark_as_conflicting_test.go +++ b/services/blockassembly/mark_as_conflicting_test.go @@ -45,6 +45,7 @@ func TestMarkAsConflicting_EvictsCascadedDescendants(t *testing.T) { Return([]*utxoStore.Spend{{TxID: &grandchildHash, Vout: 0}}, []chainhash.Hash{}, nil) mockStp := &subtreeprocessor.MockSubtreeProcessor{} + mockStp.On("MarkConflicting", []chainhash.Hash{parentHash, childHash, grandchildHash}).Once() mockStp.On("Remove", mock.Anything, parentHash).Return(nil).Once() mockStp.On("Remove", mock.Anything, childHash).Return(nil).Once() mockStp.On("Remove", mock.Anything, grandchildHash).Return(nil).Once() diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go index 5db44f2a90..abab98dcaa 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go @@ -261,6 +261,22 @@ type SubtreeProcessor struct { // removeMap tracks transactions marked for removal removeMap txmap.TxMap + // conflictingMap tracks transactions known to be conflicting (and any of + // their queue-resident descendants we discover at dequeue time). The + // dequeue paths consult this map to reject children whose parents are + // flagged conflicting in the UTXO store but whose own conflicting flag + // has not been cascaded yet — typically because the child's spend + // has not been recorded when SetConflicting walked the parent's outputs. + // + // Populated by: + // - processConflictingTransactions (cascade hashes from + // ProcessConflicting / MarkConflictingRecursively) + // - BlockAssembler.markAsConflicting (reload-time input revalidation) + // - dequeue filter itself, when a child is rejected because of a + // conflicting parent: the child's hash is added so any later-arriving + // descendants are also caught. + conflictingMap txmap.TxMap + // blockchainClient provides access to blockchain data blockchainClient blockchain.ClientI @@ -436,6 +452,7 @@ func NewSubtreeProcessor(_ context.Context, logger ulogger.Logger, tSettings *se currentTxMap: NewSplitTxInpointsMap(splitMapBuckets), deletedTxs: txmap.NewSyncedMap[chainhash.Hash, subtreepkg.TxInpoints](), removeMap: txmap.NewSplitSwissMap(256, 16), + conflictingMap: txmap.NewSplitSwissMap(256, 16), blockchainClient: blockchainClient, subtreeStore: subtreeStore, utxoStore: utxoStore, @@ -826,6 +843,8 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { // Cache these — they are read on every single iteration removeMap := stp.removeMap mapLength := removeMap.Length() + conflictingMap := stp.conflictingMap + conflictingMapLength := conflictingMap.Length() currentTxMap := stp.currentTxMap currentItemsPerFile := int(stp.currentItemsPerFile.Load()) addedCount := uint64(0) @@ -875,6 +894,35 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { continue } + // Conflicting filter: reject the tx if either it + // is itself flagged conflicting, or any of its + // parents in TxInpoints is flagged conflicting. + // Cascading: when a child is rejected because of + // a conflicting parent, add the child's own hash + // to the map so that any later-arriving + // descendants (whose spend was not yet recorded + // when the cascade ran) are also caught here. + if conflictingMapLength > 0 { + if conflictingMap.Exists(hash) { + b.nodes[i].Hash = zeroHash + continue + } + if inpoints != nil { + conflictingParent := false + for _, parent := range inpoints.ParentTxHashes { + if conflictingMap.Exists(parent) { + conflictingParent = true + break + } + } + if conflictingParent { + _ = conflictingMap.Put(hash, 1) + b.nodes[i].Hash = zeroHash + continue + } + } + } + // Check for duplicates and insert into txMap if _, wasSet := currentTxMap.SetIfNotExists(hash, inpoints); !wasSet { b.nodes[i].Hash = zeroHash // Mark as duplicate @@ -1166,6 +1214,10 @@ func (stp *SubtreeProcessor) reset(blockHeader *model.BlockHeader, moveBackBlock // never dequeued would otherwise accumulate indefinitely across resets stp.removeMap = txmap.NewSplitSwissMap(256, 16) + // clear conflicting map for the same reason: after reset, the unmined-tx reload + // path re-validates from the store and any new conflicts will be re-marked. + stp.conflictingMap = txmap.NewSplitSwissMap(256, 16) + // reset tx count stp.setTxCountFromSubtrees() @@ -1274,11 +1326,18 @@ func (stp *SubtreeProcessor) reset(blockHeader *model.BlockHeader, moveBackBlock block.Height = blockHeaderMeta.Height } - losingTxHashesMap, err := utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap) + losingTxHashesMap, allMarkedConflicting, err := utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap) if err != nil { return errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions in Reset()", block.String(), err) } + // Record every cascaded hash so the queue→subtree dequeue path + // can reject children of any of these txs that arrive later or + // are already in flight. Without this, a child whose spend + // hasn't been recorded would slip past the cascade (which only + // walks recorded spenders) and land in the next mining candidate. + stp.recordConflictingHashes(allMarkedConflicting) + if losingTxHashesMap.Length() > 0 { // mark all the losing txs in the subtrees in the blocks they were mined into as conflicting if err = stp.markConflictingTxsInSubtrees(ctx, losingTxHashesMap); err != nil { @@ -1448,6 +1507,38 @@ func (stp *SubtreeProcessor) GetRemoveMap() txmap.TxMap { return stp.removeMap } +// GetConflictingMap returns the map of transactions known to be conflicting. +// The dequeue path consults this map to reject children of conflicting parents +// even when the parent's output→spender link has not been recorded yet. +// +// Returns: +// - txmap.TxMap: Map of conflicting transaction hashes +func (stp *SubtreeProcessor) GetConflictingMap() txmap.TxMap { + return stp.conflictingMap +} + +// MarkConflicting records a set of transaction hashes as conflicting in the +// processor's conflictingMap. Callers (e.g. BlockAssembler.markAsConflicting, +// processConflictingTransactions) should pass every hash that the UTXO store +// has flagged Conflicting=true — both the immediate losers and every cascaded +// descendant returned by MarkConflictingRecursively. The dequeue path will +// then reject any node whose hash is in the map or whose TxInpoints reference +// any hash in the map. +func (stp *SubtreeProcessor) MarkConflicting(hashes []chainhash.Hash) { + stp.recordConflictingHashes(hashes) +} + +// recordConflictingHashes pushes every hash into the conflictingMap. Safe for +// concurrent callers; underlying map is sharded. +func (stp *SubtreeProcessor) recordConflictingHashes(hashes []chainhash.Hash) { + if len(hashes) == 0 || stp.conflictingMap == nil { + return + } + for _, h := range hashes { + _ = stp.conflictingMap.Put(h, 1) + } +} + // GetRemoveMapLength returns the length of the remove map. // // Returns: @@ -3289,10 +3380,15 @@ func (stp *SubtreeProcessor) processConflictingTransactions(ctx context.Context, block.Height = blockHeaderMeta.Height } - if losingTxHashesMap, err = utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap); err != nil { + var allMarkedConflicting []chainhash.Hash + if losingTxHashesMap, allMarkedConflicting, err = utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap); err != nil { return nil, errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions", block.String(), err) } + // Record every cascaded hash so the queue→subtree dequeue path can + // reject in-flight children whose spend has not been recorded yet. + stp.recordConflictingHashes(allMarkedConflicting) + if losingTxHashesMap.Length() > 0 { // mark all the losing txs in the subtrees in the blocks they were mined into as conflicting if err = stp.markConflictingTxsInSubtrees(ctx, losingTxHashesMap); err != nil { @@ -3717,6 +3813,7 @@ func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwi if queueLength > 0 { nrBatchesProcessed := int64(0) validFromMillis := time.Now().Add(-1 * stp.settings.BlockAssembly.DoubleSpendWindow).UnixMilli() + conflictingMap := stp.conflictingMap for { batch, found := stp.queue.dequeueBatch(validFromMillis) @@ -3728,9 +3825,38 @@ func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwi for i, node := range batch.nodes { txInpoints := batch.txInpoints[i] - if (transactionMap == nil || !transactionMap.Exists(node.Hash)) && (losingTxHashesMap == nil || !losingTxHashesMap.Exists(node.Hash)) { - _ = stp.addNode(node, txInpoints, skipNotification) + if transactionMap != nil && transactionMap.Exists(node.Hash) { + continue + } + if losingTxHashesMap != nil && losingTxHashesMap.Exists(node.Hash) { + continue } + + // Conflicting filter: reject the tx if it is itself flagged + // conflicting, or any of its parents in TxInpoints is flagged + // conflicting. When rejected because of a parent, mark the + // child too — this catches later-arriving descendants whose + // spend was not yet recorded when the cascade ran. + if conflictingMap != nil && conflictingMap.Length() > 0 { + if conflictingMap.Exists(node.Hash) { + continue + } + if txInpoints != nil { + conflictingParent := false + for _, parent := range txInpoints.ParentTxHashes { + if conflictingMap.Exists(parent) { + conflictingParent = true + break + } + } + if conflictingParent { + _ = conflictingMap.Put(node.Hash, 1) + continue + } + } + } + + _ = stp.addNode(node, txInpoints, skipNotification) } prometheusSubtreeProcessorDequeuedTxs.Add(float64(len(batch.nodes))) diff --git a/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go b/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go new file mode 100644 index 0000000000..571415fa9a --- /dev/null +++ b/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go @@ -0,0 +1,113 @@ +package subtreeprocessor + +import ( + "testing" + "time" + + "github.com/bsv-blockchain/go-bt/v2/chainhash" + subtreepkg "github.com/bsv-blockchain/go-subtree" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestDequeue_NoParentConflictingCheck_BUG demonstrates the production bug +// observed on teranode-mainnet-eu-1 (v0.15.0-beta-3): a child transaction whose +// parent is already marked Conflicting=true in the UTXO store still lands in the +// block-assembly subtree. The resulting mining candidate fails ValidateBlock with +// "parent transaction X of tx Y has no block IDs" and the block is rejected with +// "bad-txns-inputs-missingorspent" by SVNode. +// +// Race in production: +// +// T0 parent P added to UTXO store via validator. +// T1 ProcessConflicting (during moveForwardBlock with ConflictingNodes) flags +// P.Conflicting=true. Cascade walks P.outputs -> recorded spenders, finds +// none for child C: C's Spend has not been committed yet (C is mid-flight +// in the BA queue). Cascade misses C; C.Conflicting stays false in store. +// T2 Event loop returns to default case, drains queue. Phase 2 filter +// (SubtreeProcessor.go:861-886) only checks removeMap and currentTxMap +// dedup. No Conflicting check on self or any parent in TxInpoints. +// T3 C lands in subtree. Mining candidate built and submitted. REJECTED. +// +// Even the alternative drain path used during block movement, +// dequeueDuringBlockMovement at SubtreeProcessor.go:3715, only filters by +// `losingTxHashesMap.Exists(node.Hash)` — its own hash, never parents. And the +// losing map itself is built in ProcessConflicting (process_conflicting.go:109) +// from GetCounterConflicting hashes only; the cascaded descendants returned by +// MarkConflictingRecursively (line 122) are discarded. So even when the cascade +// does discover a descendant, that hash never reaches the dequeue filter. +// +// This test proves the queue-drain side of the gap: the subtree-processor +// dequeue path has no mechanism to reject a child whose parent the system +// considers conflicting — because there is no map/store lookup at all on the +// parent inpoints during dequeue. +// +// Fix shape (per design discussion): +// - Introduce conflictingMap on SubtreeProcessor (separate from removeMap). +// - ProcessConflicting / MarkConflictingRecursively populate it recursively +// with all known-conflicting hashes (the cascaded descendants currently +// thrown away at process_conflicting.go:122 must be captured into it). +// - Phase 2 dequeue filter (SubtreeProcessor.go:861-886) and +// dequeueDuringBlockMovement (3731) reject any node whose Hash is in the +// map OR whose TxInpoints.ParentTxHashes contains a hash in the map. +// - When rejected at dequeue, the rejected child Hash is itself added to the +// map so any later-arriving descendants are also caught. +// +// Status: this test FAILS under current code (child is admitted) and is +// expected to PASS once the conflictingMap fix is in place. +func TestDequeue_NoParentConflictingCheck_BUG(t *testing.T) { + stp := setupTestSubtreeProcessor(t) + + // We do not need the parent to actually be in the UTXO store to prove the + // bug. Production has the parent marked Conflicting=true in the store, but + // the dequeue path never reads the store for parent inpoints, so the test + // is just as faithful with a synthetic parent hash. (Avoiding real + // SetConflicting also avoids sqlitememory single-writer lock contention + // during this test.) + parentHash := chainhash.HashH([]byte("conflicting-parent")) + childHash := chainhash.HashH([]byte("child-of-conflicting-parent")) + + // Populate the SubtreeProcessor's conflictingMap to reflect what + // processConflictingTransactions / BlockAssembler.markAsConflicting would + // have published when the parent was flagged Conflicting=true in the + // store. The dequeue path must consult this and reject the child. + stp.MarkConflicting([]chainhash.Hash{parentHash}) + + childNode := subtreepkg.Node{Hash: childHash, Fee: 1, SizeInBytes: 250} + childInpoints := &subtreepkg.TxInpoints{ + ParentTxHashes: []chainhash.Hash{parentHash}, + Idxs: [][]uint32{{0}}, + } + + stp.AddBatch([]subtreepkg.Node{childNode}, []*subtreepkg.TxInpoints{childInpoints}) + + require.Eventually(t, func() bool { return stp.QueueLength() == 0 }, + 2*time.Second, 5*time.Millisecond, "queue did not drain") + + // Allow the goroutine one more iteration to complete the insert path. + time.Sleep(50 * time.Millisecond) + + found := false + for _, h := range stp.GetTransactionHashes() { + if h.Equal(childHash) { + found = true + break + } + } + + assert.False(t, found, + "BUG: child of conflicting parent admitted to subtree.\n"+ + " parent: %s (treated as Conflicting=true)\n"+ + " child: %s (admitted into BA subtree)\n"+ + "Cause: SubtreeProcessor.go:861-886 (Phase 2 dequeue filter) and\n"+ + "SubtreeProcessor.go:3731 (dequeueDuringBlockMovement) consult only\n"+ + "removeMap, currentTxMap dedup, and losingTxHashesMap by self-hash.\n"+ + "Neither path checks TxInpoints.ParentTxHashes against any\n"+ + "conflicting-state source. Cascade hashes from\n"+ + "MarkConflictingRecursively are also discarded at\n"+ + "process_conflicting.go:122, so they never reach the filter.\n"+ + "Fix: add conflictingMap, populated recursively, consulted in\n"+ + "Phase 2 filter for both self-hash and parent-inpoints.", + parentHash.String(), childHash.String()) +} + diff --git a/services/blockassembly/subtreeprocessor/interface.go b/services/blockassembly/subtreeprocessor/interface.go index bb04ce5679..47e617ed3c 100644 --- a/services/blockassembly/subtreeprocessor/interface.go +++ b/services/blockassembly/subtreeprocessor/interface.go @@ -214,6 +214,23 @@ type Interface interface { // - int: Number of transactions in the removal map GetRemoveMapLength() int + // GetConflictingMap returns the map of transactions known to be conflicting. + // The dequeue paths consult this map to reject children of conflicting + // parents whose own conflicting flag has not yet been cascaded by the + // store-side traversal of recorded spenders. + // + // Returns: + // - txmap.TxMap: Map of conflicting transaction hashes + GetConflictingMap() txmap.TxMap + + // MarkConflicting records the supplied hashes in the processor's + // conflictingMap so that the queue→subtree dequeue path rejects any node + // whose own hash or whose TxInpoints reference any hash in the map. + // Callers must include the cascaded descendants (e.g. the second return + // value of utxo.MarkConflictingRecursively) — not just the seed hashes — + // otherwise the race window stays open. + MarkConflicting(hashes []chainhash.Hash) + // GetChainedSubtrees returns subtrees that are chained together. // These represent transaction dependencies and processing order. // diff --git a/services/blockassembly/subtreeprocessor/mock.go b/services/blockassembly/subtreeprocessor/mock.go index d38fdaa5bb..332719ef85 100644 --- a/services/blockassembly/subtreeprocessor/mock.go +++ b/services/blockassembly/subtreeprocessor/mock.go @@ -52,6 +52,15 @@ func (m *MockSubtreeProcessor) GetRemoveMapLength() int { return args.Int(0) } +func (m *MockSubtreeProcessor) GetConflictingMap() txmap.TxMap { + args := m.Called() + return args.Get(0).(txmap.TxMap) +} + +func (m *MockSubtreeProcessor) MarkConflicting(hashes []chainhash.Hash) { + m.Called(hashes) +} + func (m *MockSubtreeProcessor) GetCurrentRunningState() State { args := m.Called() return args.Get(0).(State) diff --git a/stores/utxo/process_conflicting.go b/stores/utxo/process_conflicting.go index 474981e88b..22676eb6f1 100644 --- a/stores/utxo/process_conflicting.go +++ b/stores/utxo/process_conflicting.go @@ -48,8 +48,17 @@ import ( - 4: mark tx_double_spend as not conflicting - 5: mark tx_parent1 & tx_parent2 & tx_parent4 as spendable again */ +// ProcessConflicting returns: +// - losingTxHashesMap: hashes of txs displaced by the winners (the immediate +// counter-conflicting set from GetCounterConflicting). Used by callers to +// mark losers in subtrees / drop them from upstream paths. +// - allMarkedConflicting: every hash marked Conflicting=true during this run, +// in BFS order — losers + every descendant the cascade reached. Callers +// (notably block assembly) need this superset to populate a conflictingMap +// so the queue→subtree dequeue path can reject children of conflicting +// parents that arrive after the cascade has run. func ProcessConflicting(ctx context.Context, s Store, blockHeight uint32, conflictingTxHashes []chainhash.Hash, - processedConflictingHashesMap map[chainhash.Hash]bool) (losingTxHashesMap txmap.TxMap, err error) { + processedConflictingHashesMap map[chainhash.Hash]bool) (losingTxHashesMap txmap.TxMap, allMarkedConflicting []chainhash.Hash, err error) { ctx, _, deferFn := tracing.Tracer("utxo").Start(ctx, "ProcessConflicting") defer deferFn() @@ -70,7 +79,7 @@ func ProcessConflicting(ctx context.Context, s Store, blockHeight uint32, confli if txHash.Equal(subtree.CoinbasePlaceholderHashValue) { // the counter-conflicting tx is frozen, we should not process anything further - return nil, errors.NewProcessingError("[ProcessConflicting][%s] tx is frozen", txHash.String()) + return nil, nil, errors.NewProcessingError("[ProcessConflicting][%s] tx is frozen", txHash.String()) } g.Go(func() error { @@ -102,7 +111,7 @@ func ProcessConflicting(ctx context.Context, s Store, blockHeight uint32, confli } if err = g.Wait(); err != nil { - return nil, err + return nil, nil, err } // create a unique list of all the losing tx hashes @@ -118,15 +127,18 @@ func ProcessConflicting(ctx context.Context, s Store, blockHeight uint32, confli losingTxHashes := losingTxHashesMap.Keys() - // - 1: mark all losingTxHashesPerConflictingTx as conflicting + all its spending transactions recursively - affectedParentSpends, _, err := MarkConflictingRecursively(ctx, s, losingTxHashes) + // - 1: mark all losingTxHashesPerConflictingTx as conflicting + all its spending transactions recursively. + // markedOrder is the BFS expansion: every hash now flagged Conflicting=true. Forwarded to callers so + // the block-assembly conflictingMap can include the cascaded descendants (not just the immediate losers). + affectedParentSpends, markedOrder, err := MarkConflictingRecursively(ctx, s, losingTxHashes) if err != nil { - return nil, err + return nil, nil, err } + allMarkedConflicting = markedOrder // - 2: un-spend txa, marking the input txs as not spendable (txp & txq) if err = s.Unspend(ctx, affectedParentSpends, true); err != nil { - return nil, errors.NewProcessingError("error unspending affected parent spends", err) + return nil, nil, errors.NewProcessingError("error unspending affected parent spends", err) } // get the unique hashes of the transactions that were marked as not spendable @@ -158,21 +170,21 @@ func ProcessConflicting(ctx context.Context, s Store, blockHeight uint32, confli } } - return nil, err + return nil, nil, err } } // - 4: mark txb as not conflicting if _, _, err = s.SetConflicting(ctx, conflictingTxHashes, false); err != nil { - return nil, err + return nil, nil, err } // - 5: mark txp & txq as spendable again if err = s.SetLocked(ctx, markedAsNotSpendableHashes, false); err != nil { - return nil, err + return nil, nil, err } - return losingTxHashesMap, nil + return losingTxHashesMap, allMarkedConflicting, nil } // MarkConflictingRecursively marks the given transactions as conflicting, and iteratively marks all their spending diff --git a/stores/utxo/process_conflicting_test.go b/stores/utxo/process_conflicting_test.go index 2bbb69af8c..238d512df4 100644 --- a/stores/utxo/process_conflicting_test.go +++ b/stores/utxo/process_conflicting_test.go @@ -58,7 +58,7 @@ func TestProcessConflicting_Success(t *testing.T) { mockStore.On("SetLocked", mock.Anything, []chainhash.Hash{losingTxHash}, false).Return(nil) // Execute test - result, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) + result, _, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) // Assertions require.NoError(t, err) @@ -75,7 +75,7 @@ func TestProcessConflicting_FrozenTxError(t *testing.T) { conflictingTxHashes := []chainhash.Hash{subtree.CoinbasePlaceholderHashValue} // Execute test - result, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) + result, _, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) // Assertions assert.Nil(t, result) @@ -98,7 +98,7 @@ func TestProcessConflicting_TxNotConflictingError(t *testing.T) { }, nil) // Execute test - result, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) + result, _, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) // Assertions assert.Nil(t, result) @@ -118,7 +118,7 @@ func TestProcessConflicting_GetTxError(t *testing.T) { mockStore.On("Get", mock.Anything, &conflictingTxHash, mock.Anything).Return(nil, errors.NewProcessingError("database error")) // Execute test - result, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) + result, _, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) // Assertions assert.Nil(t, result) @@ -145,7 +145,7 @@ func TestProcessConflicting_GetCounterConflictingError(t *testing.T) { Return([]chainhash.Hash{}, errors.NewProcessingError("counter conflicting error")) // Execute test - result, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) + result, _, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) // Assertions assert.Nil(t, result) @@ -180,7 +180,7 @@ func TestProcessConflicting_UnspendError(t *testing.T) { Return(errors.NewProcessingError("unspend failed")) // Execute test - result, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) + result, _, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) // Assertions assert.Nil(t, result) @@ -223,7 +223,7 @@ func TestProcessConflicting_SpendError(t *testing.T) { Return([]*Spend{spendWithError}, errors.NewTxInvalidError("spend failed")) // Execute test - result, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) + result, _, err := ProcessConflicting(ctx, mockStore, 1, conflictingTxHashes, map[chainhash.Hash]bool{}) // Assertions assert.Nil(t, result) From 643eb936a4555ede9e3734f5b2d37de10f58165b Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Mon, 4 May 2026 15:26:35 +0200 Subject: [PATCH 2/4] fix(blockassembly): drop trailing newline in conflicting_queue_race_test.go (gci) --- .../subtreeprocessor/conflicting_queue_race_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go b/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go index 571415fa9a..49134c9bd3 100644 --- a/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go +++ b/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go @@ -110,4 +110,3 @@ func TestDequeue_NoParentConflictingCheck_BUG(t *testing.T) { "Phase 2 filter for both self-hash and parent-inpoints.", parentHash.String(), childHash.String()) } - From 5db94a630181e27b1789be1a8f7f571b8435eead Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Mon, 4 May 2026 16:27:27 +0200 Subject: [PATCH 3/4] refactor(blockassembly): scope conflicting filter to block-movement only Address review feedback: conflictingMap as a persistent SubtreeProcessor field imposed a hot-path lookup on every default-case dequeue, which is wrong. The conflicting-state knowledge is only valid for the duration of one block movement (and the immediate post-cascade drain), not for the lifetime of the processor. - processConflictingTransactions returns a transient map[chainhash.Hash]struct{} of every hash flagged Conflicting=true by the BFS cascade (immediate losers + every descendant returned by MarkConflictingRecursively). - The set is threaded through RemainderTransactionParams.ConflictingHashes into dequeueDuringBlockMovement, which rejects any node whose own hash is in the set OR whose TxInpoints.ParentTxHashes contains a hash in the set. On parent match the node's hash is added to the set so any later-in-batch descendants are caught. - Default-case Phase 2 filter is unchanged: removeMap + currentTxMap dedup only, no conflicting lookup. - No new SubtreeProcessor fields, no new Interface methods, no new mock methods. The cascade information lives only in local variables for the duration of one moveForwardBlock event. - BlockAssembler.markAsConflicting reverted to its pre-fix shape; the reload path's cascade-to-descendants concern is handled separately upstream by PR #806 (validateParentChain). Test rewritten to drive dequeueDuringBlockMovement directly, no event loop. --- services/blockassembly/BlockAssembler.go | 7 - .../blockassembly/mark_as_conflicting_test.go | 1 - .../ConflictingTransactions_test.go | 2 +- .../subtreeprocessor/SubtreeProcessor.go | 176 +++++----------- .../conflicting_queue_race_test.go | 191 ++++++++++-------- .../subtreeprocessor/interface.go | 17 -- .../blockassembly/subtreeprocessor/mock.go | 9 - 7 files changed, 168 insertions(+), 235 deletions(-) diff --git a/services/blockassembly/BlockAssembler.go b/services/blockassembly/BlockAssembler.go index 1e57f68d4d..59e3cb26c4 100644 --- a/services/blockassembly/BlockAssembler.go +++ b/services/blockassembly/BlockAssembler.go @@ -2420,13 +2420,6 @@ func (b *BlockAssembler) markAsConflicting(ctx context.Context, txHash chainhash return } - // Record every cascaded hash in the subtree processor's conflicting map so - // the queue→subtree dequeue path rejects in-flight children whose spend - // link to the parent has not yet been recorded in the UTXO store. Without - // this, the cascade alone (which walks recorded spenders only) misses - // queue-resident descendants and they land in the next mining candidate. - b.subtreeProcessor.MarkConflicting(cascadedHashes) - for _, h := range cascadedHashes { if removeErr := b.subtreeProcessor.Remove(ctx, h); removeErr != nil { b.logger.Warnf("[validateUnminedTxInputs][%s] failed to evict cascaded tx from subtree processor: %v", h.String(), removeErr) diff --git a/services/blockassembly/mark_as_conflicting_test.go b/services/blockassembly/mark_as_conflicting_test.go index 672f183962..5b5cca61df 100644 --- a/services/blockassembly/mark_as_conflicting_test.go +++ b/services/blockassembly/mark_as_conflicting_test.go @@ -45,7 +45,6 @@ func TestMarkAsConflicting_EvictsCascadedDescendants(t *testing.T) { Return([]*utxoStore.Spend{{TxID: &grandchildHash, Vout: 0}}, []chainhash.Hash{}, nil) mockStp := &subtreeprocessor.MockSubtreeProcessor{} - mockStp.On("MarkConflicting", []chainhash.Hash{parentHash, childHash, grandchildHash}).Once() mockStp.On("Remove", mock.Anything, parentHash).Return(nil).Once() mockStp.On("Remove", mock.Anything, childHash).Return(nil).Once() mockStp.On("Remove", mock.Anything, grandchildHash).Return(nil).Once() diff --git a/services/blockassembly/subtreeprocessor/ConflictingTransactions_test.go b/services/blockassembly/subtreeprocessor/ConflictingTransactions_test.go index 2647814a7d..7081e95ca1 100644 --- a/services/blockassembly/subtreeprocessor/ConflictingTransactions_test.go +++ b/services/blockassembly/subtreeprocessor/ConflictingTransactions_test.go @@ -93,7 +93,7 @@ func TestProcessConflictingTransactions(t *testing.T) { // This is a complex method that would require extensive mocking, so we'll test it separately // Call the method under test - result, err := stp.processConflictingTransactions(context.Background(), block, conflictingNodes, map[chainhash.Hash]bool{}) + result, _, err := stp.processConflictingTransactions(context.Background(), block, conflictingNodes, map[chainhash.Hash]bool{}) // Verify results require.NoError(t, err) diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go index abab98dcaa..9bdba1dae9 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go @@ -151,6 +151,12 @@ type RemainderTransactionParams struct { CurrentSubtree *subtreepkg.Subtree TransactionMap *SplitSwissMap LosingTxHashesMap txmap.TxMap + // ConflictingHashes is the transient set of every tx hash flagged + // Conflicting=true by the cascade that triggered this drain (immediate + // losers + every descendant returned by MarkConflictingRecursively). + // Scoped to a single drain — the dequeue filter checks self-hash and + // every TxInpoints.ParentTxHashes entry, then the set is discarded. + ConflictingHashes map[chainhash.Hash]struct{} CurrentTxMap TxInpointsMap SkipDequeue bool SkipNotification bool @@ -261,22 +267,6 @@ type SubtreeProcessor struct { // removeMap tracks transactions marked for removal removeMap txmap.TxMap - // conflictingMap tracks transactions known to be conflicting (and any of - // their queue-resident descendants we discover at dequeue time). The - // dequeue paths consult this map to reject children whose parents are - // flagged conflicting in the UTXO store but whose own conflicting flag - // has not been cascaded yet — typically because the child's spend - // has not been recorded when SetConflicting walked the parent's outputs. - // - // Populated by: - // - processConflictingTransactions (cascade hashes from - // ProcessConflicting / MarkConflictingRecursively) - // - BlockAssembler.markAsConflicting (reload-time input revalidation) - // - dequeue filter itself, when a child is rejected because of a - // conflicting parent: the child's hash is added so any later-arriving - // descendants are also caught. - conflictingMap txmap.TxMap - // blockchainClient provides access to blockchain data blockchainClient blockchain.ClientI @@ -452,7 +442,6 @@ func NewSubtreeProcessor(_ context.Context, logger ulogger.Logger, tSettings *se currentTxMap: NewSplitTxInpointsMap(splitMapBuckets), deletedTxs: txmap.NewSyncedMap[chainhash.Hash, subtreepkg.TxInpoints](), removeMap: txmap.NewSplitSwissMap(256, 16), - conflictingMap: txmap.NewSplitSwissMap(256, 16), blockchainClient: blockchainClient, subtreeStore: subtreeStore, utxoStore: utxoStore, @@ -843,8 +832,6 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { // Cache these — they are read on every single iteration removeMap := stp.removeMap mapLength := removeMap.Length() - conflictingMap := stp.conflictingMap - conflictingMapLength := conflictingMap.Length() currentTxMap := stp.currentTxMap currentItemsPerFile := int(stp.currentItemsPerFile.Load()) addedCount := uint64(0) @@ -894,35 +881,6 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { continue } - // Conflicting filter: reject the tx if either it - // is itself flagged conflicting, or any of its - // parents in TxInpoints is flagged conflicting. - // Cascading: when a child is rejected because of - // a conflicting parent, add the child's own hash - // to the map so that any later-arriving - // descendants (whose spend was not yet recorded - // when the cascade ran) are also caught here. - if conflictingMapLength > 0 { - if conflictingMap.Exists(hash) { - b.nodes[i].Hash = zeroHash - continue - } - if inpoints != nil { - conflictingParent := false - for _, parent := range inpoints.ParentTxHashes { - if conflictingMap.Exists(parent) { - conflictingParent = true - break - } - } - if conflictingParent { - _ = conflictingMap.Put(hash, 1) - b.nodes[i].Hash = zeroHash - continue - } - } - } - // Check for duplicates and insert into txMap if _, wasSet := currentTxMap.SetIfNotExists(hash, inpoints); !wasSet { b.nodes[i].Hash = zeroHash // Mark as duplicate @@ -1214,10 +1172,6 @@ func (stp *SubtreeProcessor) reset(blockHeader *model.BlockHeader, moveBackBlock // never dequeued would otherwise accumulate indefinitely across resets stp.removeMap = txmap.NewSplitSwissMap(256, 16) - // clear conflicting map for the same reason: after reset, the unmined-tx reload - // path re-validates from the store and any new conflicts will be re-marked. - stp.conflictingMap = txmap.NewSplitSwissMap(256, 16) - // reset tx count stp.setTxCountFromSubtrees() @@ -1326,18 +1280,16 @@ func (stp *SubtreeProcessor) reset(blockHeader *model.BlockHeader, moveBackBlock block.Height = blockHeaderMeta.Height } - losingTxHashesMap, allMarkedConflicting, err := utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap) + // The Reset path replays moveForwardBlocks and discards the entire + // queue at the end (see the validUntilMillis drain after + // postProcess). Any in-flight tx whose parent gets cascaded here + // is dropped by that drain regardless of cascade discovery, so + // the per-block transient set is not needed here. + losingTxHashesMap, _, err := utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap) if err != nil { return errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions in Reset()", block.String(), err) } - // Record every cascaded hash so the queue→subtree dequeue path - // can reject children of any of these txs that arrive later or - // are already in flight. Without this, a child whose spend - // hasn't been recorded would slip past the cascade (which only - // walks recorded spenders) and land in the next mining candidate. - stp.recordConflictingHashes(allMarkedConflicting) - if losingTxHashesMap.Length() > 0 { // mark all the losing txs in the subtrees in the blocks they were mined into as conflicting if err = stp.markConflictingTxsInSubtrees(ctx, losingTxHashesMap); err != nil { @@ -1507,38 +1459,6 @@ func (stp *SubtreeProcessor) GetRemoveMap() txmap.TxMap { return stp.removeMap } -// GetConflictingMap returns the map of transactions known to be conflicting. -// The dequeue path consults this map to reject children of conflicting parents -// even when the parent's output→spender link has not been recorded yet. -// -// Returns: -// - txmap.TxMap: Map of conflicting transaction hashes -func (stp *SubtreeProcessor) GetConflictingMap() txmap.TxMap { - return stp.conflictingMap -} - -// MarkConflicting records a set of transaction hashes as conflicting in the -// processor's conflictingMap. Callers (e.g. BlockAssembler.markAsConflicting, -// processConflictingTransactions) should pass every hash that the UTXO store -// has flagged Conflicting=true — both the immediate losers and every cascaded -// descendant returned by MarkConflictingRecursively. The dequeue path will -// then reject any node whose hash is in the map or whose TxInpoints reference -// any hash in the map. -func (stp *SubtreeProcessor) MarkConflicting(hashes []chainhash.Hash) { - stp.recordConflictingHashes(hashes) -} - -// recordConflictingHashes pushes every hash into the conflictingMap. Safe for -// concurrent callers; underlying map is sharded. -func (stp *SubtreeProcessor) recordConflictingHashes(hashes []chainhash.Hash) { - if len(hashes) == 0 || stp.conflictingMap == nil { - return - } - for _, h := range hashes { - _ = stp.conflictingMap.Put(h, 1) - } -} - // GetRemoveMapLength returns the length of the remove map. // // Returns: @@ -2732,7 +2652,7 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* } // dequeueDuringBlockMovement all transactions that are in the queue - if err = stp.dequeueDuringBlockMovement(nil, nil, true); err != nil { + if err = stp.dequeueDuringBlockMovement(nil, nil, nil, true); err != nil { return errors.NewProcessingError("[reorgBlocks] error dequeueing transactions during block movement", err) } @@ -3348,10 +3268,19 @@ func (stp *SubtreeProcessor) createTransactionMapIfNeeded(ctx context.Context, b return transactionMap, conflictingNodes, nil } -// processConflictingTransactions handles conflicting transactions and returns losing transaction hashes +// processConflictingTransactions handles conflicting transactions and returns +// losing transaction hashes plus the transient conflicting-hash set populated +// from the BFS cascade. Callers feed the set into the immediately-following +// dequeueDuringBlockMovement so any queue-resident children of cascaded +// parents are rejected before the default-case dequeue picks them up. The +// set is scoped to this single block-movement event and not persisted on the +// processor. func (stp *SubtreeProcessor) processConflictingTransactions(ctx context.Context, block *model.Block, - conflictingNodes []chainhash.Hash, processedConflictingHashesMap map[chainhash.Hash]bool) (txmap.TxMap, error) { - var losingTxHashesMap txmap.TxMap + conflictingNodes []chainhash.Hash, processedConflictingHashesMap map[chainhash.Hash]bool) (txmap.TxMap, map[chainhash.Hash]struct{}, error) { + var ( + losingTxHashesMap txmap.TxMap + conflictingSet map[chainhash.Hash]struct{} + ) // process conflicting txs if len(conflictingNodes) > 0 { @@ -3367,14 +3296,14 @@ func (stp *SubtreeProcessor) processConflictingTransactions(ctx context.Context, // we can then process the conflicting transactions _, err := stp.waitForBlockBeingMined(ctx, block.Header.Hash()) if err != nil { - return nil, errors.NewProcessingError("[moveForwardBlock][%s] error waiting for block to be mined", block.String(), err) + return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error waiting for block to be mined", block.String(), err) } if block.Height == 0 { // get the block height from the blockchain client _, blockHeaderMeta, err := stp.blockchainClient.GetBlockHeader(ctx, block.Header.Hash()) if err != nil { - return nil, errors.NewProcessingError("[moveForwardBlock][%s] error getting block header for genesis block", block.String(), err) + return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error getting block header for genesis block", block.String(), err) } block.Height = blockHeaderMeta.Height @@ -3382,22 +3311,25 @@ func (stp *SubtreeProcessor) processConflictingTransactions(ctx context.Context, var allMarkedConflicting []chainhash.Hash if losingTxHashesMap, allMarkedConflicting, err = utxostore.ProcessConflicting(ctx, stp.utxoStore, block.Height, conflictingNodes, processedConflictingHashesMap); err != nil { - return nil, errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions", block.String(), err) + return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error processing conflicting transactions", block.String(), err) } - // Record every cascaded hash so the queue→subtree dequeue path can - // reject in-flight children whose spend has not been recorded yet. - stp.recordConflictingHashes(allMarkedConflicting) + if len(allMarkedConflicting) > 0 { + conflictingSet = make(map[chainhash.Hash]struct{}, len(allMarkedConflicting)) + for _, h := range allMarkedConflicting { + conflictingSet[h] = struct{}{} + } + } if losingTxHashesMap.Length() > 0 { // mark all the losing txs in the subtrees in the blocks they were mined into as conflicting if err = stp.markConflictingTxsInSubtrees(ctx, losingTxHashesMap); err != nil { - return nil, errors.NewProcessingError("[moveForwardBlock][%s] error marking conflicting transactions", block.String(), err) + return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error marking conflicting transactions", block.String(), err) } } } - return losingTxHashesMap, nil + return losingTxHashesMap, conflictingSet, nil } // resetSubtreeState resets the current subtree state and returns the old state @@ -3478,7 +3410,7 @@ func (stp *SubtreeProcessor) processRemainderTransactionsAndDequeue(ctx context. stp.logger.Debugf("[moveForwardBlock][%s] processing queue while moveForwardBlock: %d", params.Block.String(), stp.queue.length()) if !params.SkipDequeue { - if err := stp.dequeueDuringBlockMovement(params.TransactionMap, params.LosingTxHashesMap, params.SkipNotification); err != nil { + if err := stp.dequeueDuringBlockMovement(params.TransactionMap, params.LosingTxHashesMap, params.ConflictingHashes, params.SkipNotification); err != nil { return errors.NewProcessingError("[moveForwardBlock][%s] error moving up block deQueue", params.Block.String(), err) } } @@ -3665,7 +3597,8 @@ func (stp *SubtreeProcessor) moveForwardBlock(ctx context.Context, block *model. } // Process conflicting transactions - losingTxHashesMap, err = stp.processConflictingTransactions(ctx, block, conflictingNodes, processedConflictingHashesMap) + var conflictingHashes map[chainhash.Hash]struct{} + losingTxHashesMap, conflictingHashes, err = stp.processConflictingTransactions(ctx, block, conflictingNodes, processedConflictingHashesMap) if err != nil { return nil, nil, err } @@ -3685,6 +3618,7 @@ func (stp *SubtreeProcessor) moveForwardBlock(ctx context.Context, block *model. CurrentSubtree: originalCurrentSubtree, TransactionMap: transactionMap, LosingTxHashesMap: losingTxHashesMap, + ConflictingHashes: conflictingHashes, CurrentTxMap: originalCurrentTxMap, SkipDequeue: skipDequeue, SkipNotification: skipNotification, @@ -3804,16 +3738,23 @@ func (stp *SubtreeProcessor) WaitForPendingBlocks(ctx context.Context) error { // Parameters: // - transactionMap: Map of transactions that were in the block and need to be removed // - losingTxHashesMap: Map of transactions that were conflicting and need to be removed +// - conflictingHashes: transient set of every tx hash flagged Conflicting=true +// by the cascade that triggered this drain (immediate losers + every +// descendant returned by MarkConflictingRecursively). May be nil/empty. +// A queued tx is rejected if its own hash is in the set OR any hash in +// its TxInpoints.ParentTxHashes is in the set; on rejection by parent +// match the tx's own hash is added to the set so any later-in-batch +// descendants are also caught. The set is scoped to this single drain +// and discarded by the caller. // - skipNotification: Whether to skip notification of new subtrees // // Returns: // - error: Any error encountered during processing -func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwissMap, losingTxHashesMap txmap.TxMap, skipNotification bool) (err error) { +func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwissMap, losingTxHashesMap txmap.TxMap, conflictingHashes map[chainhash.Hash]struct{}, skipNotification bool) (err error) { queueLength := stp.queue.length() if queueLength > 0 { nrBatchesProcessed := int64(0) validFromMillis := time.Now().Add(-1 * stp.settings.BlockAssembly.DoubleSpendWindow).UnixMilli() - conflictingMap := stp.conflictingMap for { batch, found := stp.queue.dequeueBatch(validFromMillis) @@ -3832,25 +3773,20 @@ func (stp *SubtreeProcessor) dequeueDuringBlockMovement(transactionMap *SplitSwi continue } - // Conflicting filter: reject the tx if it is itself flagged - // conflicting, or any of its parents in TxInpoints is flagged - // conflicting. When rejected because of a parent, mark the - // child too — this catches later-arriving descendants whose - // spend was not yet recorded when the cascade ran. - if conflictingMap != nil && conflictingMap.Length() > 0 { - if conflictingMap.Exists(node.Hash) { + if len(conflictingHashes) > 0 { + if _, ok := conflictingHashes[node.Hash]; ok { continue } if txInpoints != nil { - conflictingParent := false + matched := false for _, parent := range txInpoints.ParentTxHashes { - if conflictingMap.Exists(parent) { - conflictingParent = true + if _, ok := conflictingHashes[parent]; ok { + matched = true break } } - if conflictingParent { - _ = conflictingMap.Put(node.Hash, 1) + if matched { + conflictingHashes[node.Hash] = struct{}{} continue } } diff --git a/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go b/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go index 49134c9bd3..901346dfb2 100644 --- a/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go +++ b/services/blockassembly/subtreeprocessor/conflicting_queue_race_test.go @@ -1,112 +1,143 @@ package subtreeprocessor import ( + "context" + "net/url" "testing" "time" "github.com/bsv-blockchain/go-bt/v2/chainhash" subtreepkg "github.com/bsv-blockchain/go-subtree" + blob_memory "github.com/bsv-blockchain/teranode/stores/blob/memory" + "github.com/bsv-blockchain/teranode/stores/utxo/sql" + "github.com/bsv-blockchain/teranode/ulogger" + "github.com/bsv-blockchain/teranode/util/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// TestDequeue_NoParentConflictingCheck_BUG demonstrates the production bug -// observed on teranode-mainnet-eu-1 (v0.15.0-beta-3): a child transaction whose -// parent is already marked Conflicting=true in the UTXO store still lands in the -// block-assembly subtree. The resulting mining candidate fails ValidateBlock with -// "parent transaction X of tx Y has no block IDs" and the block is rejected with -// "bad-txns-inputs-missingorspent" by SVNode. +// TestDequeueDuringBlockMovement_RejectsChildOfConflictingParent demonstrates +// the production bug observed on teranode-mainnet-eu-1 (v0.15.0-beta-3): a +// child transaction whose parent is already marked Conflicting=true in the +// UTXO store still lands in the block-assembly subtree. The mining candidate +// then fails ValidateBlock with "parent transaction X of tx Y has no block +// IDs" and is rejected with bad-txns-inputs-missingorspent. // // Race in production: // // T0 parent P added to UTXO store via validator. -// T1 ProcessConflicting (during moveForwardBlock with ConflictingNodes) flags -// P.Conflicting=true. Cascade walks P.outputs -> recorded spenders, finds -// none for child C: C's Spend has not been committed yet (C is mid-flight -// in the BA queue). Cascade misses C; C.Conflicting stays false in store. -// T2 Event loop returns to default case, drains queue. Phase 2 filter -// (SubtreeProcessor.go:861-886) only checks removeMap and currentTxMap -// dedup. No Conflicting check on self or any parent in TxInpoints. -// T3 C lands in subtree. Mining candidate built and submitted. REJECTED. +// T1 ProcessConflicting (during moveForwardBlock with ConflictingNodes) +// flags P.Conflicting=true. Cascade walks P.outputs -> recorded +// spenders, finds none for child C: C's Spend has not been committed +// yet (C is mid-flight in the BA queue). Cascade misses C. +// T2 Event loop falls into dequeueDuringBlockMovement to drain whatever +// accumulated during the moveForwardBlock case. The drain filter only +// checked self-hash against transactionMap and losingTxHashesMap. No +// parent-inpoints check. C admitted into subtree. +// T3 C lands in subtree. Mining candidate built. Block REJECTED. // -// Even the alternative drain path used during block movement, -// dequeueDuringBlockMovement at SubtreeProcessor.go:3715, only filters by -// `losingTxHashesMap.Exists(node.Hash)` — its own hash, never parents. And the -// losing map itself is built in ProcessConflicting (process_conflicting.go:109) -// from GetCounterConflicting hashes only; the cascaded descendants returned by -// MarkConflictingRecursively (line 122) are discarded. So even when the cascade -// does discover a descendant, that hash never reaches the dequeue filter. -// -// This test proves the queue-drain side of the gap: the subtree-processor -// dequeue path has no mechanism to reject a child whose parent the system -// considers conflicting — because there is no map/store lookup at all on the -// parent inpoints during dequeue. -// -// Fix shape (per design discussion): -// - Introduce conflictingMap on SubtreeProcessor (separate from removeMap). -// - ProcessConflicting / MarkConflictingRecursively populate it recursively -// with all known-conflicting hashes (the cascaded descendants currently -// thrown away at process_conflicting.go:122 must be captured into it). -// - Phase 2 dequeue filter (SubtreeProcessor.go:861-886) and -// dequeueDuringBlockMovement (3731) reject any node whose Hash is in the -// map OR whose TxInpoints.ParentTxHashes contains a hash in the map. -// - When rejected at dequeue, the rejected child Hash is itself added to the -// map so any later-arriving descendants are also caught. -// -// Status: this test FAILS under current code (child is admitted) and is -// expected to PASS once the conflictingMap fix is in place. -func TestDequeue_NoParentConflictingCheck_BUG(t *testing.T) { - stp := setupTestSubtreeProcessor(t) - - // We do not need the parent to actually be in the UTXO store to prove the - // bug. Production has the parent marked Conflicting=true in the store, but - // the dequeue path never reads the store for parent inpoints, so the test - // is just as faithful with a synthetic parent hash. (Avoiding real - // SetConflicting also avoids sqlitememory single-writer lock contention - // during this test.) +// Fix: processConflictingTransactions now returns a transient set of every +// hash flagged Conflicting=true by the BFS cascade (immediate losers + every +// descendant returned by MarkConflictingRecursively). That set is threaded +// through RemainderTransactionParams.ConflictingHashes into +// dequeueDuringBlockMovement, which rejects any node whose own hash is in +// the set OR whose TxInpoints.ParentTxHashes contains a hash in the set. +// On parent match the node's hash is also added to the set so any +// later-in-batch descendants are caught. The set is scoped to this single +// drain — the default-case dequeue path is left untouched. +func TestDequeueDuringBlockMovement_RejectsChildOfConflictingParent(t *testing.T) { + stp := newTestProcessorNoStart(t) + parentHash := chainhash.HashH([]byte("conflicting-parent")) childHash := chainhash.HashH([]byte("child-of-conflicting-parent")) - - // Populate the SubtreeProcessor's conflictingMap to reflect what - // processConflictingTransactions / BlockAssembler.markAsConflicting would - // have published when the parent was flagged Conflicting=true in the - // store. The dequeue path must consult this and reject the child. - stp.MarkConflicting([]chainhash.Hash{parentHash}) + otherHash := chainhash.HashH([]byte("unrelated-tx")) childNode := subtreepkg.Node{Hash: childHash, Fee: 1, SizeInBytes: 250} childInpoints := &subtreepkg.TxInpoints{ ParentTxHashes: []chainhash.Hash{parentHash}, Idxs: [][]uint32{{0}}, } + otherNode := subtreepkg.Node{Hash: otherHash, Fee: 2, SizeInBytes: 220} + otherInpoints := &subtreepkg.TxInpoints{ + ParentTxHashes: []chainhash.Hash{chainhash.HashH([]byte("unrelated-parent"))}, + Idxs: [][]uint32{{0}}, + } - stp.AddBatch([]subtreepkg.Node{childNode}, []*subtreepkg.TxInpoints{childInpoints}) + stp.queue.enqueueBatch( + []subtreepkg.Node{childNode, otherNode}, + []*subtreepkg.TxInpoints{childInpoints, otherInpoints}, + ) - require.Eventually(t, func() bool { return stp.QueueLength() == 0 }, - 2*time.Second, 5*time.Millisecond, "queue did not drain") + // dequeueDuringBlockMovement holds back batches enqueued at-or-after + // (now - DoubleSpendWindow). Default window is 0, so it holds batches + // with time == now. A short sleep moves batch.time strictly into the + // past so the drain releases it. + time.Sleep(5 * time.Millisecond) - // Allow the goroutine one more iteration to complete the insert path. - time.Sleep(50 * time.Millisecond) + conflictingHashes := map[chainhash.Hash]struct{}{ + parentHash: {}, + } + + require.NoError(t, stp.dequeueDuringBlockMovement(nil, nil, conflictingHashes, true)) + + hashes := collectSubtreeHashes(stp) + + assert.NotContains(t, hashes, childHash, + "child of conflicting parent must be rejected by the dequeue filter") + assert.Contains(t, hashes, otherHash, + "unrelated tx must still pass through the filter") + + // Cascade through the set: rejected child hash should now be in + // conflictingHashes so any later-in-batch descendant of the child is + // also rejected without a store round-trip. + _, marked := conflictingHashes[childHash] + assert.True(t, marked, "rejected child must be added to the transient set "+ + "so its own descendants are caught later in the same drain") +} - found := false - for _, h := range stp.GetTransactionHashes() { - if h.Equal(childHash) { - found = true - break +// newTestProcessorNoStart builds a SubtreeProcessor without starting the +// event-loop goroutine. This lets the test drive dequeueDuringBlockMovement +// directly with a known queue state and a known conflictingHashes set, with +// no race against the default-case dequeue. +func newTestProcessorNoStart(t *testing.T) *SubtreeProcessor { + t.Helper() + + utxoStoreURL, err := url.Parse("sqlitememory:///test") + require.NoError(t, err) + + utxoStore, err := sql.New(context.Background(), ulogger.TestLogger{}, test.CreateBaseTestSettings(t), utxoStoreURL) + require.NoError(t, err) + + settings := test.CreateBaseTestSettings(t) + settings.BlockAssembly.InitialMerkleItemsPerSubtree = 32 + + newSubtreeChan := make(chan NewSubtreeRequest, 10) + go func() { + for req := range newSubtreeChan { + if req.ErrChan != nil { + req.ErrChan <- nil + } } - } + }() + t.Cleanup(func() { close(newSubtreeChan) }) + + stp, err := NewSubtreeProcessor(t.Context(), ulogger.TestLogger{}, settings, blob_memory.New(), nil, utxoStore, newSubtreeChan) + require.NoError(t, err) - assert.False(t, found, - "BUG: child of conflicting parent admitted to subtree.\n"+ - " parent: %s (treated as Conflicting=true)\n"+ - " child: %s (admitted into BA subtree)\n"+ - "Cause: SubtreeProcessor.go:861-886 (Phase 2 dequeue filter) and\n"+ - "SubtreeProcessor.go:3731 (dequeueDuringBlockMovement) consult only\n"+ - "removeMap, currentTxMap dedup, and losingTxHashesMap by self-hash.\n"+ - "Neither path checks TxInpoints.ParentTxHashes against any\n"+ - "conflicting-state source. Cascade hashes from\n"+ - "MarkConflictingRecursively are also discarded at\n"+ - "process_conflicting.go:122, so they never reach the filter.\n"+ - "Fix: add conflictingMap, populated recursively, consulted in\n"+ - "Phase 2 filter for both self-hash and parent-inpoints.", - parentHash.String(), childHash.String()) + return stp +} + +func collectSubtreeHashes(stp *SubtreeProcessor) []chainhash.Hash { + out := make([]chainhash.Hash, 0) + for _, st := range stp.chainedSubtrees { + for _, n := range st.Nodes { + out = append(out, n.Hash) + } + } + if cs := stp.currentSubtree.Load(); cs != nil { + for _, n := range cs.Nodes { + out = append(out, n.Hash) + } + } + return out } diff --git a/services/blockassembly/subtreeprocessor/interface.go b/services/blockassembly/subtreeprocessor/interface.go index 47e617ed3c..bb04ce5679 100644 --- a/services/blockassembly/subtreeprocessor/interface.go +++ b/services/blockassembly/subtreeprocessor/interface.go @@ -214,23 +214,6 @@ type Interface interface { // - int: Number of transactions in the removal map GetRemoveMapLength() int - // GetConflictingMap returns the map of transactions known to be conflicting. - // The dequeue paths consult this map to reject children of conflicting - // parents whose own conflicting flag has not yet been cascaded by the - // store-side traversal of recorded spenders. - // - // Returns: - // - txmap.TxMap: Map of conflicting transaction hashes - GetConflictingMap() txmap.TxMap - - // MarkConflicting records the supplied hashes in the processor's - // conflictingMap so that the queue→subtree dequeue path rejects any node - // whose own hash or whose TxInpoints reference any hash in the map. - // Callers must include the cascaded descendants (e.g. the second return - // value of utxo.MarkConflictingRecursively) — not just the seed hashes — - // otherwise the race window stays open. - MarkConflicting(hashes []chainhash.Hash) - // GetChainedSubtrees returns subtrees that are chained together. // These represent transaction dependencies and processing order. // diff --git a/services/blockassembly/subtreeprocessor/mock.go b/services/blockassembly/subtreeprocessor/mock.go index 332719ef85..d38fdaa5bb 100644 --- a/services/blockassembly/subtreeprocessor/mock.go +++ b/services/blockassembly/subtreeprocessor/mock.go @@ -52,15 +52,6 @@ func (m *MockSubtreeProcessor) GetRemoveMapLength() int { return args.Int(0) } -func (m *MockSubtreeProcessor) GetConflictingMap() txmap.TxMap { - args := m.Called() - return args.Get(0).(txmap.TxMap) -} - -func (m *MockSubtreeProcessor) MarkConflicting(hashes []chainhash.Hash) { - m.Called(hashes) -} - func (m *MockSubtreeProcessor) GetCurrentRunningState() State { args := m.Called() return args.Get(0).(State) From 384c7fdfb3ebbc2da633d95662aa9db369ea0801 Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Mon, 4 May 2026 17:06:44 +0200 Subject: [PATCH 4/4] fix(blockassembly): drain queue with cascade set after loadUnminedTransactions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit loadUnminedTransactions runs while gRPC AddTx is already enqueueing on the input queue. If validateUnminedTxInputs cascades a parent + descendants as conflicting in the UTXO store, any in-flight child of those parents that arrived before the cascade ran is sitting in the queue and will be admitted to the next mining candidate by default-case dequeue. This was the post-reset / startup half of the production race seen on teranode-mainnet-eu-1; PR #806 closed the in-memory subtree side, but the queue side stayed open. Fix: - Add Interface.DrainQueue(dropHashes) — generic queue drain that drops any tx whose hash, or whose TxInpoints.ParentTxHashes entry, is in dropHashes. On parent-match the dropped tx's own hash is added to the set so any later-in-batch descendant is also caught without an extra store round-trip. Implemented on SubtreeProcessor as a thin wrapper over dequeueDuringBlockMovement with skipNotification=true. - BlockAssembler accumulates cascade hashes in unminedDropHashes during loadUnminedTransactions (markAsConflicting writes the MarkConflictingRecursively return there). Field is serialised by the existing unminedTransactionsLoading flag. - BA.Start drains the queue with that set after loadUnminedTransactions returns, before stp.Start fires the goroutine. - BA.Reset's postProcessFn does the same after its loadUnminedTransactions call, before stp.reset's existing post-postProcess drain runs. --- services/blockassembly/BlockAssembler.go | 38 +++++++++++++++++++ .../subtreeprocessor/SubtreeProcessor.go | 19 ++++++++++ .../subtreeprocessor/interface.go | 18 +++++++++ .../blockassembly/subtreeprocessor/mock.go | 5 +++ 4 files changed, 80 insertions(+) diff --git a/services/blockassembly/BlockAssembler.go b/services/blockassembly/BlockAssembler.go index ff00931e20..83ae184593 100644 --- a/services/blockassembly/BlockAssembler.go +++ b/services/blockassembly/BlockAssembler.go @@ -153,6 +153,14 @@ type BlockAssembler struct { // unminedTransactionsLoading indicates if unmined transactions are currently being loaded unminedTransactionsLoading atomic.Bool + // unminedDropHashes accumulates hashes that should be dropped from the + // input queue at the end of loadUnminedTransactions. Populated by + // markAsConflicting via the cascade returned from MarkConflictingRecursively. + // Read once by Start / postProcessFn after loadUnminedTransactions returns, + // then handed to subtreeProcessor.DrainQueue. Serialized by + // unminedTransactionsLoading; must not be touched concurrently. + unminedDropHashes map[chainhash.Hash]struct{} + // wg tracks background goroutines for clean shutdown wg sync.WaitGroup } @@ -559,6 +567,14 @@ func (b *BlockAssembler) reset(ctx context.Context, validateInputs ...bool) erro return errors.NewProcessingError("[Reset] error loading unmined transactions", err) } + // Drop any in-flight children of cascaded conflicting parents from + // the input queue before the existing post-postProcess drain runs + // and before default-case dequeue resumes. + if drop := b.unminedDropHashes; len(drop) > 0 { + b.subtreeProcessor.DrainQueue(drop) + } + b.unminedDropHashes = nil + return nil } @@ -841,6 +857,16 @@ func (b *BlockAssembler) Start(ctx context.Context) (err error) { return errors.NewStorageError("[BlockAssembler] failed to load un-mined transactions: %v", err) } + // AddTx is already enqueueing on the gRPC side. If loadUnminedTransactions + // flagged any tx as conflicting (and cascaded its descendants), drain the + // input queue with that set as a drop filter before the event-loop + // goroutine starts — otherwise in-flight children whose parent was just + // flagged would be admitted to the next mining candidate. + if drop := b.unminedDropHashes; len(drop) > 0 { + b.subtreeProcessor.DrainQueue(drop) + } + b.unminedDropHashes = nil + // Start SubtreeProcessor goroutine after loading unmined transactions to avoid race conditions b.subtreeProcessor.Start(ctx) @@ -2043,6 +2069,10 @@ func (b *BlockAssembler) loadUnminedTransactions(ctx context.Context, validateIn b.logger.Infof("[loadUnminedTransactions] unmined transaction loading completed") }() + // Reset the accumulator: any cascade fired during this load goes here so + // the caller can drain the input queue with this set as a drop filter. + b.unminedDropHashes = make(map[chainhash.Hash]struct{}) + if b.utxoStore == nil { return errors.NewServiceError("[BlockAssembler] no utxostore") } @@ -2509,6 +2539,14 @@ func (b *BlockAssembler) markAsConflicting(ctx context.Context, txHash chainhash return } + // Stash cascade hashes for the post-load DrainQueue call. Safe because + // loadUnminedTransactions is serialised by unminedTransactionsLoading. + if b.unminedDropHashes != nil { + for _, h := range cascadedHashes { + b.unminedDropHashes[h] = struct{}{} + } + } + for _, h := range cascadedHashes { if removeErr := b.subtreeProcessor.Remove(ctx, h); removeErr != nil { b.logger.Warnf("[validateUnminedTxInputs][%s] failed to evict cascaded tx from subtree processor: %v", h.String(), removeErr) diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go index 9bdba1dae9..741c0c1d38 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go @@ -3733,6 +3733,25 @@ func (stp *SubtreeProcessor) WaitForPendingBlocks(ctx context.Context) error { return err } +// DrainQueue is the public entry point for BlockAssembler to flush in-flight +// children of conflicting parents from the input queue before the event-loop +// goroutine starts (BA.Start path) or before the existing post-postProcess +// drain runs (Reset path's postProcess closure). +// +// Internally calls dequeueDuringBlockMovement with the supplied transient +// drop set as the filter. Non-conflicting txs in the queue are added to the +// current subtree as part of the same drain — same routing as default-case +// dequeue would have used. skipNotification=true so this is safe to invoke +// before subtree-announcement listeners are wired up. +func (stp *SubtreeProcessor) DrainQueue(dropHashes map[chainhash.Hash]struct{}) { + if len(dropHashes) == 0 { + return + } + if err := stp.dequeueDuringBlockMovement(nil, nil, dropHashes, true); err != nil { + stp.logger.Errorf("[SubtreeProcessor][DrainQueue] error: %v", err) + } +} + // dequeueDuringBlockMovement processes the transaction queue during block movement. // // Parameters: diff --git a/services/blockassembly/subtreeprocessor/interface.go b/services/blockassembly/subtreeprocessor/interface.go index bb04ce5679..26fa860c04 100644 --- a/services/blockassembly/subtreeprocessor/interface.go +++ b/services/blockassembly/subtreeprocessor/interface.go @@ -151,6 +151,24 @@ type Interface interface { // - error: Any error encountered during transaction removal Remove(ctx context.Context, hash chainhash.Hash) error + // DrainQueue drains the input queue and routes valid txs the same way the + // during-block-movement drain does, with one filter: any tx whose own hash + // is in dropHashes, or whose TxInpoints.ParentTxHashes contains a hash in + // dropHashes, is dropped on the floor. On parent match the dropped tx's + // own hash is added to dropHashes so any later-in-batch descendants are + // also caught without an extra store round-trip. + // + // Used by BlockAssembler after loadUnminedTransactions has flagged a set + // of txs as conflicting in the UTXO store (and cascaded their descendants). + // AddTx is already enqueueing on the gRPC side at that point, so by the + // time the event-loop goroutine is started (or resumed, for the Reset + // path) the queue can hold in-flight children whose parents were just + // flagged. Without this drain those children land in the next mining + // candidate. + // + // The set is scoped to a single drain and discarded by the caller. + DrainQueue(dropHashes map[chainhash.Hash]struct{}) + // GetCompletedSubtreesForMiningCandidate returns completed subtrees ready for mining. // These subtrees contain validated transactions that can be included in a block. // diff --git a/services/blockassembly/subtreeprocessor/mock.go b/services/blockassembly/subtreeprocessor/mock.go index d38fdaa5bb..b7a993e48f 100644 --- a/services/blockassembly/subtreeprocessor/mock.go +++ b/services/blockassembly/subtreeprocessor/mock.go @@ -214,6 +214,11 @@ func (m *MockSubtreeProcessor) Remove(ctx context.Context, hash chainhash.Hash) return args.Error(0) } +// DrainQueue implements Interface.DrainQueue +func (m *MockSubtreeProcessor) DrainQueue(dropHashes map[chainhash.Hash]struct{}) { + m.Called(dropHashes) +} + // GetCompletedSubtreesForMiningCandidate implements Interface.GetCompletedSubtreesForMiningCandidate func (m *MockSubtreeProcessor) GetCompletedSubtreesForMiningCandidate() []*subtree.Subtree { args := m.Called()