Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jan 27, 2025
1 parent ffed95f commit 3303ef8
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 204 deletions.
11 changes: 11 additions & 0 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

ic "github.com/libp2p/go-libp2p/core/crypto"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

Expand All @@ -31,6 +32,16 @@ func (c *ConnError) Error() string {
return fmt.Sprintf("connection closed (%s): code: %d", side, c.ErrorCode)
}

func (c *ConnError) Is(target error) bool {
if target == ErrReset {
return true
}
if tce, ok := target.(*ConnError); ok {
return tce.ErrorCode == c.ErrorCode && tce.Remote == c.Remote
}
return false
}

func (c *ConnError) Unwrap() error {
return c.TransportError
}
Expand Down
20 changes: 12 additions & 8 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ func (s *StreamError) Error() string {
}

func (s *StreamError) Is(target error) bool {
return target == ErrReset
if target == ErrReset {
return true
}
if tse, ok := target.(*StreamError); ok {
return tse.ErrorCode == s.ErrorCode && tse.Remote == s.Remote
}
return false
}

func (s *StreamError) Unwrap() error {
Expand Down Expand Up @@ -96,16 +102,14 @@ type MuxedStream interface {
// side to hang up and go away.
Reset() error

SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}

type ResetWithErrorer interface {
// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// ResetWithError aborts both ends of the stream with `errCode`. `errCode` is sent
// to the peer on a best effort basis. For transports that do not support sending
// error codes to remote peer, the behavior is identical to calling Reset
ResetWithError(errCode StreamErrorCode) error

SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}

// MuxedConn represents a connection to a remote peer that has been
Expand Down
1 change: 0 additions & 1 deletion p2p/muxer/yamux/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func parseResetError(err error) error {
if errors.As(err, &ce) {
return &network.ConnError{Remote: ce.Remote, ErrorCode: network.ConnErrorCode(ce.ErrorCode)}
}
// TODO: How should we handle resets for reason other than a remote error
if errors.Is(err, yamux.ErrStreamReset) {
return fmt.Errorf("%w: %w", network.ErrReset, err)
}
Expand Down
76 changes: 76 additions & 0 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
tu "github.com/libp2p/go-libp2p/core/test"

swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -995,3 +998,76 @@ type testLimitGetter struct {
func (g testLimitGetter) GetConnLimit() int {
return g.limit
}

func TestErrorCode(t *testing.T) {
sw1, sw2, sw3 := swarmt.GenSwarm(t), swarmt.GenSwarm(t), swarmt.GenSwarm(t)
defer sw1.Close()
defer sw2.Close()
defer sw3.Close()

cm, err := NewConnManager(1, 1, WithGracePeriod(0), WithSilencePeriod(10))
require.NoError(t, err)
defer cm.Close()

sw1.Notify(cm.Notifee())
sw1.Peerstore().AddAddrs(sw2.LocalPeer(), sw2.ListenAddresses(), peerstore.PermanentAddrTTL)
sw1.Peerstore().AddAddrs(sw3.LocalPeer(), sw3.ListenAddresses(), peerstore.PermanentAddrTTL)

c12, err := sw1.DialPeer(context.Background(), sw2.LocalPeer())
require.NoError(t, err)

var c21 network.Conn
require.Eventually(t, func() bool {
conns := sw2.ConnsToPeer(sw1.LocalPeer())
if len(conns) == 0 {
return false
}
c21 = conns[0]
return true
}, 5*time.Second, 100*time.Millisecond)

c13, err := sw1.DialPeer(context.Background(), sw3.LocalPeer())
require.NoError(t, err)

var c31 network.Conn
require.Eventually(t, func() bool {
conns := sw3.ConnsToPeer(sw1.LocalPeer())
if len(conns) == 0 {
return false
}
c31 = conns[0]
return true
}, 5*time.Second, 100*time.Millisecond)

cm.TrimOpenConns(context.Background())

require.True(t, c12.IsClosed() || c13.IsClosed())
var c, cr network.Conn
if c12.IsClosed() {
c = c12
require.Eventually(t, func() bool {
conns := sw2.ConnsToPeer(sw1.LocalPeer())
if len(conns) == 0 {
cr = c21
return true
}
return false
}, 5*time.Second, 100*time.Millisecond)
} else {
c = c13
require.Eventually(t, func() bool {
conns := sw3.ConnsToPeer(sw1.LocalPeer())
if len(conns) == 0 {
cr = c31
return true
}
return false
}, 5*time.Second, 100*time.Millisecond)
}

_, err = c.NewStream(context.Background())
require.ErrorIs(t, err, &network.ConnError{ErrorCode: network.ConnGarbageCollected, Remote: false})

_, err = cr.NewStream(context.Background())
require.ErrorIs(t, err, &network.ConnError{ErrorCode: network.ConnGarbageCollected, Remote: true})
}
10 changes: 6 additions & 4 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,13 @@ func (s *stream) Reset() error {
return nil
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
// ResetWithError resets the stream. It ignores the provided error code.
// TODO: Implement error code support.
func (s *stream) ResetWithError(_ network.StreamErrorCode) error {
// Cancel any pending reads/writes with an error.
// TODO: Should these be the other way round(remote=true)?
s.write.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode})
s.read.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode})

s.write.CloseWithError(network.ErrReset)
s.read.CloseWithError(network.ErrReset)

select {
case s.reset <- struct{}{}:
Expand Down
6 changes: 1 addition & 5 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,7 @@ func (c *Conn) start() {
}
scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirInbound)
if err != nil {
if tse, ok := ts.(network.ResetWithErrorer); ok {
tse.ResetWithError(network.StreamResourceLimitExceeded)
} else {
ts.Reset()
}
ts.ResetWithError(network.StreamResourceLimitExceeded)
continue
}
c.swarm.refs.Add(1)
Expand Down
7 changes: 1 addition & 6 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,7 @@ func (s *Stream) Reset() error {
}

func (s *Stream) ResetWithError(errCode network.StreamErrorCode) error {
var err error
if se, ok := s.stream.(network.ResetWithErrorer); ok {
err = se.ResetWithError(errCode)
} else {
err = s.stream.Reset()
}
err := s.stream.ResetWithError(errCode)
s.closeAndRemoveStream()
return err
}
Expand Down
1 change: 1 addition & 0 deletions p2p/protocol/circuitv2/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func TestRelayLimitData(t *testing.T) {
t.Fatalf("expected to read %d bytes but read %d", len(buf), n)
}
}

buf = make([]byte, 4096)
if _, err := rand.Read(buf); err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 3303ef8

Please sign in to comment.