diff --git a/CHANGELOG.md b/CHANGELOG.md index 3837e52a8..411f7bdd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes: ### Added +- `routing/http`: `GET /routing/v1/dht/closest/peers/{key}` per [IPIP-476](https://github.com/ipfs/specs/pull/476) + ### Changed - upgrade to `go-libp2p` [v0.44.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.44.0) diff --git a/examples/go.mod b/examples/go.mod index 287e77c8a..c66b41a18 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -76,7 +76,7 @@ require ( github.com/libp2p/go-libp2p-kad-dht v0.35.1 // indirect github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect github.com/libp2p/go-libp2p-record v0.3.1 // indirect - github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect + github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-netroute v0.3.0 // indirect github.com/libp2p/go-reuseport v0.4.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index fce2c2b39..6c068e993 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -223,8 +223,8 @@ github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4= github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg= github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E= -github.com/libp2p/go-libp2p-routing-helpers v0.7.5 h1:HdwZj9NKovMx0vqq6YNPTh6aaNzey5zHD7HeLJtq6fI= -github.com/libp2p/go-libp2p-routing-helpers v0.7.5/go.mod h1:3YaxrwP0OBPDD7my3D0KxfR89FlcX/IEbxDEDfAmj98= +github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e h1:6DSfN9gsAmBa1iyAKwIuk9GlEga45iH8MBmuYAuXmpU= +github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e/go.mod h1:Q1VSaOawgsvaa3hGl/PejADIhl2deiqSEsQDpB3Ggss= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= diff --git a/go.mod b/go.mod index 044b1b616..2d699f778 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/libp2p/go-libp2p v0.44.0 github.com/libp2p/go-libp2p-kad-dht v0.35.1 github.com/libp2p/go-libp2p-record v0.3.1 - github.com/libp2p/go-libp2p-routing-helpers v0.7.5 + github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e github.com/libp2p/go-libp2p-testing v0.12.0 github.com/libp2p/go-msgio v0.3.0 github.com/miekg/dns v1.1.68 diff --git a/go.sum b/go.sum index 2fc54879d..2389c06f8 100644 --- a/go.sum +++ b/go.sum @@ -222,8 +222,8 @@ github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4= github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg= github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E= -github.com/libp2p/go-libp2p-routing-helpers v0.7.5 h1:HdwZj9NKovMx0vqq6YNPTh6aaNzey5zHD7HeLJtq6fI= -github.com/libp2p/go-libp2p-routing-helpers v0.7.5/go.mod h1:3YaxrwP0OBPDD7my3D0KxfR89FlcX/IEbxDEDfAmj98= +github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e h1:6DSfN9gsAmBa1iyAKwIuk9GlEga45iH8MBmuYAuXmpU= +github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e/go.mod h1:Q1VSaOawgsvaa3hGl/PejADIhl2deiqSEsQDpB3Ggss= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= diff --git a/routing/http/client/client.go b/routing/http/client/client.go index 291793622..d12a789ae 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -638,3 +638,78 @@ func (c *Client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Recor return nil } + +// GetClosestPeers obtains the closest peers to the given key (CID or Peer ID). +func (c *Client) GetClosestPeers(ctx context.Context, key cid.Cid) (peers iter.ResultIter[*types.PeerRecord], err error) { + m := newMeasurement("GetClosestPeers") + + // Build the base URL path + u, err := gourl.JoinPath(c.baseURL, "routing/v1/dht/closest/peers", key.String()) + if err != nil { + return nil, err + } + + // Create the HTTP request + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", c.accepts) + + m.host = req.Host + start := c.clock.Now() + resp, err := c.httpClient.Do(req) + m.latency = c.clock.Since(start) + m.err = err + + if err != nil { + m.record(ctx) + return nil, err + } + + var skipBodyClose bool + defer func() { + if !skipBodyClose { + resp.Body.Close() + } + }() + + m.statusCode = resp.StatusCode + if resp.StatusCode == http.StatusNotFound { + m.record(ctx) + return iter.FromSlice[iter.Result[*types.PeerRecord]](nil), nil + } + + if resp.StatusCode != http.StatusOK { + err := httpError(resp.StatusCode, resp.Body) + m.record(ctx) + return nil, err + } + + respContentType := resp.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(respContentType) + if err != nil { + m.err = err + m.record(ctx) + return nil, fmt.Errorf("parsing Content-Type: %w", err) + } + + m.mediaType = mediaType + + var it iter.ResultIter[*types.PeerRecord] + switch mediaType { + case mediaTypeJSON: + parsedResp := &jsontypes.PeersResponse{} + err = json.NewDecoder(resp.Body).Decode(parsedResp) + var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers) + it = iter.ToResultIter(sliceIt) + case mediaTypeNDJSON: + skipBodyClose = true + it = ndjson.NewPeerRecordsIter(resp.Body) + default: + logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType) + return nil, errors.New("unknown content type") + } + + return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil +} diff --git a/routing/http/client/client_test.go b/routing/http/client/client_test.go index 6171fe85f..e3d60dd63 100644 --- a/routing/http/client/client_test.go +++ b/routing/http/client/client_test.go @@ -48,6 +48,11 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } +func (m *mockContentRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + args := m.Called(ctx, key) + return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) +} + func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) rec, _ := args.Get(0).(*ipns.Record) @@ -836,6 +841,132 @@ func TestClient_EmptyResponses(t *testing.T) { } } +func TestClient_GetClosestPeers(t *testing.T) { + bitswapPeerRecord := makePeerRecord([]string{"transport-bitswap"}) + httpPeerRecord := makePeerRecord([]string{"transport-ipfs-gateway-http"}) + + peerRecords := []iter.Result[*types.PeerRecord]{ + {Val: &bitswapPeerRecord}, + {Val: &httpPeerRecord}, + } + + key := peer.ToCid(*bitswapPeerRecord.ID) + + cases := []struct { + name string + httpStatusCode int + stopServer bool + routerResult []iter.Result[*types.PeerRecord] + routerErr error + clientRequiresStreaming bool + serverStreamingDisabled bool + + expErrContains osErrContains + expResult []iter.Result[*types.PeerRecord] + expStreamingResponse bool + expJSONResponse bool + }{ + { + name: "happy case", + routerResult: peerRecords, + expResult: peerRecords, + expStreamingResponse: true, + }, + { + name: "server doesn't support streaming", + routerResult: peerRecords, + expResult: peerRecords, + serverStreamingDisabled: true, + expJSONResponse: true, + }, + { + name: "client requires streaming but server doesn't support it", + serverStreamingDisabled: true, + clientRequiresStreaming: true, + expErrContains: osErrContains{expContains: "HTTP error with StatusCode=400: no supported content types"}, + }, + { + name: "returns an error if there's a non-200 response", + httpStatusCode: 500, + expErrContains: osErrContains{expContains: "HTTP error with StatusCode=500"}, + }, + { + name: "returns an error if the HTTP client returns a non-HTTP error", + stopServer: true, + expErrContains: osErrContains{ + expContains: "connect: connection refused", + expContainsWin: "connectex: No connection could be made because the target machine actively refused it.", + }, + }, + { + name: "returns no providers if the HTTP server returns a 404 response", + httpStatusCode: 404, + expResult: nil, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var clientOpts []Option + var serverOpts []server.Option + var onRespReceived []func(*http.Response) + var onReqReceived []func(*http.Request) + + if c.serverStreamingDisabled { + serverOpts = append(serverOpts, server.WithStreamingResultsDisabled()) + } + + if c.clientRequiresStreaming { + clientOpts = append(clientOpts, WithStreamResultsRequired()) + onReqReceived = append(onReqReceived, func(r *http.Request) { + assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Accept")) + }) + } + + if c.expStreamingResponse { + onRespReceived = append(onRespReceived, func(r *http.Response) { + assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type")) + }) + } + + if c.expJSONResponse { + onRespReceived = append(onRespReceived, func(r *http.Response) { + assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type")) + }) + } + + deps := makeTestDeps(t, clientOpts, serverOpts) + + deps.recordingHTTPClient.f = append(deps.recordingHTTPClient.f, onRespReceived...) + deps.recordingHandler.f = append(deps.recordingHandler.f, onReqReceived...) + + client := deps.client + router := deps.router + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + if c.httpStatusCode != 0 { + deps.server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(c.httpStatusCode) + }) + } + + if c.stopServer { + deps.server.Close() + } + + routerResultIter := iter.FromSlice(c.routerResult) + router.On("GetClosestPeers", mock.Anything, key).Return(routerResultIter, c.routerErr) + + resultIter, err := client.GetClosestPeers(ctx, key) + c.expErrContains.errContains(t, err) + + results := iter.ReadAll(resultIter) + assert.Equal(t, c.expResult, results) + }) + } +} + func TestNormalizeBaseURL(t *testing.T) { cases := []struct { name string diff --git a/routing/http/contentrouter/contentrouter.go b/routing/http/contentrouter/contentrouter.go index 8a8cfb8ae..34049a1e1 100644 --- a/routing/http/contentrouter/contentrouter.go +++ b/routing/http/contentrouter/contentrouter.go @@ -23,12 +23,15 @@ var logger = logging.Logger("routing/http/contentrouter") const ttl = 24 * time.Hour +// A Client provides HTTP Delegated Routing methods. See also [server.DelegatedRouter]. type Client interface { FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error + // GetClosestPeers returns the DHT closest peers to the given key (CID or Peer ID). + GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) } type contentRouter struct { @@ -43,6 +46,7 @@ var ( _ routing.ValueStore = (*contentRouter)(nil) _ routinghelpers.ProvideManyRouter = (*contentRouter)(nil) _ routinghelpers.ReadyAbleRouter = (*contentRouter)(nil) + _ routinghelpers.DHTRouter = (*contentRouter)(nil) ) type option func(c *contentRouter) @@ -59,6 +63,8 @@ func WithMaxProvideBatchSize(max int) option { } } +// NewContentRoutingClient returns a client that conforms to the +// ContentRouting interfaces. func NewContentRoutingClient(c Client, opts ...option) *contentRouter { cr := &contentRouter{ client: c, @@ -300,3 +306,44 @@ func (c *contentRouter) SearchValue(ctx context.Context, key string, opts ...rou return ch, nil } + +func (c *contentRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (<-chan peer.AddrInfo, error) { + iter, err := c.client.GetClosestPeers(ctx, key) + if err != nil { + return nil, err + } + infos := make(chan peer.AddrInfo) + go func() { + defer iter.Close() + defer close(infos) + for iter.Next() { + res := iter.Val() + if res.Err != nil { + logger.Warnf("error iterating peer responses: %s", res.Err) + continue + } + + var addrs []multiaddr.Multiaddr + for _, a := range res.Val.Addrs { + addrs = append(addrs, a.Multiaddr) + } + + // If there are no addresses there's nothing of value to return + if len(addrs) == 0 { + continue + } + + select { + case <-ctx.Done(): + logger.Warnf("aborting GetClosestPeers: %s", ctx.Err()) + return + case infos <- peer.AddrInfo{ + ID: *res.Val.ID, + Addrs: addrs, + }: + } + } + }() + + return infos, nil +} diff --git a/routing/http/contentrouter/contentrouter_test.go b/routing/http/contentrouter/contentrouter_test.go index 839293617..927ee0d44 100644 --- a/routing/http/contentrouter/contentrouter_test.go +++ b/routing/http/contentrouter/contentrouter_test.go @@ -53,6 +53,11 @@ func (m *mockClient) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.R return args.Error(0) } +func (m *mockClient) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + args := m.Called(ctx, key) + return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) +} + func TestProvide(t *testing.T) { for _, c := range []struct { name string @@ -258,6 +263,102 @@ func TestFindPeerNoPeer(t *testing.T) { require.ErrorIs(t, err, routing.ErrNotFound) } +func TestGetClosestPeers(t *testing.T) { + t.Run("returns a channel and can read all results", func(t *testing.T) { + ctx := context.Background() + client := &mockClient{} + crc := NewContentRoutingClient(client) + + key := makeCID() + + // Mock response with two peer records + peer1 := peer.ID("peer1") + peer2 := peer.ID("peer2") + addr1 := multiaddr.StringCast("/ip4/1.2.3.4/tcp/1234") + addr2 := multiaddr.StringCast("/ip4/5.6.7.8/tcp/5678") + addrs1 := []types.Multiaddr{{Multiaddr: addr1}} + addrs2 := []types.Multiaddr{{Multiaddr: addr2}} + peerRec1 := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &peer1, + Addrs: addrs1, + Protocols: []string{"transport-bitswap"}, + } + peerRec2 := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &peer2, + Addrs: addrs2, + Protocols: []string{"transport-bitswap"}, + } + + peerIter := iter.ToResultIter[*types.PeerRecord](iter.FromSlice([]*types.PeerRecord{peerRec1, peerRec2})) + + client.On("GetClosestPeers", ctx, key).Return(peerIter, nil) + + infos, err := crc.GetClosestPeers(ctx, key) + require.NoError(t, err) + + var actual []peer.AddrInfo + for info := range infos { + actual = append(actual, info) + } + + expected := []peer.AddrInfo{ + {ID: peer1, Addrs: []multiaddr.Multiaddr{addr1}}, + {ID: peer2, Addrs: []multiaddr.Multiaddr{addr2}}, + } + + assert.Equal(t, expected, actual) + }) + + t.Run("returns no results if addrs is empty", func(t *testing.T) { + ctx := context.Background() + client := &mockClient{} + crc := NewContentRoutingClient(client) + + key := makeCID() + + peer1 := peer.ID("peer1") + peerRec1 := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &peer1, + Protocols: []string{"transport-bitswap"}, + // no addresses + } + + // Mock response with an empty iterator + peerIter := iter.ToResultIter[*types.PeerRecord](iter.FromSlice([]*types.PeerRecord{peerRec1})) + + client.On("GetClosestPeers", ctx, key).Return(peerIter, nil) + + infos, err := crc.GetClosestPeers(ctx, key) + require.NoError(t, err) + + var actual []peer.AddrInfo + for info := range infos { + actual = append(actual, info) + } + + assert.Empty(t, actual) + }) + + t.Run("returns an error if call errors", func(t *testing.T) { + ctx := context.Background() + client := &mockClient{} + crc := NewContentRoutingClient(client) + + key := makeCID() + + // Mock error response + peerIter := iter.ToResultIter[*types.PeerRecord](iter.FromSlice([]*types.PeerRecord{})) + client.On("GetClosestPeers", ctx, key).Return(peerIter, assert.AnError) + + infos, err := crc.GetClosestPeers(ctx, key) + require.ErrorIs(t, err, assert.AnError) + assert.Nil(t, infos) + }) +} + func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { sk, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(t, err) diff --git a/routing/http/server/server.go b/routing/http/server/server.go index 7e96d58f2..ceb75b819 100644 --- a/routing/http/server/server.go +++ b/routing/http/server/server.go @@ -47,10 +47,11 @@ const ( var logger = logging.Logger("routing/http/server") const ( - providePath = "/routing/v1/providers/" - findProvidersPath = "/routing/v1/providers/{cid}" - findPeersPath = "/routing/v1/peers/{peer-id}" - GetIPNSPath = "/routing/v1/ipns/{cid}" + providePath = "/routing/v1/providers/" + findProvidersPath = "/routing/v1/providers/{cid}" + findPeersPath = "/routing/v1/peers/{peer-id}" + getIPNSPath = "/routing/v1/ipns/{cid}" + getClosestPeersPath = "/routing/v1/dht/closest/peers/{key}" ) type FindProvidersAsyncResponse struct { @@ -58,7 +59,14 @@ type FindProvidersAsyncResponse struct { Error error } -type ContentRouter interface { +// DelegatedRouter provides the Delegated Routing V1 HTTP API for offloading +// routing operations to another process/server. +// +// This interface focuses on querying operations for content providers, peers, +// IPNS records, and DHT routing information. It also supports delegated IPNS +// publishing. Additional publishing methods may be added in the future via the +// IPIP process as the ecosystem evolves and new needs arise. +type DelegatedRouter interface { // FindProviders searches for peers who are able to provide the given [cid.Cid]. // Limit indicates the maximum amount of results to return; 0 means unbounded. FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) @@ -78,8 +86,15 @@ type ContentRouter interface { // PutIPNS stores the provided [ipns.Record] for the given [ipns.Name]. // It is guaranteed that the record matches the provided name. PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error + + // GetClosestPeers returns the DHT closest peers to the given key (CID or Peer ID). + GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) } +// ContentRouter is deprecated, use DelegatedRouter instead. +// Deprecated: use DelegatedRouter. ContentRouter will be removed in a future version. +type ContentRouter = DelegatedRouter + // Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: // // [IPIP-378]: https://github.com/ipfs/specs/pull/378 @@ -181,8 +196,9 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler { r.Handle(findProvidersPath, middlewarestd.Handler(findProvidersPath, mdlw, http.HandlerFunc(server.findProviders))).Methods(http.MethodGet) r.Handle(providePath, middlewarestd.Handler(providePath, mdlw, http.HandlerFunc(server.provide))).Methods(http.MethodPut) r.Handle(findPeersPath, middlewarestd.Handler(findPeersPath, mdlw, http.HandlerFunc(server.findPeers))).Methods(http.MethodGet) - r.Handle(GetIPNSPath, middlewarestd.Handler(GetIPNSPath, mdlw, http.HandlerFunc(server.GetIPNS))).Methods(http.MethodGet) - r.Handle(GetIPNSPath, middlewarestd.Handler(GetIPNSPath, mdlw, http.HandlerFunc(server.PutIPNS))).Methods(http.MethodPut) + r.Handle(getIPNSPath, middlewarestd.Handler(getIPNSPath, mdlw, http.HandlerFunc(server.GetIPNS))).Methods(http.MethodGet) + r.Handle(getIPNSPath, middlewarestd.Handler(getIPNSPath, mdlw, http.HandlerFunc(server.PutIPNS))).Methods(http.MethodPut) + r.Handle(getClosestPeersPath, middlewarestd.Handler(getClosestPeersPath, mdlw, http.HandlerFunc(server.getClosestPeers))).Methods(http.MethodGet) return r } @@ -190,7 +206,7 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler { var handlerCount atomic.Int32 type server struct { - svc ContentRouter + svc DelegatedRouter disableNDJSON bool recordsLimit int streamingRecordsLimit int @@ -313,30 +329,7 @@ func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.Result func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { pidStr := mux.Vars(r)["peer-id"] - - // While specification states that peer-id is expected to be in CIDv1 format, reality - // is the clients will often learn legacy PeerID string from other sources, - // and try to use it. - // See https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation - // We are liberal in inputs here, and uplift legacy PeerID to CID if necessary. - // Rationale: it is better to fix this common mistake than to error and break peer routing. - - // Attempt to parse PeerID - pid, err := peer.Decode(pidStr) - if err != nil { - // Retry by parsing PeerID as CID, then setting codec to libp2p-key - // and turning that back to PeerID. - // This is necessary to make sure legacy keys like: - // - RSA QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N - // - ED25519 12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA - // are parsed correctly. - pidAsCid, err2 := cid.Decode(pidStr) - if err2 == nil { - pidAsCid = cid.NewCidV1(cid.Libp2pKey, pidAsCid.Hash()) - pid, err = peer.FromCid(pidAsCid) - } - } - + pid, err := parsePeerID(pidStr) if err != nil { writeErr(w, "FindPeers", http.StatusBadRequest, fmt.Errorf("unable to parse PeerID %q: %w", pidStr, err)) return @@ -608,6 +601,62 @@ func (s *server) PutIPNS(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (s *server) getClosestPeers(w http.ResponseWriter, r *http.Request) { + keyStr := mux.Vars(r)["key"] + c, err := parseKey(keyStr) + if err != nil { + writeErr(w, "GetClosestPeers", http.StatusBadRequest, fmt.Errorf("unable to parse key %q: %w", keyStr, err)) + return + } + + mediaType, err := s.detectResponseType(r) + if err != nil { + writeErr(w, "GetClosestPeers", http.StatusBadRequest, err) + return + } + + var handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord]) + + if mediaType == mediaTypeNDJSON { + handlerFunc = s.getClosestPeersNDJSON + } else { + handlerFunc = s.getClosestPeersJSON + } + + // Add timeout to the routing operation + ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout) + defer cancel() + + provIter, err := s.svc.GetClosestPeers(ctx, c) + if err != nil { + if errors.Is(err, routing.ErrNotFound) { + // handlerFunc takes care of setting the 404 and necessary headers + provIter = iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + } else { + writeErr(w, "GetClosestPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + return + } + } + handlerFunc(w, provIter) +} + +func (s *server) getClosestPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) { + defer peersIter.Close() + peers, err := iter.ReadAllResults(peersIter) + if err != nil { + writeErr(w, "GetClosestPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + return + } + + writeJSONResult(w, "GetClosestPeers", jsontypes.PeersResponse{ + Peers: peers, + }) +} + +func (s *server) getClosestPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) { + writeResultsIterNDJSON(w, peersIter) +} + var ( // Rule-of-thumb Cache-Control policy is to work well with caching proxies and load balancers. // If there are any results, cache on the client for longer, and hint any in-between caches to @@ -618,6 +667,61 @@ var ( maxStale = int((48 * time.Hour).Seconds()) // allow stale results as long within Amino DHT Expiration window ) +func parsePeerID(pidStr string) (peer.ID, error) { + // While specification states that peer-id is expected to be in CIDv1 format, reality + // is the clients will often learn legacy PeerID string from other sources, + // and try to use it. + // See https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + // We are liberal in inputs here, and uplift legacy PeerID to CID if necessary. + // Rationale: it is better to fix this common mistake than to error and break peer routing. + + // Attempt to parse PeerID + pid, err := peer.Decode(pidStr) + if err != nil { + // Retry by parsing PeerID as CID, then setting codec to libp2p-key + // and turning that back to PeerID. + // This is necessary to make sure legacy keys like: + // - RSA QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N + // - ED25519 12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA + // are parsed correctly. + pidAsCid, err2 := cid.Decode(pidStr) + if err2 == nil { + pidAsCid = cid.NewCidV1(cid.Libp2pKey, pidAsCid.Hash()) + pid, err = peer.FromCid(pidAsCid) + } + } + return pid, err +} + +// parseKey parses a string that can be either a CID or a PeerID. +// It accepts the following formats: +// - Arbitrary CIDs (e.g., bafkreidcd7frenco2m6ch7mny63wztgztv3q6fctaffgowkro6kljre5ei) +// - CIDv1 with libp2p-key codec (e.g., bafzaajaiaejca...) +// - Base58-encoded PeerIDs (e.g., 12D3KooW... or QmYyQ...) +// +// This function is used by endpoints that accept "key" path parameters, where +// the key can represent either content (CID) or a peer (PeerID). +// +// Returns the key as a CID. PeerIDs are converted to CIDv1 with libp2p-key codec. +// Note: only use where the multihash digest of the returned CID is relevant. +func parseKey(keyStr string) (cid.Cid, error) { + // Try parsing as PeerID first using peer.Decode (not parsePeerID, which is too liberal) + // This handles legacy PeerID formats per: + // https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + pid, pidErr := peer.Decode(keyStr) + if pidErr == nil { + return peer.ToCid(pid), nil + } + + // Fall back to parsing as CID (handles arbitrary CIDs and CIDv1 libp2p-key format) + c, cidErr := cid.Decode(keyStr) + if cidErr == nil { + return c, nil + } + + return cid.Cid{}, fmt.Errorf("unable to parse as CID or PeerID: %w", errors.Join(cidErr, pidErr)) +} + func setCacheControl(w http.ResponseWriter, maxAge int, stale int) { w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d, stale-while-revalidate=%d, stale-if-error=%d", maxAge, stale, stale)) } diff --git a/routing/http/server/server_test.go b/routing/http/server/server_test.go index 29e259925..cc17cb5bf 100644 --- a/routing/http/server/server_test.go +++ b/routing/http/server/server_test.go @@ -668,6 +668,345 @@ func TestPeers(t *testing.T) { } } +func TestParseKey(t *testing.T) { + t.Run("parses arbitrary CID", func(t *testing.T) { + cidStr := "bafkreidcd7frenco2m6ch7mny63wztgztv3q6fctaffgowkro6kljre5ei" + expectedCID, err := cid.Decode(cidStr) + require.NoError(t, err) + + parsedCID, err := parseKey(cidStr) + require.NoError(t, err) + require.Equal(t, expectedCID, parsedCID) + }) + + t.Run("parses Ed25519 PeerID as CIDv1 libp2p-key", func(t *testing.T) { + // Example from libp2p specs + // https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + cidStr := "bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe" + pid, err := peer.Decode(cidStr) + require.NoError(t, err) + expectedCID := peer.ToCid(pid) + + parsedCID, err := parseKey(cidStr) + require.NoError(t, err) + require.Equal(t, expectedCID, parsedCID) + }) + + t.Run("parses Ed25519 PeerID as Base58", func(t *testing.T) { + // Example from libp2p specs (identity multihash) + // https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + pidStr := "12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA" + pid, err := peer.Decode(pidStr) + require.NoError(t, err) + expectedCID := peer.ToCid(pid) + + parsedCID, err := parseKey(pidStr) + require.NoError(t, err) + require.Equal(t, expectedCID, parsedCID) + }) + + t.Run("parses RSA PeerID as CIDv1 libp2p-key", func(t *testing.T) { + // RSA PeerID starting with "Qm" encoded as CIDv1 + // https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + pidStr := "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N" + pid, err := peer.Decode(pidStr) + require.NoError(t, err) + // Convert to CIDv1 representation + cidStr := peer.ToCid(pid).String() + expectedCID := peer.ToCid(pid) + + parsedCID, err := parseKey(cidStr) + require.NoError(t, err) + require.Equal(t, expectedCID, parsedCID) + }) + + t.Run("parses RSA PeerID as Base58", func(t *testing.T) { + // Example from libp2p specs (SHA256-based) + // https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + pidStr := "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N" + pid, err := peer.Decode(pidStr) + require.NoError(t, err) + expectedCID := peer.ToCid(pid) + + parsedCID, err := parseKey(pidStr) + require.NoError(t, err) + require.Equal(t, expectedCID, parsedCID) + }) + + t.Run("returns error for invalid string", func(t *testing.T) { + _, err := parseKey("not-a-valid-cid-or-peerid") + require.Error(t, err) + require.Contains(t, err.Error(), "unable to parse as CID or PeerID") + }) +} + +func TestGetClosestPeers(t *testing.T) { + makeRequest := func(t *testing.T, router *mockContentRouter, contentType, arg string) *http.Response { + server := httptest.NewServer(Handler(router)) + t.Cleanup(server.Close) + + urlStr := fmt.Sprintf("http://%s/routing/v1/dht/closest/peers/%s", server.Listener.Addr().String(), arg) + t.Log(urlStr) + + req, err := http.NewRequest(http.MethodGet, urlStr, nil) + require.NoError(t, err) + if contentType != "" { + req.Header.Set("Accept", contentType) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + return resp + } + + t.Run("GET /routing/v1/dht/closest/peers/{non-cid} returns 400", func(t *testing.T) { + t.Parallel() + + router := &mockContentRouter{} + resp := makeRequest(t, router, mediaTypeJSON, "nonpeerid") + require.Equal(t, 400, resp.StatusCode) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with correct body and headers (No Results, explicit JSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + router := &mockContentRouter{} + router.On("GetClosestPeers", mock.Anything, key).Return(results, nil) + + resp := makeRequest(t, router, mediaTypeJSON, key.String()) + require.Equal(t, 200, resp.StatusCode) + + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, "public, max-age=15, stale-while-revalidate=172800, stale-if-error=172800", resp.Header.Get("Cache-Control")) + + requireCloseToNow(t, resp.Header.Get("Last-Modified")) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with correct body and headers (No Results, implicit JSON, wildcard Accept header)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + router := &mockContentRouter{} + router.On("GetClosestPeers", mock.Anything, key).Return(results, nil) + + // Simulate request with Accept header that includes wildcard match + resp := makeRequest(t, router, "text/html,*/*", key.String()) + + // Expect response to default to application/json + require.Equal(t, 200, resp.StatusCode) + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with correct body and headers (No Results, implicit JSON, no Accept header)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + router := &mockContentRouter{} + router.On("GetClosestPeers", mock.Anything, key).Return(results, nil) + + // Simulate request without Accept header + resp := makeRequest(t, router, "", key.String()) + + // Expect response to default to application/json + require.Equal(t, 200, resp.StatusCode) + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 when router returns routing.ErrNotFound", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + router := &mockContentRouter{} + router.On("GetClosestPeers", mock.Anything, key).Return(nil, routing.ErrNotFound) + + // Simulate request without Accept header + resp := makeRequest(t, router, "", key.String()) + + // Expect response to default to application/json + require.Equal(t, 200, resp.StatusCode) + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with correct body and headers (JSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap", "transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + }) + + router := &mockContentRouter{} + router.On("GetClosestPeers", mock.Anything, key).Return(results, nil) + + resp := makeRequest(t, router, mediaTypeJSON, key.String()) + require.Equal(t, 200, resp.StatusCode) + + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, "public, max-age=300, stale-while-revalidate=172800, stale-if-error=172800", resp.Header.Get("Cache-Control")) + + requireCloseToNow(t, resp.Header.Get("Last-Modified")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedBody := `{"Peers":[{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-bitswap","transport-foo"],"Schema":"peer"},{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-foo"],"Schema":"peer"}]}` + require.Equal(t, expectedBody, string(body)) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with correct body and headers (No Results, NDJSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + router := &mockContentRouter{} + router.On("GetClosestPeers", mock.Anything, key).Return(results, nil) + + resp := makeRequest(t, router, mediaTypeNDJSON, key.String()) + require.Equal(t, 200, resp.StatusCode) + + require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, "public, max-age=15, stale-while-revalidate=172800, stale-if-error=172800", resp.Header.Get("Cache-Control")) + + requireCloseToNow(t, resp.Header.Get("Last-Modified")) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with correct body and headers (NDJSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap", "transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + }) + + router := &mockContentRouter{} + router.On("GetClosestPeers", mock.Anything, key).Return(results, nil) + + resp := makeRequest(t, router, mediaTypeNDJSON, key.String()) + require.Equal(t, 200, resp.StatusCode) + + require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, "public, max-age=300, stale-while-revalidate=172800, stale-if-error=172800", resp.Header.Get("Cache-Control")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedBody := `{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-bitswap","transport-foo"],"Schema":"peer"}` + "\n" + `{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-foo"],"Schema":"peer"}` + "\n" + require.Equal(t, expectedBody, string(body)) + }) + + // Test matrix that runs the HTTP 200 scenario against different key formats. + // The test verifies that GetClosestPeers is called with a CID whose digest matches the expected value, + // regardless of the CID codec. This is correct because DHT operations only use the digest. + // per https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + keyTestCases := []struct { + keyType string + keyStr string + expectedDigest string // hex-encoded multihash digest + }{ + // Examples from libp2p spec + // https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + {"cidv1-libp2p-key-ed25519-peerid", "bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe", "12209dff3b17d74cf4d38a50d8b6383e92d181a10395a5e73a726dcccbd21bf6f0b9"}, + {"base58-ed25519-peerid", "12D3KooWD3eckifWpRn9wQpMG9R9hX3sD158z7EqHWmweQAJU5SA", "0024080112202ffa35a99d3a3cfbb17bb7c1dc5561b18a8dcca4df38dc613ea859c37eb1336b"}, + {"base58-rsa-peerid", "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N", "12209dff3b17d74cf4d38a50d8b6383e92d181a10395a5e73a726dcccbd21bf6f0b9"}, + // Arbitrary CID (not a PeerID) + {"arbitrary-cid", "bafkreidcd7frenco2m6ch7mny63wztgztv3q6fctaffgowkro6kljre5ei", "1220621fcb12344ed33c23fd8dc7b76cccd99d770f1453014a6759517794b4c49d22"}, + } + + for _, tc := range keyTestCases { + // Parse the key to get the actual digest + parsedKey, err := parseKey(tc.keyStr) + require.NoError(t, err) + actualDigest := parsedKey.Hash() + + // Verify it matches expected + require.Equal(t, tc.expectedDigest, actualDigest.HexString()) + + // Create a PeerID from the digest for response records + pid, err := peer.IDFromBytes(actualDigest) + require.NoError(t, err) + + results := []iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap", "transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + } + + t.Run("GET /routing/v1/dht/closest/peers/{"+tc.keyType+"} returns 200 with correct body and headers (JSON)", func(t *testing.T) { + t.Parallel() + + router := &mockContentRouter{} + // Use mock.MatchedBy to verify the digest matches, regardless of codec + router.On("GetClosestPeers", mock.Anything, mock.MatchedBy(func(key cid.Cid) bool { + return bytes.Equal(key.Hash(), actualDigest) + })).Return(iter.FromSlice(results), nil) + + resp := makeRequest(t, router, mediaTypeJSON, tc.keyStr) + require.Equal(t, http.StatusOK, resp.StatusCode) + + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, "public, max-age=300, stale-while-revalidate=172800, stale-if-error=172800", resp.Header.Get("Cache-Control")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedBody := `{"Peers":[{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-bitswap","transport-foo"],"Schema":"peer"},{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-foo"],"Schema":"peer"}]}` + require.Equal(t, expectedBody, string(body)) + }) + } +} + func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { sk, pid := makeEd25519PeerID(t) return sk, ipns.NameFromPeer(pid) @@ -929,3 +1268,12 @@ func (m *mockContentRouter) PutIPNS(ctx context.Context, name ipns.Name, record args := m.Called(ctx, name, record) return args.Error(0) } + +func (m *mockContentRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + args := m.Called(ctx, key) + a := args.Get(0) + if a == nil { + return nil, args.Error(1) + } + return a.(iter.ResultIter[*types.PeerRecord]), args.Error(1) +}