From dfa7e2880c7d44943ba3a09d972218b2fc4a059a Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 29 Jan 2025 21:49:46 +0530 Subject: [PATCH] address review comments 2 --- core/network/conn.go | 8 ++--- core/network/mux.go | 8 ++--- p2p/net/connmgr/connmgr_test.go | 5 ++- p2p/net/swarm/swarm.go | 13 +++++--- p2p/protocol/circuitv2/relay/relay_test.go | 1 - p2p/test/transport/transport_test.go | 39 ++++++++++++++++++---- p2p/transport/quic/stream.go | 20 +++++++---- p2p/transport/webrtc/connection.go | 4 ++- p2p/transport/webtransport/conn.go | 2 +- p2p/transport/webtransport/stream.go | 5 +++ 10 files changed, 72 insertions(+), 33 deletions(-) diff --git a/core/network/conn.go b/core/network/conn.go index 208ebda8d6..82a2a2fdc3 100644 --- a/core/network/conn.go +++ b/core/network/conn.go @@ -33,17 +33,14 @@ func (c *ConnError) Error() string { } func (c *ConnError) Is(target error) bool { - if target == ErrReset { - return true - } if tce, ok := target.(*ConnError); ok { return tce.ErrorCode == c.ErrorCode && tce.Remote == c.Remote } return false } -func (c *ConnError) Unwrap() error { - return c.TransportError +func (c *ConnError) Unwrap() []error { + return []error{ErrReset, c.TransportError} } const ( @@ -56,6 +53,7 @@ const ( ConnGarbageCollected ConnErrorCode = 1006 ConnShutdown ConnErrorCode = 1007 ConnGated ConnErrorCode = 1008 + ConnCodeOutOfRange ConnErrorCode = 1009 ) // Conn is a connection to a remote peer. It multiplexes streams. diff --git a/core/network/mux.go b/core/network/mux.go index fbc9de220b..bcd741a2bb 100644 --- a/core/network/mux.go +++ b/core/network/mux.go @@ -32,17 +32,14 @@ func (s *StreamError) Error() string { } func (s *StreamError) Is(target error) bool { - if target == ErrReset { - return true - } if tse, ok := target.(*StreamError); ok { return tse.ErrorCode == s.ErrorCode && tse.Remote == s.Remote } return false } -func (s *StreamError) Unwrap() error { - return s.TransportError +func (s *StreamError) Unwrap() []error { + return []error{ErrReset, s.TransportError} } const ( @@ -55,6 +52,7 @@ const ( StreamGarbageCollected StreamErrorCode = 1006 StreamShutdown StreamErrorCode = 1007 StreamGated StreamErrorCode = 1008 + StreamCodeOutOfRange StreamErrorCode = 1009 ) // MuxedStream is a bidirectional io pipe within a connection. diff --git a/p2p/net/connmgr/connmgr_test.go b/p2p/net/connmgr/connmgr_test.go index 4772837fe1..f47557b02c 100644 --- a/p2p/net/connmgr/connmgr_test.go +++ b/p2p/net/connmgr/connmgr_test.go @@ -1009,7 +1009,6 @@ func TestErrorCode(t *testing.T) { require.NoError(t, err) defer cm.Close() - sw1.Notify(cm.Notifee()) sw1.Peerstore().AddAddrs(sw2.LocalPeer(), sw2.ListenAddresses(), peerstore.PermanentAddrTTL) sw1.Peerstore().AddAddrs(sw3.LocalPeer(), sw3.ListenAddresses(), peerstore.PermanentAddrTTL) @@ -1039,6 +1038,10 @@ func TestErrorCode(t *testing.T) { return true }, 10*time.Second, 100*time.Millisecond) + not := cm.Notifee() + not.Connected(sw1, c12) + not.Connected(sw1, c13) + cm.TrimOpenConns(context.Background()) require.True(t, c12.IsClosed() || c13.IsClosed()) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index ca4360e11a..52fac831c4 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -850,11 +850,14 @@ func (c *connWithMetrics) Close() error { } func (c *connWithMetrics) CloseWithError(errCode network.ConnErrorCode) error { - c.metricsTracer.ClosedConnection(c.dir, time.Since(c.opened), c.ConnState(), c.LocalMultiaddr()) - if ce, ok := c.CapableConn.(network.CloseWithErrorer); ok { - return ce.CloseWithError(errCode) - } - return c.CapableConn.Close() + c.once.Do(func() { + c.metricsTracer.ClosedConnection(c.dir, time.Since(c.opened), c.ConnState(), c.LocalMultiaddr()) + if ce, ok := c.CapableConn.(network.CloseWithErrorer); ok { + c.closeErr = ce.CloseWithError(errCode) + } + c.closeErr = c.CapableConn.Close() + }) + return c.closeErr } func (c *connWithMetrics) Stat() network.ConnStats { diff --git a/p2p/protocol/circuitv2/relay/relay_test.go b/p2p/protocol/circuitv2/relay/relay_test.go index 1329aa67b0..7c5ec927df 100644 --- a/p2p/protocol/circuitv2/relay/relay_test.go +++ b/p2p/protocol/circuitv2/relay/relay_test.go @@ -309,7 +309,6 @@ func TestRelayLimitData(t *testing.T) { rc := relay.DefaultResources() rc.Limit.Duration = time.Second - // Due to yamux framing, 4 blocks of 1024 bytes will exceed the data limit rc.Limit.Data = 4096 r, err := relay.New(hosts[1], relay.WithResources(rc)) diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index 9ae9f111cc..9936463e23 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -869,7 +869,6 @@ func TestConnClosedWhenRemoteCloses(t *testing.T) { } func TestErrorCodes(t *testing.T) { - assertStreamErrors := func(s network.Stream, expectedError error) { buf := make([]byte, 10) _, err := s.Read(buf) @@ -925,6 +924,9 @@ func TestErrorCodes(t *testing.T) { require.NoError(t, err) pingPong(s) + remoteStream := <-remoteStreamQ + defer remoteStream.Reset() + err = s.ResetWithError(42) require.NoError(t, err) assertStreamErrors(s, &network.StreamError{ @@ -932,13 +934,36 @@ func TestErrorCodes(t *testing.T) { Remote: false, }) + assertStreamErrors(remoteStream, &network.StreamError{ + ErrorCode: 42, + Remote: true, + }) + }) + t.Run("StreamResetWithErrorByRemote", func(t *testing.T) { + if tc.Name == "WebTransport" { + t.Skipf("skipping: %s, not implemented", tc.Name) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s, err := client.NewStream(ctx, server.ID(), "/test") + require.NoError(t, err) + pingPong(s) + remoteStream := <-remoteStreamQ - defer remoteStream.Reset() - assertStreamErrors(remoteStream, &network.StreamError{ + err = remoteStream.ResetWithError(42) + require.NoError(t, err) + + assertStreamErrors(s, &network.StreamError{ ErrorCode: 42, Remote: true, }) + + assertStreamErrors(remoteStream, &network.StreamError{ + ErrorCode: 42, + Remote: false, + }) }) t.Run("StreamResetByConnCloseWithError", func(t *testing.T) { @@ -952,6 +977,9 @@ func TestErrorCodes(t *testing.T) { require.NoError(t, err) pingPong(s) + remoteStream := <-remoteStreamQ + defer remoteStream.Reset() + err = s.Conn().CloseWithError(42) require.NoError(t, err) @@ -960,16 +988,13 @@ func TestErrorCodes(t *testing.T) { Remote: false, }) - remoteStream := <-remoteStreamQ - defer remoteStream.Reset() - assertStreamErrors(remoteStream, &network.ConnError{ ErrorCode: 42, Remote: true, }) }) - t.Run("StreamResetByConnCloseWithError", func(t *testing.T) { + t.Run("NewStreamErrorByConnCloseWithError", func(t *testing.T) { if tc.Name == "WebTransport" || tc.Name == "WebRTC" { t.Skipf("skipping: %s, not implemented", tc.Name) return diff --git a/p2p/transport/quic/stream.go b/p2p/transport/quic/stream.go index 921e17c76c..1de4770dce 100644 --- a/p2p/transport/quic/stream.go +++ b/p2p/transport/quic/stream.go @@ -25,22 +25,28 @@ func parseStreamError(err error) error { } se := &quic.StreamError{} if errors.As(err, &se) { - code := se.ErrorCode - if code > math.MaxUint32 { - // TODO(sukunrt): do we need this? - code = reset + var code network.StreamErrorCode + if se.ErrorCode > math.MaxUint32 { + code = network.StreamCodeOutOfRange + } else { + code = network.StreamErrorCode(se.ErrorCode) } err = &network.StreamError{ - ErrorCode: network.StreamErrorCode(code), + ErrorCode: code, Remote: se.Remote, TransportError: se, } } ae := &quic.ApplicationError{} if errors.As(err, &ae) { - code := ae.ErrorCode + var code network.ConnErrorCode + if ae.ErrorCode > math.MaxUint32 { + code = network.ConnCodeOutOfRange + } else { + code = network.ConnErrorCode(ae.ErrorCode) + } err = &network.ConnError{ - ErrorCode: network.ConnErrorCode(code), + ErrorCode: code, Remote: ae.Remote, TransportError: ae, } diff --git a/p2p/transport/webrtc/connection.go b/p2p/transport/webrtc/connection.go index 8413b3a394..77b293fadb 100644 --- a/p2p/transport/webrtc/connection.go +++ b/p2p/transport/webrtc/connection.go @@ -132,7 +132,9 @@ func (c *connection) Close() error { return nil } -func (c *connection) CloseWithError(errCode network.ConnErrorCode) error { +// CloseWithError closes the connection ignoring the error code. As there's no way to signal +// the remote peer on closing the underlying peerconnection, we ignore the error code. +func (c *connection) CloseWithError(_ network.ConnErrorCode) error { return c.Close() } diff --git a/p2p/transport/webtransport/conn.go b/p2p/transport/webtransport/conn.go index 3618548d14..f76ad10438 100644 --- a/p2p/transport/webtransport/conn.go +++ b/p2p/transport/webtransport/conn.go @@ -78,7 +78,7 @@ func (c *conn) Close() error { return err } -func (c *conn) CloseWithError(errCode network.ConnErrorCode) error { +func (c *conn) CloseWithError(_ network.ConnErrorCode) error { return c.Close() } diff --git a/p2p/transport/webtransport/stream.go b/p2p/transport/webtransport/stream.go index 048da33d3e..83ee52a5d1 100644 --- a/p2p/transport/webtransport/stream.go +++ b/p2p/transport/webtransport/stream.go @@ -56,6 +56,11 @@ func (s *stream) Reset() error { return nil } +// ResetWithError resets the stream ignoring the error code. Error codes aren't +// specified for WebTransport as the current implementation of WebTransport in +// browsers(https://www.ietf.org/archive/id/draft-kinnear-webtransport-http2-02.html) +// only supports 1 byte error codes. For more details, see +// https://github.com/libp2p/specs/blob/4eca305185c7aef219e936bef76c48b1ab0a8b43/error-codes/README.md?plain=1#L84 func (s *stream) ResetWithError(_ network.StreamErrorCode) error { s.Stream.CancelRead(reset) s.Stream.CancelWrite(reset)