diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 81edf09b09..14a9573d55 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2025 The Decred developers +// Copyright (c) 2015-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -1056,7 +1056,8 @@ func (m *SyncManager) OnMixMsg(peer *Peer, msg mixing.Message) ([]mixing.Message return nil, nil } - accepted, err := m.cfg.MixPool.AcceptMessage(msg) + source := mixpool.Uint64Source(peer.ID()) + accepted, err := m.cfg.MixPool.AcceptMessage(msg, source) // Remove message from request maps. Either the mixpool already knows // about it and as such we shouldn't have any more instances of trying diff --git a/internal/rpcserver/interface.go b/internal/rpcserver/interface.go index 0316d1f5fe..e230331496 100644 --- a/internal/rpcserver/interface.go +++ b/internal/rpcserver/interface.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2025 The Decred developers +// Copyright (c) 2019-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -20,6 +20,7 @@ import ( "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/math/uint256" "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -200,9 +201,9 @@ type SyncManager interface { // This method may report a false positive, but never a false negative. RecentlyConfirmedTxn(hash *chainhash.Hash) bool - // SubmitMixMessage submits the mixing message to the network after - // processing it locally. - SubmitMixMessage(msg mixing.Message) error + // AcceptMixMessage attempts to accept a mixing message to the local mixing + // pool. + AcceptMixMessage(msg mixing.Message, src mixpool.Source) error } // UtxoEntry represents a utxo entry for use with the RPC server. diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index 5245369113..1a609ff0c5 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2025 The Decred developers +// Copyright (c) 2015-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -50,6 +50,7 @@ import ( "github.com/decred/dcrd/internal/mining/cpuminer" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -4348,7 +4349,8 @@ func handleSendRawMixMessage(_ context.Context, s *Server, cmd interface{}) (int msg.WriteHash(s.blake256Hasher) s.blake256HaserMu.Unlock() - err = s.cfg.SyncMgr.SubmitMixMessage(msg) + // Use 0 for the source to represent the local node. + err = s.cfg.SyncMgr.AcceptMixMessage(msg, mixpool.ZeroSource) if err != nil { // XXX: consider a better error code/function str := fmt.Sprintf("Rejected mix message: %s", err) diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index bacfa0c7fb..e642d448a6 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2025 The Decred developers +// Copyright (c) 2020-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -44,6 +44,7 @@ import ( "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/math/uint256" "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/rpc/jsonrpc/types/v4" "github.com/decred/dcrd/txscript/v4" @@ -572,7 +573,7 @@ func (c *testAddrManager) LocalAddresses() []addrmgr.LocalAddr { type testSyncManager struct { isCurrent bool submitBlockErr error - submitMixErr error + acceptMixErr error syncPeerID int32 syncHeight int64 processTransaction []*dcrutil.Tx @@ -592,10 +593,6 @@ func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) error { return s.submitBlockErr } -func (s *testSyncManager) SubmitMixMessage(msg mixing.Message) error { - return s.submitMixErr -} - // SyncPeer returns a mocked id of the current peer being synced with. func (s *testSyncManager) SyncPeerID() int32 { return s.syncPeerID @@ -619,6 +616,12 @@ func (s *testSyncManager) RecentlyConfirmedTxn(hash *chainhash.Hash) bool { return s.recentlyConfirmedTxn } +// AcceptMixMessage provides a mock implementation for attempting to accept a +// mixing message to the local mixing pool. +func (s *testSyncManager) AcceptMixMessage(msg mixing.Message, src mixpool.Source) error { + return s.acceptMixErr +} + // testExistsAddresser provides a mock exists addresser by implementing the // ExistsAddresser interface. type testExistsAddresser struct { diff --git a/mixing/mixclient/client_test.go b/mixing/mixclient/client_test.go index 1f4d88d0e0..c03932bf08 100644 --- a/mixing/mixclient/client_test.go +++ b/mixing/mixclient/client_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2024 The Decred developers +// Copyright (c) 2024-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -114,7 +114,7 @@ func (w *testWallet) SignInput(tx *wire.MsgTx, index int, prevScript []byte) err } func (w *testWallet) SubmitMixMessage(ctx context.Context, msg mixing.Message) error { - _, err := w.mixpool.AcceptMessage(msg) + _, err := w.mixpool.AcceptMessage(msg, mixpool.ZeroSource) return err } diff --git a/mixing/mixpool/mixpool.go b/mixing/mixpool/mixpool.go index 98888d0997..2bf0455bc8 100644 --- a/mixing/mixpool/mixpool.go +++ b/mixing/mixpool/mixpool.go @@ -1,4 +1,4 @@ -// Copyright (c) 2023-2025 The Decred developers +// Copyright (c) 2023-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -10,6 +10,7 @@ import ( "bytes" "context" "fmt" + "slices" "sort" "sync" "time" @@ -30,6 +31,17 @@ const minconf = 1 const feeRate = 0.0001e8 const earlyKEDuration = 5 * time.Second +const ( + // maxOrphans specifies the maximum number of orphans allowed in the orphan + // pool at one time. + maxOrphans = 250 + + // maxPostEvictionOrphans is the maximum number of orphans to keep in the + // pool after a forced eviction occurs due to exceeding the overall max + // limit. It is set to 75% of the overall max limit. + maxPostEvictionOrphans = maxOrphans * 3 / 4 +) + type idPubKey = [33]byte type msgtype int @@ -85,6 +97,27 @@ func (m msgtype) String() string { } } +// Source represents a source of mixing messages. This is typically the peer +// that first relayed them, but the caller may choose any scheme it desires. +type Source interface { + // ID returns an opaque identifier that uniquely identifies the source. + ID() uint64 +} + +// Uint64Source implements the [Source] interface by returning the associated +// uint64 as the ID. This is primarily useful as a convenience for callers that +// do not require an additional object associated with the source. +type Uint64Source uint64 + +// ID returns the underlying uint64 associated with the source. +func (s Uint64Source) ID() uint64 { return uint64(s) } + +// Ensure [Uint64Source] implements the [Source] interface. +var _ Source = (*Uint64Source)(nil) + +// ZeroSource implements the [Source] interface by returning 0 for the ID. +const ZeroSource = Uint64Source(0) + // entry describes non-PR messages accepted to the pool. type entry struct { hash chainhash.Hash @@ -94,8 +127,9 @@ type entry struct { msgtype msgtype } -type orphan struct { +type orphanMsg struct { message mixing.Message + src Source accepted time.Time } @@ -145,8 +179,8 @@ type Pool struct { prs map[chainhash.Hash]*wire.MsgMixPairReq outPoints map[wire.OutPoint]chainhash.Hash pool map[chainhash.Hash]entry - orphans map[chainhash.Hash]*orphan - orphansByID map[idPubKey]map[chainhash.Hash]mixing.Message + orphans map[chainhash.Hash]*orphanMsg + orphansByID map[idPubKey]map[chainhash.Hash]*orphanMsg messagesByIdentity map[idPubKey][]chainhash.Hash latestKE map[idPubKey]*wire.MsgMixKeyExchange sessions map[[32]byte]*session @@ -228,8 +262,8 @@ func NewPool(blockchain BlockChain) *Pool { prs: make(map[chainhash.Hash]*wire.MsgMixPairReq), outPoints: make(map[wire.OutPoint]chainhash.Hash), pool: make(map[chainhash.Hash]entry), - orphans: make(map[chainhash.Hash]*orphan), - orphansByID: make(map[idPubKey]map[chainhash.Hash]mixing.Message), + orphans: make(map[chainhash.Hash]*orphanMsg), + orphansByID: make(map[idPubKey]map[chainhash.Hash]*orphanMsg), messagesByIdentity: make(map[idPubKey][]chainhash.Hash), latestKE: make(map[idPubKey]*wire.MsgMixKeyExchange), sessions: make(map[[32]byte]*session), @@ -474,6 +508,46 @@ func (p *Pool) removeMessage(hash chainhash.Hash) { p.maybeLogRecentMixMsgsNumEvicted() } +// removeOrphan removes the message associated with the passed hash from the +// orphan pool and orphans by ID index. +// +// This function MUST be called with the mixpool lock held (for writes). +func (p *Pool) removeOrphan(hash *chainhash.Hash, id *idPubKey) { + // Remove the message from the orphan pool and the reference from the + // orphans by ID index. + delete(p.orphans, *hash) + orphansByID := p.orphansByID[*id] + delete(orphansByID, *hash) + + // Remove the map entry altogether if there are no longer any orphans which + // depend on it. + if len(orphansByID) == 0 { + delete(p.orphansByID, *id) + } + + log.Tracef("Removed orphan %v (pool size %v)", hash, len(p.orphans)) +} + +// removeOrphansBySourceID removes up to the maximum specified number of orphan +// messages associated with the provided source ID and returns the number of +// entries removed. + +// This function MUST be called with the mixpool lock held (for writes). +func (p *Pool) removeOrphansBySourceID(srcID uint64, maxToEvict uint64) uint64 { + var numEvicted uint64 + for hash, orphan := range p.orphans { + if numEvicted >= maxToEvict { + break + } + if orphan.src.ID() == srcID { + id := (*idPubKey)(orphan.message.Pub()) + p.removeOrphan(&hash, id) + numEvicted++ + } + } + return numEvicted +} + // ExpireMessages immediately expires all pair requests and sessions built // from them that indicate expiry at or after a block height. func (p *Pool) ExpireMessages(height uint32) { @@ -517,8 +591,7 @@ func (p *Pool) expireMessagesNow(height uint32) { } } if expire { - delete(p.orphans, hash) - delete(p.orphansByID, *(*idPubKey)(o.message.Pub())) + p.removeOrphan(&hash, (*idPubKey)(o.message.Pub())) } } } @@ -969,6 +1042,88 @@ Loop: var zeroHash chainhash.Hash +// limitNumOrphans limits the number of orphan mixing messages by evicting a +// subset of the existing orphans when adding a new one would cause it to +// overflow the max allowed. +// +// This function MUST be called with the mixpool lock held (for writes). +func (p *Pool) limitNumOrphans() { + // Nothing to do if adding another orphan will not cause the pool to exceed + // the limit. + if len(p.orphans)+1 <= maxOrphans { + return + } + + // Determine which sources have the most orphans associated with them and + // then remove all of the orphans associated with each source in descending + // order until the orphan pool has reached the target maximum number of post + // eviction orphans allowed. + // + // This approach is fairly efficient since it naturally limits the frequency + // of eviction algorithm execution. Further, in practice, orphan messages + // are quite rare after initial startup where ongoing mixing sessions are + // discovered, so any peer sending a lot of orphans is likely experiencing + // severe connectivity issues or otherwise misbehaving. This approach also + // has the added benefit of handling a variety of orphan flooding + // misbehavior well. + srcCounters := make(map[uint64]int) + for _, orphan := range p.orphans { + srcCounters[orphan.src.ID()]++ + } + type srcWithCount struct { + srcID uint64 + count int + } + srcCounts := make([]srcWithCount, 0, len(srcCounters)) + for srcID, count := range srcCounters { + srcCounts = append(srcCounts, srcWithCount{srcID, count}) + } + slices.SortFunc(srcCounts, func(a, b srcWithCount) int { + return b.count - a.count + }) + numOrphans := uint64(len(p.orphans)) + for numOrphans > maxPostEvictionOrphans && len(srcCounts) > 0 { + srcID := srcCounts[0].srcID + maxToEvict := numOrphans - maxPostEvictionOrphans + numEvicted := p.removeOrphansBySourceID(srcID, maxToEvict) + log.Tracef("Removed %d orphans with source ID %d", numEvicted, srcID) + numOrphans -= numEvicted + srcCounts = srcCounts[1:] + } +} + +// addOrphan adds the passed message to the orphan pool when it is not already +// present. +// +// It also potentially removes orphans to make room when necessary. +// +// This function MUST be called with the mixpool lock held (for writes). +func (p *Pool) addOrphan(msg mixing.Message, hash *chainhash.Hash, id *idPubKey, src Source) { + orphansByID := p.orphansByID[*id] + if _, ok := orphansByID[*hash]; ok { + // Already an orphan. + return + } + + // Limit the number of orphan mixing messages to prevent memory exhaustion. + p.limitNumOrphans() + + orphan := &orphanMsg{ + message: msg, + src: src, + accepted: time.Now(), + } + p.orphans[*hash] = orphan + if orphansByID == nil { + orphansByID = make(map[chainhash.Hash]*orphanMsg) + p.orphansByID[*id] = orphansByID + } + orphansByID[*hash] = orphan + + log.Debugf("Stored orphan message %T %v (pool size: %d)", msg, hash, + len(p.orphans)) +} + // AcceptMessage accepts a mixing message to the pool. // // Messages must contain the mixing participant's identity and contain a valid @@ -980,7 +1135,7 @@ var zeroHash chainhash.Hash // // All newly accepted messages, including any orphan key exchange messages // that were processed after processing missing pair requests, are returned. -func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err error) { +func (p *Pool) AcceptMessage(msg mixing.Message, src Source) (accepted []mixing.Message, err error) { defer func() { if err == nil && len(accepted) == 0 { // Don't log duplicate messages or non-KE orphans. @@ -1075,7 +1230,7 @@ func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err p.mtx.Lock() defer p.mtx.Unlock() - accepted, err := p.acceptKE(msg, &hash, id) + accepted, err := p.acceptKE(msg, &hash, id, src) if err != nil { return nil, err } @@ -1133,20 +1288,7 @@ func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err } // Save as an orphan if their KE is not (yet) accepted. if !haveKE { - orphansByID := p.orphansByID[*id] - if _, ok := orphansByID[hash]; ok { - // Already an orphan. - return nil, nil - } - if orphansByID == nil { - orphansByID = make(map[chainhash.Hash]mixing.Message) - p.orphansByID[*id] = orphansByID - } - p.orphans[hash] = &orphan{ - message: msg, - accepted: time.Now(), - } - orphansByID[hash] = msg + p.addOrphan(msg, &hash, id, src) // TODO: Consider return an error containing the unknown // messages, so they can be getdata'd. @@ -1197,9 +1339,8 @@ func (p *Pool) removePR(pr *wire.MsgMixPairReq, reason string) { delete(p.messagesByIdentity, pr.Identity) delete(p.latestKE, pr.Identity) for orphanHash := range p.orphansByID[pr.Identity] { - delete(p.orphans, orphanHash) + p.removeOrphan(&orphanHash, &pr.Identity) } - delete(p.orphansByID, pr.Identity) for i := range pr.UTXOs { delete(p.outPoints, pr.UTXOs[i].OutPoint) } @@ -1317,42 +1458,35 @@ func (p *Pool) reconsiderOrphans(accepted mixing.Message, id *idPubKey) []mixing // If the accepted message was a PR, there may be KE orphans that can // be accepted now. if pr, ok := accepted.(*wire.MsgMixPairReq); ok { - var orphanKEs []*wire.MsgMixKeyExchange + var orphanKEs []*orphanMsg for _, orphan := range p.orphansByID[*id] { - orphanKE, ok := orphan.(*wire.MsgMixKeyExchange) + orphanKE, ok := orphan.message.(*wire.MsgMixKeyExchange) if !ok { continue } - refsAcceptedPR := false - for _, prHash := range orphanKE.SeenPRs { - if pr.Hash() == prHash { - refsAcceptedPR = true - break - } - } - if !refsAcceptedPR { + if !slices.Contains(orphanKE.SeenPRs, pr.Hash()) { continue } - orphanKEs = append(orphanKEs, orphanKE) + orphanKEs = append(orphanKEs, orphan) } - for _, orphanKE := range orphanKEs { + for _, orphan := range orphanKEs { + orphanKE := orphan.message.(*wire.MsgMixKeyExchange) orphanKEHash := orphanKE.Hash() - _, err := p.acceptKE(orphanKE, &orphanKEHash, &orphanKE.Identity) + _, err := p.acceptKE(orphanKE, &orphanKEHash, &orphanKE.Identity, + orphan.src) if err != nil { log.Debugf("orphan KE could not be accepted: %v", err) continue } kes = append(kes, orphanKE) - delete(p.orphansByID[*id], orphanKEHash) - delete(p.orphans, orphanKEHash) + p.removeOrphan(&orphanKEHash, id) acceptedMessages = append(acceptedMessages, orphanKE) } if len(p.orphansByID[*id]) == 0 { - delete(p.orphansByID, *id) return acceptedMessages } } @@ -1368,8 +1502,8 @@ func (p *Pool) reconsiderOrphans(accepted mixing.Message, id *idPubKey) []mixing continue } - var acceptedOrphans []mixing.Message - for orphanHash, orphan := range p.orphansByID[*id] { + for orphanHash, omsg := range p.orphansByID[*id] { + orphan := omsg.message if !bytes.Equal(orphan.Sid(), ke.SessionID[:]) { continue } @@ -1395,16 +1529,10 @@ func (p *Pool) reconsiderOrphans(accepted mixing.Message, id *idPubKey) []mixing p.acceptEntry(orphan, msgtype, &orphanHash, id, ses) - acceptedOrphans = append(acceptedOrphans, orphan) acceptedMessages = append(acceptedMessages, orphan) - } - for _, orphan := range acceptedOrphans { - orphanHash := orphan.Hash() - delete(p.orphansByID[*id], orphanHash) - delete(p.orphans, orphanHash) + p.removeOrphan(&orphanHash, id) } if len(p.orphansByID[*id]) == 0 { - delete(p.orphansByID, *id) return acceptedMessages } } @@ -1499,7 +1627,7 @@ func (p *Pool) checkAcceptKE(ke *wire.MsgMixKeyExchange) error { return nil } -func (p *Pool) acceptKE(ke *wire.MsgMixKeyExchange, hash *chainhash.Hash, id *idPubKey) (accepted *wire.MsgMixKeyExchange, err error) { +func (p *Pool) acceptKE(ke *wire.MsgMixKeyExchange, hash *chainhash.Hash, id *idPubKey, src Source) (accepted *wire.MsgMixKeyExchange, err error) { // Check if already accepted. if _, ok := p.pool[*hash]; ok { return nil, nil @@ -1551,16 +1679,7 @@ func (p *Pool) acceptKE(ke *wire.MsgMixKeyExchange, hash *chainhash.Hash, id *id } } if missingOwnPR != nil { - p.orphans[*hash] = &orphan{ - message: ke, - accepted: time.Now(), - } - orphansByID := p.orphansByID[*id] - if orphansByID == nil { - orphansByID = make(map[chainhash.Hash]mixing.Message) - p.orphansByID[*id] = orphansByID - } - orphansByID[*hash] = ke + p.addOrphan(ke, hash, id, src) err := &MissingOwnPRError{ MissingPR: *missingOwnPR, } diff --git a/mixing/mixpool/mixpool_test.go b/mixing/mixpool/mixpool_test.go index bdfb5a814c..abaec1e373 100644 --- a/mixing/mixpool/mixpool_test.go +++ b/mixing/mixpool/mixpool_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2023-2024 The Decred developers +// Copyright (c) 2023-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -225,7 +225,7 @@ func TestAccept(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = p.AcceptMessage(pr) + _, err = p.AcceptMessage(pr, ZeroSource) if err != nil { t.Fatal(err) } @@ -271,7 +271,7 @@ func TestAccept(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = p.AcceptMessage(ke) + _, err = p.AcceptMessage(ke, ZeroSource) if err != nil { t.Fatal(err) } @@ -289,7 +289,7 @@ func TestAccept(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = p.AcceptMessage(ct) + _, err = p.AcceptMessage(ct, ZeroSource) if err != nil { t.Fatal(err) } @@ -330,7 +330,7 @@ func TestAccept(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = p.AcceptMessage(sr) + _, err = p.AcceptMessage(sr, ZeroSource) if err != nil { t.Fatal(err) } @@ -365,7 +365,7 @@ func TestAccept(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = p.AcceptMessage(dc) + _, err = p.AcceptMessage(dc, ZeroSource) if err != nil { t.Fatal(err) } @@ -399,7 +399,7 @@ func TestAccept(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = p.AcceptMessage(cm) + _, err = p.AcceptMessage(cm, ZeroSource) if err != nil { t.Fatal(err) } diff --git a/mixing/mixpool/orphans_test.go b/mixing/mixpool/orphans_test.go index 48ffd7917f..54319284f6 100644 --- a/mixing/mixpool/orphans_test.go +++ b/mixing/mixpool/orphans_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2024 The Decred developers +// Copyright (c) 2024-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -153,7 +153,7 @@ func TestOrphans(t *testing.T) { mp := NewPool(newTestBlockchain()) for j, a := range accepts { - accepted, err := mp.AcceptMessage(a.message) + accepted, err := mp.AcceptMessage(a.message, ZeroSource) if err != nil != a.errors { t.Errorf("test %d call %d %q: unexpected error: %v", i, j, a.desc, err) } @@ -173,3 +173,117 @@ func TestOrphans(t *testing.T) { } } } + +// TestOrphanEviction ensures that exceeding the maximum number of orphans +// evicts entries to make room for the new ones. +func TestOrphanEviction(t *testing.T) { + pub, priv, err := generateSecp256k1(nil) + if err != nil { + t.Fatal(err) + } + id := *(*[33]byte)(pub.SerializeCompressed()) + + // createOrphanKEAndCT returns a signed mix key exchange and cyphertexts + // message associated with a unique pair request. + h := blake256.NewHasher256() + var nextPRNum uint32 + createOrphanKEAndCT := func() (*wire.MsgMixKeyExchange, *wire.MsgMixCiphertexts) { + h.Reset() + h.WriteUint32LE(nextPRNum) + nextPRNum++ + randPrevOut := wire.OutPoint{Hash: h.Sum256()} + randomUTXO := wire.MixPairReqUTXO{OutPoint: randPrevOut} + pr := &wire.MsgMixPairReq{ + Identity: id, + UTXOs: []wire.MixPairReqUTXO{randomUTXO}, + MessageCount: 1, + Expiry: testStartingHeight + 10, + ScriptClass: string(mixing.ScriptClassP2PKHv0), + InputValue: 1 << 18, + } + err = mixing.SignMessage(pr, priv) + if err != nil { + t.Fatal(err) + } + pr.WriteHash(h) + + prs := []*wire.MsgMixPairReq{pr} + epoch := uint64(time.Now().Unix()) + sid := mixing.SortPRsForSession(prs, epoch) + ke := &wire.MsgMixKeyExchange{ + Identity: id, + SessionID: sid, + Epoch: epoch, + Run: 0, + SeenPRs: []chainhash.Hash{ + pr.Hash(), + }, + } + err = mixing.SignMessage(ke, priv) + if err != nil { + t.Fatal(err) + } + ke.WriteHash(h) + + seenKEs := []chainhash.Hash{ke.Hash()} + ct := &wire.MsgMixCiphertexts{ + Identity: id, + SessionID: sid, + Run: 0, + SeenKeyExchanges: seenKEs, + } + err = mixing.SignMessage(ct, priv) + if err != nil { + t.Fatal(err) + } + ct.WriteHash(h) + + return ke, ct + } + + // Create enough orphan key exchange and cyphertext messages to be able to + // exceed the maximum number of orphans allowed in the pool. + orphans := make([]mixing.Message, 0, maxOrphans+2) + for range maxOrphans/2 + 1 { + ke, ct := createOrphanKEAndCT() + orphans = append(orphans, ke, ct) + } + + // Fill up the orphan pool to the max allowed with orphan messages while + // also pretending as though the orphans came from different sources. + p := NewPool(newTestBlockchain()) + acceptOrphanAndCheck := func(msg mixing.Message, srcID uint64) { + accepted, err := p.AcceptMessage(msg, Uint64Source(srcID)) + if len(accepted) != 0 { + t.Fatalf("accepted orphan message %T to main pool", msg) + } + if _, ok := msg.(*wire.MsgMixKeyExchange); ok { + var ownPRErr *MissingOwnPRError + if !errors.As(err, &ownPRErr) { + t.Fatalf("unexpected error for orphan message: %v", err) + } + } + } + const numSources = 5 + numOrphansPerSource := (maxOrphans + 1 + (numSources - 1)) / numSources + for i := range maxOrphans { + orphan := orphans[i] + srcID := uint64(i/numOrphansPerSource) + 1 + acceptOrphanAndCheck(orphan, srcID) + } + + // Ensure the orphan pool is the max allowed. + assertOrphanPoolSize := func(expected int) { + if len(p.orphans) != expected { + t.Fatalf("unexpected orphan pool size - got %d, want %d", + len(p.orphans), expected) + } + } + assertOrphanPoolSize(maxOrphans) + + // Ensure adding another orphan causes evictions of orphans tagged from + // the sources that submitted the most until the pool only retains the + // target number post eviction plus one for the orphan being added. + acceptOrphanAndCheck(orphans[maxOrphans], numSources) + assertOrphanPoolSize(maxPostEvictionOrphans + 1) +} diff --git a/rpcadaptors.go b/rpcadaptors.go index 8f411fe737..66e81ddbe0 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -1,5 +1,5 @@ // Copyright (c) 2017 The btcsuite developers -// Copyright (c) 2015-2025 The Decred developers +// Copyright (c) 2015-2026 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -22,6 +22,7 @@ import ( "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/mixing" + "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/peer/v3" "github.com/decred/dcrd/wire" ) @@ -444,9 +445,10 @@ func (b *rpcSyncMgr) RecentlyConfirmedTxn(hash *chainhash.Hash) bool { return b.server.recentlyConfirmedTxns.Contains(hash[:]) } -// SubmitMixMessage locally processes the mixing message. -func (b *rpcSyncMgr) SubmitMixMessage(msg mixing.Message) error { - _, err := b.server.mixMsgPool.AcceptMessage(msg) +// AcceptMixMessage attempts to accept a mixing message to the local mixing +// pool. +func (b *rpcSyncMgr) AcceptMixMessage(msg mixing.Message, src mixpool.Source) error { + _, err := b.server.mixMsgPool.AcceptMessage(msg, src) return err }