From 6617c338d891160bc0110966b458e60f5387127b Mon Sep 17 00:00:00 2001 From: Chen Kai <281165273grape@gmail.com> Date: Sun, 11 May 2025 14:33:12 +0800 Subject: [PATCH 01/12] pipeline read discv5 Signed-off-by: Chen Kai <281165273grape@gmail.com> --- p2p/discover/v5_udp.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 677143fd60b..beb766e6977 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -619,9 +619,15 @@ func (t *UDPv5) dispatch() { t.send(r.destID, r.destAddr, r.msg, nil) case p := <-t.packetInCh: + // Arm next read immediately to allow pipelining. + // The readLoop can start reading the next packet while this one is being handled. + // Backpressure is still maintained by packetInCh (buffer 1) and readNextCh (buffer 1). + select { + case t.readNextCh <- struct{}{}: + case <-t.closeCtx.Done(): // Avoid blocking on send if closing + return + } t.handlePacket(p.Data, p.Addr) - // Arm next read. - t.readNextCh <- struct{}{} case <-t.closeCtx.Done(): close(t.readNextCh) From 62d6517c06ea921a69e058e8e8a79464493f1e4d Mon Sep 17 00:00:00 2001 From: Chen Kai <281165273grape@gmail.com> Date: Sun, 11 May 2025 16:53:48 +0800 Subject: [PATCH 02/12] make send in separate goroutine Signed-off-by: Chen Kai <281165273grape@gmail.com> --- go.mod | 1 + go.sum | 2 ++ p2p/discover/v5_udp.go | 62 ++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d27af647ece..0432acf97a9 100644 --- a/go.mod +++ b/go.mod @@ -118,6 +118,7 @@ require ( github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect + github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/sha256-simd v1.0.0 // indirect diff --git a/go.sum b/go.sum index 200b3725eaa..575c284827b 100644 --- a/go.sum +++ b/go.sum @@ -238,6 +238,8 @@ github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4F github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= +github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= +github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index beb766e6977..434962b7256 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/netutil" + bufferpool "github.com/libp2p/go-buffer-pool" ) const ( @@ -99,6 +100,7 @@ type UDPv5 struct { sendCh chan sendRequest sendNoRespCh chan *sendNoRespRequest unhandled chan<- ReadPacket + writeCh chan pendingWrite // New channel for outgoing packets // state of dispatch codec codecV5 @@ -114,6 +116,12 @@ type UDPv5 struct { wg sync.WaitGroup } +// pendingWrite holds data for a packet to be sent by the writeLoop. +type pendingWrite struct { + toAddr netip.AddrPort + data []byte +} + type sendRequest struct { destID enode.ID destAddr netip.AddrPort @@ -158,9 +166,10 @@ func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { return nil, err } go t.tab.loop() - t.wg.Add(2) + t.wg.Add(3) go t.readLoop() go t.dispatch() + go t.writeLoop() return t, nil } @@ -188,6 +197,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { sendNoRespCh: make(chan *sendNoRespRequest), respTimeoutCh: make(chan *callTimeout), unhandled: cfg.Unhandled, + writeCh: make(chan pendingWrite, 128), // Buffered channel for outgoing packets // state of dispatch codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock, cfg.V5ProtocolID), activeCallByNode: make(map[enode.ID]*callV5), @@ -631,6 +641,7 @@ func (t *UDPv5) dispatch() { case <-t.closeCtx.Done(): close(t.readNextCh) + close(t.writeCh) for id, queue := range t.callQueue { for _, c := range queue { c.err <- errClosed @@ -770,10 +781,55 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet, t.log.Warn(">> "+packet.Name(), t.logcontext...) return nonce, err } + t.logcontext = append(t.logcontext, "rawpacket", hexutil.Encode(enc)) - _, err = t.conn.WriteToUDPAddrPort(enc, toAddr) t.log.Trace(">> "+packet.Name(), t.logcontext...) - return nonce, err + + dataForSend := bufferpool.Get(len(enc)) + copy(dataForSend, enc) + + pw := pendingWrite{ + toAddr: toAddr, + data: dataForSend, // codec.Encode should return a new slice, safe to pass directly. + } + + select { + case t.writeCh <- pw: + // Packet successfully queued. + return nonce, nil + case <-t.closeCtx.Done(): + bufferpool.Put(dataForSend) + return nonce, errClosed + } +} + +// writeLoop runs in its own goroutine and sends packets from the writeCh to the network. +func (t *UDPv5) writeLoop() { + defer t.wg.Done() + for pw := range t.writeCh { // Loop continues until writeCh is closed and empty. + _, err := t.conn.WriteToUDPAddrPort(pw.data, pw.toAddr) + if err != nil { + // Generic error logging, as we don't have packetName or rich context here. + select { + case <-t.closeCtx.Done(): + // Log trace level if error occurs during or after shutdown initiation. + t.log.Trace("UDP write error during/after shutdown", "addr", pw.toAddr, "err", err) + default: + // Not closing, so it's a more unexpected error. + if netutil.IsTemporaryError(err) { + t.log.Debug("Temporary UDP write error", "addr", pw.toAddr, "err", err) + } else if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) { // Avoid logging common "closed" errors if not caught by closeCtx. + t.log.Warn("UDP write error", "addr", pw.toAddr, "err", err) + } + } + } else { + // Minimal trace log confirming the actual send. + // The detailed packet-specific trace was done in the 'send' method. + t.log.Trace("UDP packet data sent", "addr", pw.toAddr, "len", len(pw.data)) + } + + bufferpool.Put(pw.data) + } } // readLoop runs in its own goroutine and reads packets from the network. From 6f1e38af4bcc90f7c131efd1527d5282c3f24def Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Mon, 12 May 2025 19:10:17 +0800 Subject: [PATCH 03/12] read buffer pool --- go.mod | 2 +- p2p/discover/v5_talk.go | 2 +- p2p/discover/v5_udp.go | 49 ++++++++++++++++----------------- p2p/discover/v5wire/encoding.go | 35 ++++++++++++++--------- p2p/discover/v5wire/msg.go | 16 +++++------ 5 files changed, 55 insertions(+), 49 deletions(-) diff --git a/go.mod b/go.mod index 0432acf97a9..30e815d4d7d 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267 github.com/karalabe/hid v1.0.1-0.20240306101548-573246063e52 github.com/kylelemons/godebug v1.1.0 + github.com/libp2p/go-buffer-pool v0.1.0 github.com/mattn/go-colorable v0.1.13 github.com/mattn/go-isatty v0.0.20 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 @@ -118,7 +119,6 @@ require ( github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect - github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/sha256-simd v1.0.0 // indirect diff --git a/p2p/discover/v5_talk.go b/p2p/discover/v5_talk.go index dca09870d89..3187aafe53a 100644 --- a/p2p/discover/v5_talk.go +++ b/p2p/discover/v5_talk.go @@ -28,7 +28,7 @@ import ( ) // This is a limit for the number of concurrent talk requests. -const maxActiveTalkRequests = 1024 +const maxActiveTalkRequests = 2048 // This is the timeout for acquiring a handler execution slot for a talk request. // The timeout should be short enough to fit within the request timeout. diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 434962b7256..637a71ea522 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -189,7 +189,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { clock: cfg.Clock, respTimeout: cfg.V5RespTimeout, // channels into dispatch - packetInCh: make(chan ReadPacket, 1), + packetInCh: make(chan ReadPacket, 1024), readNextCh: make(chan struct{}, 1), callCh: make(chan *callV5), callDoneCh: make(chan *callV5), @@ -197,7 +197,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { sendNoRespCh: make(chan *sendNoRespRequest), respTimeoutCh: make(chan *callTimeout), unhandled: cfg.Unhandled, - writeCh: make(chan pendingWrite, 128), // Buffered channel for outgoing packets + writeCh: make(chan pendingWrite, 256), // Buffered channel for outgoing packets // state of dispatch codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock, cfg.V5ProtocolID), activeCallByNode: make(map[enode.ID]*callV5), @@ -594,9 +594,6 @@ func (t *UDPv5) callDone(c *callV5) { func (t *UDPv5) dispatch() { defer t.wg.Done() - // Arm first read. - t.readNextCh <- struct{}{} - for { select { case c := <-t.callCh: @@ -629,14 +626,6 @@ func (t *UDPv5) dispatch() { t.send(r.destID, r.destAddr, r.msg, nil) case p := <-t.packetInCh: - // Arm next read immediately to allow pipelining. - // The readLoop can start reading the next packet while this one is being handled. - // Backpressure is still maintained by packetInCh (buffer 1) and readNextCh (buffer 1). - select { - case t.readNextCh <- struct{}{}: - case <-t.closeCtx.Done(): // Avoid blocking on send if closing - return - } t.handlePacket(p.Data, p.Addr) case <-t.closeCtx.Done(): @@ -836,21 +825,26 @@ func (t *UDPv5) writeLoop() { func (t *UDPv5) readLoop() { defer t.wg.Done() - buf := make([]byte, maxPacketSize) - for range t.readNextCh { - nbytes, from, err := t.conn.ReadFromUDPAddrPort(buf) - if netutil.IsTemporaryError(err) { - // Ignore temporary read errors. - t.log.Debug("Temporary UDP read error", "err", err) - continue - } else if err != nil { - // Shut down the loop for permanent errors. - if !errors.Is(err, io.EOF) { - t.log.Debug("UDP read error", "err", err) + for { + select { + case <-t.closeCtx.Done(): + t.log.Trace("UDP read loop shutdown") + default: + buf := bufferpool.Get(maxPacketSize) + nbytes, from, err := t.conn.ReadFromUDPAddrPort(buf) + if netutil.IsTemporaryError(err) { + // Ignore temporary read errors. + t.log.Debug("Temporary UDP read error", "err", err) + continue + } else if err != nil { + // Shut down the loop for permanent errors. + if !errors.Is(err, io.EOF) { + t.log.Debug("UDP read error", "err", err) + } + return } - return + t.dispatchReadPacket(from, buf[:nbytes]) } - t.dispatchReadPacket(from, buf[:nbytes]) } } @@ -864,6 +858,7 @@ func (t *UDPv5) dispatchReadPacket(from netip.AddrPort, content []byte) bool { case t.packetInCh <- ReadPacket{content, from}: return true case <-t.closeCtx.Done(): + bufferpool.Put(content) return false } } @@ -873,6 +868,8 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr netip.AddrPort) error { addr := fromAddr.String() t.log.Trace("<< "+addr, "rawPacket", hexutil.Encode(rawpacket)) fromID, fromNode, packet, err := t.codec.Decode(rawpacket, addr) + bufferpool.Put(rawpacket) + if err != nil { if t.unhandled != nil && v5wire.IsInvalidHeader(err) { // The packet seems unrelated to discv5, send it to the next protocol. diff --git a/p2p/discover/v5wire/encoding.go b/p2p/discover/v5wire/encoding.go index ec5ef8a261a..e1067378abf 100644 --- a/p2p/discover/v5wire/encoding.go +++ b/p2p/discover/v5wire/encoding.go @@ -191,8 +191,15 @@ func (c *Codec) Encode(id enode.ID, addr string, packet Packet, challenge *Whoar case packet.Kind() == WhoareyouPacket: // just send the WHOAREYOU packet raw again, rather than the re-encoded challenge data w := packet.(*Whoareyou) - if len(w.Encoded) > 0 { - return w.Encoded, w.Nonce, nil + if len(w.ChallengeData) > 0 { + // This WHOAREYOU packet was encoded before, so it's a resend. + // The unmasked packet content is stored in w.ChallengeData. + // Just apply the masking again to finish encoding. + c.buf.Reset() + c.buf.Write(w.ChallengeData) + copy(head.IV[:], w.ChallengeData) + enc := applyMasking(id, head.IV, c.buf.Bytes()) + return enc, w.Nonce, nil } head, err = c.encodeWhoareyou(id, packet.(*Whoareyou)) case challenge != nil: @@ -227,7 +234,6 @@ func (c *Codec) Encode(id enode.ID, addr string, packet Packet, challenge *Whoar if err != nil { return nil, Nonce{}, err } - challenge.Encoded = bytes.Clone(enc) c.sc.storeSentHandshake(id, addr, challenge) return enc, head.Nonce, err } @@ -245,13 +251,9 @@ func (c *Codec) Encode(id enode.ID, addr string, packet Packet, challenge *Whoar // EncodeRaw encodes a packet with the given header. func (c *Codec) EncodeRaw(id enode.ID, head Header, msgdata []byte) ([]byte, error) { + // header c.writeHeaders(&head) - - // Apply masking. - masked := c.buf.Bytes()[sizeofMaskingIV:] - mask := head.mask(id) - mask.XORKeyStream(masked[:], masked[:]) - + applyMasking(id, head.IV, c.buf.Bytes()) // Write message data. c.buf.Write(msgdata) return c.buf.Bytes(), nil @@ -463,7 +465,7 @@ func (c *Codec) Decode(inputData []byte, addr string) (src enode.ID, n *enode.No // Unmask the static header. var head Header copy(head.IV[:], input[:sizeofMaskingIV]) - mask := head.mask(c.localnode.ID()) + mask := createMask(c.localnode.ID(), head.IV) staticHeader := input[sizeofMaskingIV:sizeofStaticPacketData] mask.XORKeyStream(staticHeader, staticHeader) @@ -678,13 +680,20 @@ func (h *StaticHeader) checkValid(packetLen int, protocolID [6]byte) error { return nil } -// mask returns a cipher for 'masking' / 'unmasking' packet headers. -func (h *Header) mask(destID enode.ID) cipher.Stream { +// createMask returns a cipher for 'masking' / 'unmasking' packet headers. +func createMask(destID enode.ID, iv [16]byte) cipher.Stream { block, err := aes.NewCipher(destID[:16]) if err != nil { panic("can't create cipher") } - return cipher.NewCTR(block, h.IV[:]) + return cipher.NewCTR(block, iv[:]) +} + +func applyMasking(destID enode.ID, iv [16]byte, packet []byte) []byte { + masked := packet[sizeofMaskingIV:] + mask := createMask(destID, iv) + mask.XORKeyStream(masked[:], masked[:]) + return packet } func bytesCopy(r *bytes.Buffer) []byte { diff --git a/p2p/discover/v5wire/msg.go b/p2p/discover/v5wire/msg.go index 089fd4ebdc8..eb4123bba1f 100644 --- a/p2p/discover/v5wire/msg.go +++ b/p2p/discover/v5wire/msg.go @@ -63,19 +63,19 @@ type ( // WHOAREYOU contains the handshake challenge. Whoareyou struct { - ChallengeData []byte // Encoded challenge - Nonce Nonce // Nonce of request packet - IDNonce [16]byte // Identity proof data - RecordSeq uint64 // ENR sequence number of recipient + Nonce Nonce // Nonce of request packet + IDNonce [16]byte // Identity proof data + RecordSeq uint64 // ENR sequence number of recipient // Node is the locally known node record of recipient. // This must be set by the caller of Encode. - Node *enode.Node + Node *enode.Node `rlp:"-"` + // ChallengeData stores the unmasked encoding of the whole packet. This is the + // input data for verification. It is assigned by both Encode and Decode + // operations. + ChallengeData []byte `rlp:"-"` sent mclock.AbsTime // for handshake GC. - - // Encoded is packet raw data for sending out, but should not be include in the RLP encoding. - Encoded []byte `rlp:"-"` } // PING is sent during liveness checks. From 7714d41097af665e7a6a37396ee21772be53ded6 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Wed, 14 May 2025 00:04:57 +0800 Subject: [PATCH 04/12] modify write channel size --- p2p/discover/v5_talk.go | 2 +- p2p/discover/v5_udp.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/discover/v5_talk.go b/p2p/discover/v5_talk.go index 3187aafe53a..dca09870d89 100644 --- a/p2p/discover/v5_talk.go +++ b/p2p/discover/v5_talk.go @@ -28,7 +28,7 @@ import ( ) // This is a limit for the number of concurrent talk requests. -const maxActiveTalkRequests = 2048 +const maxActiveTalkRequests = 1024 // This is the timeout for acquiring a handler execution slot for a talk request. // The timeout should be short enough to fit within the request timeout. diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 637a71ea522..223de7310a3 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -197,7 +197,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { sendNoRespCh: make(chan *sendNoRespRequest), respTimeoutCh: make(chan *callTimeout), unhandled: cfg.Unhandled, - writeCh: make(chan pendingWrite, 256), // Buffered channel for outgoing packets + writeCh: make(chan pendingWrite, 1024), // Buffered channel for outgoing packets // state of dispatch codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock, cfg.V5ProtocolID), activeCallByNode: make(map[enode.ID]*callV5), From 8d05d5da6d98861345b81f61541ebe5a0b763a4b Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Wed, 14 May 2025 09:59:41 +0800 Subject: [PATCH 05/12] add max active talk requests --- p2p/discover/v5_talk.go | 2 +- p2p/discover/v5_udp.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/discover/v5_talk.go b/p2p/discover/v5_talk.go index dca09870d89..3187aafe53a 100644 --- a/p2p/discover/v5_talk.go +++ b/p2p/discover/v5_talk.go @@ -28,7 +28,7 @@ import ( ) // This is a limit for the number of concurrent talk requests. -const maxActiveTalkRequests = 1024 +const maxActiveTalkRequests = 2048 // This is the timeout for acquiring a handler execution slot for a talk request. // The timeout should be short enough to fit within the request timeout. diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 223de7310a3..905baae4ce7 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -774,8 +774,8 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet, t.logcontext = append(t.logcontext, "rawpacket", hexutil.Encode(enc)) t.log.Trace(">> "+packet.Name(), t.logcontext...) - dataForSend := bufferpool.Get(len(enc)) - copy(dataForSend, enc) + dataForSend := bufferpool.Get(len(enc))[:0] + dataForSend = append(dataForSend, enc...) pw := pendingWrite{ toAddr: toAddr, From c603f255c84d458ae0293948463c78aa042cdfe7 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Wed, 14 May 2025 15:39:59 +0800 Subject: [PATCH 06/12] remove readNextCh --- p2p/discover/v5_udp.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 905baae4ce7..6d5e19c2244 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -93,7 +93,6 @@ type UDPv5 struct { // channels into dispatch packetInCh chan ReadPacket - readNextCh chan struct{} callCh chan *callV5 callDoneCh chan *callV5 respTimeoutCh chan *callTimeout @@ -190,7 +189,6 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { respTimeout: cfg.V5RespTimeout, // channels into dispatch packetInCh: make(chan ReadPacket, 1024), - readNextCh: make(chan struct{}, 1), callCh: make(chan *callV5), callDoneCh: make(chan *callV5), sendCh: make(chan sendRequest), @@ -629,7 +627,6 @@ func (t *UDPv5) dispatch() { t.handlePacket(p.Data, p.Addr) case <-t.closeCtx.Done(): - close(t.readNextCh) close(t.writeCh) for id, queue := range t.callQueue { for _, c := range queue { From aa62418193b8b1ef4749ad4415ebf2d282d3c6cd Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Wed, 14 May 2025 17:28:58 +0800 Subject: [PATCH 07/12] fix test case --- p2p/discover/v5_udp_test.go | 15 +++++++------- p2p/discover/v5wire/encoding_test.go | 29 ++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index 3a384aab129..e258f3c24d6 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -772,10 +772,10 @@ type testCodecFrame struct { } func (c *testCodec) Encode(toID enode.ID, addr string, p v5wire.Packet, _ *v5wire.Whoareyou) ([]byte, v5wire.Nonce, error) { - // To match the behavior of v5wire.Codec, we return the cached encoding of - // WHOAREYOU challenges. - if wp, ok := p.(*v5wire.Whoareyou); ok && len(wp.Encoded) > 0 { - return wp.Encoded, wp.Nonce, nil + if wp, ok := p.(*v5wire.Whoareyou); ok && len(wp.ChallengeData) > 0 { + // To match the behavior of v5wire.Codec, we return the cached encoding of + // WHOAREYOU challenges. + return wp.ChallengeData, wp.Nonce, nil } c.ctr++ @@ -790,7 +790,7 @@ func (c *testCodec) Encode(toID enode.ID, addr string, p v5wire.Packet, _ *v5wir // Store recently sent challenges. if w, ok := p.(*v5wire.Whoareyou); ok { w.Nonce = authTag - w.Encoded = frame + w.ChallengeData = frame if c.sentChallenges == nil { c.sentChallenges = make(map[enode.ID]*v5wire.Whoareyou) } @@ -827,6 +827,7 @@ func (c *testCodec) decodeFrame(input []byte) (frame testCodecFrame, p v5wire.Pa case v5wire.WhoareyouPacket: dec := new(v5wire.Whoareyou) err = rlp.DecodeBytes(frame.Packet, &dec) + dec.ChallengeData = bytes.Clone(input) p = dec default: p, err = v5wire.DecodeMessage(frame.Ptype, frame.Packet) @@ -877,9 +878,7 @@ func (test *udpV5Test) packetInFrom(key *ecdsa.PrivateKey, addr netip.AddrPort, if err != nil { test.t.Errorf("%s encode error: %v", packet.Name(), err) } - if test.udp.dispatchReadPacket(addr, enc) { - <-test.udp.readNextCh // unblock UDPv5.dispatch - } + test.udp.dispatchReadPacket(addr, enc) } // getNode ensures the test knows about a node at the given endpoint. diff --git a/p2p/discover/v5wire/encoding_test.go b/p2p/discover/v5wire/encoding_test.go index 2304d0f1327..c513e942975 100644 --- a/p2p/discover/v5wire/encoding_test.go +++ b/p2p/discover/v5wire/encoding_test.go @@ -269,6 +269,35 @@ func TestHandshake_BadHandshakeAttack(t *testing.T) { net.nodeB.expectDecodeErr(t, errUnexpectedHandshake, findnode) } +func TestEncodeWhoareyouResend(t *testing.T) { + t.Parallel() + net := newHandshakeTest() + defer net.close() + + // A -> B WHOAREYOU + challenge := &Whoareyou{ + Nonce: Nonce{1, 2, 3, 4}, + IDNonce: testIDnonce, + RecordSeq: 0, + } + enc, _ := net.nodeA.encode(t, net.nodeB, challenge) + net.nodeB.expectDecode(t, WhoareyouPacket, enc) + whoareyou1 := bytes.Clone(enc) + + if len(challenge.ChallengeData) == 0 { + t.Fatal("ChallengeData not assigned by encode") + } + + // A -> B WHOAREYOU + // Send the same challenge again. This should produce exactly + // the same bytes as the first send. + enc, _ = net.nodeA.encode(t, net.nodeB, challenge) + whoareyou2 := bytes.Clone(enc) + if !bytes.Equal(whoareyou2, whoareyou1) { + t.Fatal("re-encoded challenge not equal to first") + } +} + // This test checks some malformed packets. func TestDecodeErrorsV5(t *testing.T) { t.Parallel() From 0fa5150d7a8662586a2ca28542b95ca4762ebead Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Thu, 15 May 2025 21:25:37 +0800 Subject: [PATCH 08/12] remove raw packet of trace log --- p2p/discover/v5_udp.go | 49 +++++++++++++++--------------------------- 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 6d5e19c2244..f45f386b8ba 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -30,7 +30,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover/v5wire" @@ -768,11 +767,10 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet, return nonce, err } - t.logcontext = append(t.logcontext, "rawpacket", hexutil.Encode(enc)) t.log.Trace(">> "+packet.Name(), t.logcontext...) - dataForSend := bufferpool.Get(len(enc))[:0] - dataForSend = append(dataForSend, enc...) + dataForSend := make([]byte, len(enc)) + copy(dataForSend, enc) pw := pendingWrite{ toAddr: toAddr, @@ -784,7 +782,6 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet, // Packet successfully queued. return nonce, nil case <-t.closeCtx.Done(): - bufferpool.Put(dataForSend) return nonce, errClosed } } @@ -808,13 +805,7 @@ func (t *UDPv5) writeLoop() { t.log.Warn("UDP write error", "addr", pw.toAddr, "err", err) } } - } else { - // Minimal trace log confirming the actual send. - // The detailed packet-specific trace was done in the 'send' method. - t.log.Trace("UDP packet data sent", "addr", pw.toAddr, "len", len(pw.data)) } - - bufferpool.Put(pw.data) } } @@ -822,26 +813,21 @@ func (t *UDPv5) writeLoop() { func (t *UDPv5) readLoop() { defer t.wg.Done() + buf := bufferpool.Get(maxPacketSize) for { - select { - case <-t.closeCtx.Done(): - t.log.Trace("UDP read loop shutdown") - default: - buf := bufferpool.Get(maxPacketSize) - nbytes, from, err := t.conn.ReadFromUDPAddrPort(buf) - if netutil.IsTemporaryError(err) { - // Ignore temporary read errors. - t.log.Debug("Temporary UDP read error", "err", err) - continue - } else if err != nil { - // Shut down the loop for permanent errors. - if !errors.Is(err, io.EOF) { - t.log.Debug("UDP read error", "err", err) - } - return + nbytes, from, err := t.conn.ReadFromUDPAddrPort(buf) + if netutil.IsTemporaryError(err) { + // Ignore temporary read errors. + t.log.Debug("Temporary UDP read error", "err", err) + continue + } else if err != nil { + // Shut down the loop for permanent errors. + if !errors.Is(err, io.EOF) { + t.log.Debug("UDP read error", "err", err) } - t.dispatchReadPacket(from, buf[:nbytes]) + return } + t.dispatchReadPacket(from, buf[:nbytes]) } } @@ -851,11 +837,12 @@ func (t *UDPv5) dispatchReadPacket(from netip.AddrPort, content []byte) bool { if from.Addr().Is4In6() { from = netip.AddrPortFrom(netip.AddrFrom4(from.Addr().As4()), from.Port()) } + data := make([]byte, len(content)) + copy(data, content) select { - case t.packetInCh <- ReadPacket{content, from}: + case t.packetInCh <- ReadPacket{data, from}: return true case <-t.closeCtx.Done(): - bufferpool.Put(content) return false } } @@ -863,9 +850,7 @@ func (t *UDPv5) dispatchReadPacket(from netip.AddrPort, content []byte) bool { // handlePacket decodes and processes an incoming packet from the network. func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr netip.AddrPort) error { addr := fromAddr.String() - t.log.Trace("<< "+addr, "rawPacket", hexutil.Encode(rawpacket)) fromID, fromNode, packet, err := t.codec.Decode(rawpacket, addr) - bufferpool.Put(rawpacket) if err != nil { if t.unhandled != nil && v5wire.IsInvalidHeader(err) { From 9e10bab51afa0cce80131b6dcf4e5aaa2ef76875 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Fri, 16 May 2025 10:24:22 +0800 Subject: [PATCH 09/12] remove buffer pool --- go.mod | 1 - go.sum | 2 -- p2p/discover/v5_udp.go | 7 +++---- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 30e815d4d7d..d27af647ece 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,6 @@ require ( github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267 github.com/karalabe/hid v1.0.1-0.20240306101548-573246063e52 github.com/kylelemons/godebug v1.1.0 - github.com/libp2p/go-buffer-pool v0.1.0 github.com/mattn/go-colorable v0.1.13 github.com/mattn/go-isatty v0.0.20 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 diff --git a/go.sum b/go.sum index 575c284827b..200b3725eaa 100644 --- a/go.sum +++ b/go.sum @@ -238,8 +238,6 @@ github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4F github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= -github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= -github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index f45f386b8ba..7047e279fb5 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -36,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/netutil" - bufferpool "github.com/libp2p/go-buffer-pool" ) const ( @@ -187,14 +186,14 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { clock: cfg.Clock, respTimeout: cfg.V5RespTimeout, // channels into dispatch - packetInCh: make(chan ReadPacket, 1024), + packetInCh: make(chan ReadPacket, 64), callCh: make(chan *callV5), callDoneCh: make(chan *callV5), sendCh: make(chan sendRequest), sendNoRespCh: make(chan *sendNoRespRequest), respTimeoutCh: make(chan *callTimeout), unhandled: cfg.Unhandled, - writeCh: make(chan pendingWrite, 1024), // Buffered channel for outgoing packets + writeCh: make(chan pendingWrite, 32), // Buffered channel for outgoing packets // state of dispatch codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock, cfg.V5ProtocolID), activeCallByNode: make(map[enode.ID]*callV5), @@ -813,7 +812,7 @@ func (t *UDPv5) writeLoop() { func (t *UDPv5) readLoop() { defer t.wg.Done() - buf := bufferpool.Get(maxPacketSize) + buf := make([]byte, maxPacketSize) for { nbytes, from, err := t.conn.ReadFromUDPAddrPort(buf) if netutil.IsTemporaryError(err) { From 713d0b1a90f1f8a5afa4c921872f5a09c82cf4dc Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Fri, 16 May 2025 16:05:48 +0800 Subject: [PATCH 10/12] fix test case --- cmd/devp2p/internal/v5test/discv5tests.go | 29 +++++++++++++---------- p2p/discover/v5_udp.go | 19 +++++---------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/cmd/devp2p/internal/v5test/discv5tests.go b/cmd/devp2p/internal/v5test/discv5tests.go index 2139cd8ca6f..efe9144069a 100644 --- a/cmd/devp2p/internal/v5test/discv5tests.go +++ b/cmd/devp2p/internal/v5test/discv5tests.go @@ -52,7 +52,7 @@ func (s *Suite) AllTests() []utesting.Test { {Name: "Ping", Fn: s.TestPing}, {Name: "PingLargeRequestID", Fn: s.TestPingLargeRequestID}, {Name: "PingMultiIP", Fn: s.TestPingMultiIP}, - {Name: "PingHandshakeInterrupted", Fn: s.TestPingHandshakeInterrupted}, + {Name: "HandshakeResend", Fn: s.TestHandshakeResend}, {Name: "TalkRequest", Fn: s.TestTalkRequest}, {Name: "FindnodeZeroDistance", Fn: s.TestFindnodeZeroDistance}, {Name: "FindnodeResults", Fn: s.TestFindnodeResults}, @@ -158,22 +158,20 @@ the attempt from a different IP.`) } } -// TestPingHandshakeInterrupted starts a handshake, but doesn't finish it and sends a second ordinary message -// packet instead of a handshake message packet. The remote node should respond with -// another WHOAREYOU challenge for the second packet. -func (s *Suite) TestPingHandshakeInterrupted(t *utesting.T) { - t.Log(`TestPingHandshakeInterrupted starts a handshake, but doesn't finish it and sends a second ordinary message -packet instead of a handshake message packet. The remote node should respond with -another WHOAREYOU challenge for the second packet.`) - +// TestHandshakeResend starts a handshake, but doesn't finish it and sends a second ordinary message +// packet instead of a handshake message packet. The remote node should repeat the previous WHOAREYOU +// challenge for the first PING. +func (s *Suite) TestHandshakeResend(t *utesting.T) { conn, l1 := s.listen1(t) defer conn.close() // First PING triggers challenge. ping := &v5wire.Ping{ReqID: conn.nextReqID()} conn.write(l1, ping, nil) + var challenge1 *v5wire.Whoareyou switch resp := conn.read(l1).(type) { case *v5wire.Whoareyou: + challenge1 = resp t.Logf("got WHOAREYOU for PING") default: t.Fatal("expected WHOAREYOU, got", resp) @@ -181,9 +179,16 @@ another WHOAREYOU challenge for the second packet.`) // Send second PING. ping2 := &v5wire.Ping{ReqID: conn.nextReqID()} - switch resp := conn.reqresp(l1, ping2).(type) { - case *v5wire.Pong: - checkPong(t, resp, ping2, l1) + conn.write(l1, ping2, nil) + switch resp := conn.read(l1).(type) { + case *v5wire.Whoareyou: + if resp.Nonce != challenge1.Nonce { + t.Fatalf("wrong nonce %x in WHOAREYOU (want %x)", resp.Nonce[:], challenge1.Nonce[:]) + } + if !bytes.Equal(resp.ChallengeData, challenge1.ChallengeData) { + t.Fatalf("wrong ChallengeData in resent WHOAREYOU (want %x)", resp.ChallengeData, challenge1.ChallengeData) + } + resp.Node = conn.remote default: t.Fatal("expected WHOAREYOU, got", resp) } diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 7047e279fb5..08903a0d970 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -790,20 +790,13 @@ func (t *UDPv5) writeLoop() { defer t.wg.Done() for pw := range t.writeCh { // Loop continues until writeCh is closed and empty. _, err := t.conn.WriteToUDPAddrPort(pw.data, pw.toAddr) - if err != nil { - // Generic error logging, as we don't have packetName or rich context here. - select { - case <-t.closeCtx.Done(): - // Log trace level if error occurs during or after shutdown initiation. - t.log.Trace("UDP write error during/after shutdown", "addr", pw.toAddr, "err", err) - default: - // Not closing, so it's a more unexpected error. - if netutil.IsTemporaryError(err) { - t.log.Debug("Temporary UDP write error", "addr", pw.toAddr, "err", err) - } else if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) { // Avoid logging common "closed" errors if not caught by closeCtx. - t.log.Warn("UDP write error", "addr", pw.toAddr, "err", err) - } + if netutil.IsTemporaryError(err) { + t.log.Debug("Temporary UDP write error", "addr", pw.toAddr, "err", err) + } else if err != nil { + if !errors.Is(err, net.ErrClosed) || !errors.Is(err, io.EOF) { + t.log.Warn("UDP write error", "addr", pw.toAddr, "err", err) } + return } } } From 4323ec356de1ca01c3195b0226a72248ad166523 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Mon, 12 May 2025 19:10:17 +0800 Subject: [PATCH 11/12] read loop --- go.mod | 1 + go.sum | 2 ++ p2p/discover/v5_udp.go | 56 ++++++------------------------------------ 3 files changed, 11 insertions(+), 48 deletions(-) diff --git a/go.mod b/go.mod index d27af647ece..30e815d4d7d 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267 github.com/karalabe/hid v1.0.1-0.20240306101548-573246063e52 github.com/kylelemons/godebug v1.1.0 + github.com/libp2p/go-buffer-pool v0.1.0 github.com/mattn/go-colorable v0.1.13 github.com/mattn/go-isatty v0.0.20 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 diff --git a/go.sum b/go.sum index 200b3725eaa..575c284827b 100644 --- a/go.sum +++ b/go.sum @@ -238,6 +238,8 @@ github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4F github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= +github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= +github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 08903a0d970..c924633fcfc 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -97,7 +97,6 @@ type UDPv5 struct { sendCh chan sendRequest sendNoRespCh chan *sendNoRespRequest unhandled chan<- ReadPacket - writeCh chan pendingWrite // New channel for outgoing packets // state of dispatch codec codecV5 @@ -113,12 +112,6 @@ type UDPv5 struct { wg sync.WaitGroup } -// pendingWrite holds data for a packet to be sent by the writeLoop. -type pendingWrite struct { - toAddr netip.AddrPort - data []byte -} - type sendRequest struct { destID enode.ID destAddr netip.AddrPort @@ -163,10 +156,9 @@ func ListenV5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { return nil, err } go t.tab.loop() - t.wg.Add(3) + t.wg.Add(2) go t.readLoop() go t.dispatch() - go t.writeLoop() return t, nil } @@ -186,14 +178,13 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { clock: cfg.Clock, respTimeout: cfg.V5RespTimeout, // channels into dispatch - packetInCh: make(chan ReadPacket, 64), + packetInCh: make(chan ReadPacket, 4), callCh: make(chan *callV5), callDoneCh: make(chan *callV5), sendCh: make(chan sendRequest), sendNoRespCh: make(chan *sendNoRespRequest), respTimeoutCh: make(chan *callTimeout), unhandled: cfg.Unhandled, - writeCh: make(chan pendingWrite, 32), // Buffered channel for outgoing packets // state of dispatch codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock, cfg.V5ProtocolID), activeCallByNode: make(map[enode.ID]*callV5), @@ -625,7 +616,6 @@ func (t *UDPv5) dispatch() { t.handlePacket(p.Data, p.Addr) case <-t.closeCtx.Done(): - close(t.writeCh) for id, queue := range t.callQueue { for _, c := range queue { c.err <- errClosed @@ -766,39 +756,9 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet, return nonce, err } + _, err = t.conn.WriteToUDPAddrPort(enc, toAddr) t.log.Trace(">> "+packet.Name(), t.logcontext...) - - dataForSend := make([]byte, len(enc)) - copy(dataForSend, enc) - - pw := pendingWrite{ - toAddr: toAddr, - data: dataForSend, // codec.Encode should return a new slice, safe to pass directly. - } - - select { - case t.writeCh <- pw: - // Packet successfully queued. - return nonce, nil - case <-t.closeCtx.Done(): - return nonce, errClosed - } -} - -// writeLoop runs in its own goroutine and sends packets from the writeCh to the network. -func (t *UDPv5) writeLoop() { - defer t.wg.Done() - for pw := range t.writeCh { // Loop continues until writeCh is closed and empty. - _, err := t.conn.WriteToUDPAddrPort(pw.data, pw.toAddr) - if netutil.IsTemporaryError(err) { - t.log.Debug("Temporary UDP write error", "addr", pw.toAddr, "err", err) - } else if err != nil { - if !errors.Is(err, net.ErrClosed) || !errors.Is(err, io.EOF) { - t.log.Warn("UDP write error", "addr", pw.toAddr, "err", err) - } - return - } - } + return nonce, err } // readLoop runs in its own goroutine and reads packets from the network. @@ -819,7 +779,9 @@ func (t *UDPv5) readLoop() { } return } - t.dispatchReadPacket(from, buf[:nbytes]) + content := make([]byte, nbytes) + copy(content, buf[:nbytes]) + t.dispatchReadPacket(from, content) } } @@ -829,10 +791,8 @@ func (t *UDPv5) dispatchReadPacket(from netip.AddrPort, content []byte) bool { if from.Addr().Is4In6() { from = netip.AddrPortFrom(netip.AddrFrom4(from.Addr().As4()), from.Port()) } - data := make([]byte, len(content)) - copy(data, content) select { - case t.packetInCh <- ReadPacket{data, from}: + case t.packetInCh <- ReadPacket{content, from}: return true case <-t.closeCtx.Done(): return false From e115d3a319242a792ddb7a6b2349bcf39b0a1ea0 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Sun, 25 May 2025 14:33:34 +0800 Subject: [PATCH 12/12] fix go mod --- go.mod | 1 - go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/go.mod b/go.mod index 30e815d4d7d..d27af647ece 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,6 @@ require ( github.com/jedisct1/go-minisign v0.0.0-20230811132847-661be99b8267 github.com/karalabe/hid v1.0.1-0.20240306101548-573246063e52 github.com/kylelemons/godebug v1.1.0 - github.com/libp2p/go-buffer-pool v0.1.0 github.com/mattn/go-colorable v0.1.13 github.com/mattn/go-isatty v0.0.20 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 diff --git a/go.sum b/go.sum index 575c284827b..200b3725eaa 100644 --- a/go.sum +++ b/go.sum @@ -238,8 +238,6 @@ github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4F github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= -github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= -github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ=