Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ The following emojis are used to highlight certain changes:

### Added

- Added `/routing/v1/peers/closest/{key}` endpoint implementing [IPIP-476](https://github.com/ipfs/specs/pull/476)
- Returns DHT-closest peers to a given CID or PeerID
- Accepts both CID and legacy PeerID formats (e.g., `12D3KooW...`)
- Uses WAN DHT only for more reliable results
- Includes cached addresses in results when available

### Changed

- [go-libp2p v0.45.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.45.0)
- [go-libp2p-kad-dht v0.35.1](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.35.1)
- [boxo v0.35.2](https://github.com/ipfs/boxo/releases/tag/v0.35.2)
- [go-log/v2 v2.9.0](https://github.com/ipfs/go-log/releases/tag/v2.9.0)
- CLI commands (`ask` subcommands) now default to `delegated-ipfs.dev` instead of `cid.contact` to ensure both IPNI and DHT results are returned without daemon running

### Removed

Expand Down
41 changes: 41 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,47 @@ func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput b
return nil
}

func getClosestPeers(ctx context.Context, key cid.Cid, endpoint string, prettyOutput bool) error {
drc, err := client.New(endpoint)
if err != nil {
return err
}

recordsIter, err := drc.GetClosestPeers(ctx, key)
if err != nil {
return err
}
defer recordsIter.Close()

for recordsIter.Next() {
res := recordsIter.Val()

// Check for error, but do not complain if we exceeded the timeout. We are
// expecting that to happen: we explicitly defined a timeout.
if res.Err != nil {
if !errors.Is(res.Err, context.DeadlineExceeded) {
return res.Err
}

return nil
}

if prettyOutput {
fmt.Fprintln(os.Stdout, res.Val.ID)
fmt.Fprintln(os.Stdout, "\tProtocols:", res.Val.Protocols)
fmt.Fprintln(os.Stdout, "\tAddresses:", res.Val.Addrs)
fmt.Fprintln(os.Stdout)
} else {
err := json.NewEncoder(os.Stdout).Encode(res.Val)
if err != nil {
return err
}
}
}

return nil
}

func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput bool) error {
drc, err := client.New(endpoint)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/felixge/httpsnoop v1.0.4
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/ipfs/boxo v0.35.2
github.com/ipfs/boxo v0.35.3-0.20251112034801-918fee9e707f
github.com/ipfs/go-cid v0.5.0
github.com/ipfs/go-log/v2 v2.9.0
github.com/libp2p/go-libp2p v0.45.0
Expand Down Expand Up @@ -70,7 +70,7 @@ require (
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e // indirect
github.com/libp2p/go-libp2p-xor v0.1.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-netroute v0.3.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/boxo v0.35.2 h1:0QZJJh6qrak28abENOi5OA8NjBnZM4p52SxeuIDqNf8=
github.com/ipfs/boxo v0.35.2/go.mod h1:bZn02OFWwJtY8dDW9XLHaki59EC5o+TGDECXEbe1w8U=
github.com/ipfs/boxo v0.35.3-0.20251112034801-918fee9e707f h1:JWib13kzQSoe+vZQIF9TpIRTOWCsGqnYr2UuvdLOG3M=
github.com/ipfs/boxo v0.35.3-0.20251112034801-918fee9e707f/go.mod h1:xUfw18JMtTMIDLW1FouIGVcDq7lL2+1KC7+bYnCK2g0=
github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk=
github.com/ipfs/go-block-format v0.2.3/go.mod h1:WJaQmPAKhD3LspLixqlqNFxiZ3BZ3xgqxxoSR/76pnA=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
Expand Down Expand Up @@ -356,8 +356,8 @@ github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduS
github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs=
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=
github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E=
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 h1:HdwZj9NKovMx0vqq6YNPTh6aaNzey5zHD7HeLJtq6fI=
github.com/libp2p/go-libp2p-routing-helpers v0.7.5/go.mod h1:3YaxrwP0OBPDD7my3D0KxfR89FlcX/IEbxDEDfAmj98=
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e h1:6DSfN9gsAmBa1iyAKwIuk9GlEga45iH8MBmuYAuXmpU=
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e/go.mod h1:Q1VSaOawgsvaa3hGl/PejADIhl2deiqSEsQDpB3Ggss=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-libp2p-xor v0.1.0 h1:hhQwT4uGrBcuAkUGXADuPltalOdpf9aag9kaYNT2tLA=
Expand Down
49 changes: 48 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ import (
"github.com/urfave/cli/v2"
)

// cidContactEndpoint is the default for daemon mode (start command).
// Used for proxying provider requests in addition to results from the local DHT client.
const cidContactEndpoint = "https://cid.contact"

// delegatedIPFSEndpoint is the default for CLI mode (ask command).
// Used as the only source of routing results when no local DHT is available.
const delegatedIPFSEndpoint = "https://delegated-ipfs.dev"

func main() {
app := &cli.App{
Name: name,
Expand Down Expand Up @@ -209,7 +215,7 @@ func main() {
Flags: []cli.Flag{
&cli.StringFlag{
Name: "endpoint",
Value: cidContactEndpoint,
Value: delegatedIPFSEndpoint,
Usage: "the Delegated Routing V1 endpoint to ask",
},
&cli.BoolFlag{
Expand Down Expand Up @@ -251,6 +257,22 @@ func main() {
return findPeers(ctx.Context, pid, ctx.String("endpoint"), ctx.Bool("pretty"))
},
},
{
Name: "getclosestpeers",
Usage: "getclosestpeers <key>",
UsageText: "Find DHT-closest peers to a key (CID or peer ID)",
Action: func(ctx *cli.Context) error {
if ctx.NArg() != 1 {
return errors.New("invalid command, see help")
}
keyStr := ctx.Args().Get(0)
c, err := parseKey(keyStr)
if err != nil {
return err
}
return getClosestPeers(ctx.Context, c, ctx.String("endpoint"), ctx.Bool("pretty"))
},
},
{
Name: "getipns",
Usage: "getipns <ipns-id>",
Expand Down Expand Up @@ -305,3 +327,28 @@ func printIfListConfigured(message string, list []string) {
fmt.Printf(message+"%v\n", strings.Join(list, ", "))
}
}

// parseKey parses a string that can be either a CID or a PeerID.
// It accepts the following formats:
// - Arbitrary CIDs (e.g., bafkreidcd7frenco2m6ch7mny63wztgztv3q6fctaffgowkro6kljre5ei)
// - CIDv1 with libp2p-key codec (e.g., bafzaajaiaejca...)
// - Base58-encoded PeerIDs (e.g., 12D3KooW... or QmYyQ...)
//
// Returns the key as a CID. PeerIDs are converted to CIDv1 with libp2p-key codec.
func parseKey(keyStr string) (cid.Cid, error) {
// Try parsing as PeerID first using peer.Decode
// This handles legacy PeerID formats per:
// https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation
pid, pidErr := peer.Decode(keyStr)
if pidErr == nil {
return peer.ToCid(pid), nil
}

// Fall back to parsing as CID (handles arbitrary CIDs and CIDv1 libp2p-key format)
c, cidErr := cid.Parse(keyStr)
if cidErr == nil {
return c, nil
}

return cid.Cid{}, fmt.Errorf("unable to parse as CID or PeerID: %w", errors.Join(cidErr, pidErr))
}
18 changes: 12 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,22 @@ func start(ctx context.Context, cfg *config) error {
}
}

crRouters, err := getCombinedRouting(cfg.contentEndpoints, dhtRouting, cachedAddrBook, blockProviderRouters)
crRouters, err := getCombinedRouting(cfg.contentEndpoints, h, dhtRouting, cachedAddrBook, blockProviderRouters)
if err != nil {
return err
}

prRouters, err := getCombinedRouting(cfg.peerEndpoints, dhtRouting, cachedAddrBook, nil)
prRouters, err := getCombinedRouting(cfg.peerEndpoints, h, dhtRouting, cachedAddrBook, nil)
if err != nil {
return err
}

ipnsRouters, err := getCombinedRouting(cfg.ipnsEndpoints, dhtRouting, cachedAddrBook, nil)
ipnsRouters, err := getCombinedRouting(cfg.ipnsEndpoints, h, dhtRouting, cachedAddrBook, nil)
if err != nil {
return err
}

dhtRouters, err := getCombinedRouting(nil, h, dhtRouting, cachedAddrBook, nil)
if err != nil {
return err
}
Expand All @@ -200,6 +205,7 @@ func start(ctx context.Context, cfg *config) error {
providers: crRouters,
peers: prRouters,
ipns: ipnsRouters,
dht: dhtRouters,
}, handlerOpts...)

// Add CORS.
Expand Down Expand Up @@ -303,14 +309,14 @@ func newHost(cfg *config) (host.Host, error) {
return h, nil
}

func getCombinedRouting(endpoints []string, dht routing.Routing, cachedAddrBook *cachedAddrBook, additionalRouters []router) (router, error) {
func getCombinedRouting(endpoints []string, host host.Host, dht routing.Routing, cachedAddrBook *cachedAddrBook, additionalRouters []router) (router, error) {
var dhtRouter router

if cachedAddrBook != nil {
cachedRouter := NewCachedRouter(libp2pRouter{routing: dht}, cachedAddrBook)
cachedRouter := NewCachedRouter(libp2pRouter{host: host, routing: dht}, cachedAddrBook)
dhtRouter = sanitizeRouter{cachedRouter}
} else if dht != nil {
dhtRouter = sanitizeRouter{libp2pRouter{routing: dht}}
dhtRouter = sanitizeRouter{libp2pRouter{host: host, routing: dht}}
}

if len(endpoints) == 0 && len(additionalRouters) == 0 {
Expand Down
55 changes: 47 additions & 8 deletions server_cached_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ const (
addrCacheStateHit = "hit"
addrCacheStateMiss = "miss"

// source=providers|peers indicates if query originated from provider or peer endpoint
addrQueryOriginLabel = "origin"
addrQueryOriginProviders = "providers"
addrQueryOriginPeers = "peers"
addrQueryOriginUnknown = "unknown"
// source=providers|peers|closest indicates if query originated from provider, peer, or closest peers endpoint
addrQueryOriginLabel = "origin"
addrQueryOriginProviders = "providers"
addrQueryOriginPeers = "peers"
addrQueryOriginClosestPeers = "closest"
addrQueryOriginUnknown = "unknown"

DispatchedFindPeersTimeout = time.Minute
)
Expand All @@ -64,7 +65,7 @@ func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int)
return nil, err
}

iter := NewCacheFallbackIter(it, r, ctx)
iter := NewCacheFallbackIter(it, r, ctx, addrQueryOriginProviders)
return iter, nil
}

Expand All @@ -88,6 +89,42 @@ func (r cachedRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it
return it, nil
}

func (r cachedRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) {
it, err := r.router.GetClosestPeers(ctx, key)
if err != nil {
return nil, err
}

return r.applyPeerRecordCaching(it, ctx, addrQueryOriginClosestPeers), nil
}

// applyPeerRecordCaching applies cache fallback logic to a PeerRecord iterator
// by converting to Record iterator, applying caching, and converting back
func (r cachedRouter) applyPeerRecordCaching(it iter.ResultIter[*types.PeerRecord], ctx context.Context, queryOrigin string) iter.ResultIter[*types.PeerRecord] {
// Convert *types.PeerRecord to types.Record
recordIter := iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] {
if v.Err != nil {
return iter.Result[types.Record]{Err: v.Err}
}
return iter.Result[types.Record]{Val: v.Val}
})

// Apply caching
cachedIter := NewCacheFallbackIter(recordIter, r, ctx, queryOrigin)

// Convert back to *types.PeerRecord
return iter.Map(cachedIter, func(v iter.Result[types.Record]) iter.Result[*types.PeerRecord] {
if v.Err != nil {
return iter.Result[*types.PeerRecord]{Err: v.Err}
}
peerRec, ok := v.Val.(*types.PeerRecord)
if !ok {
return iter.Result[*types.PeerRecord]{Err: errors.New("unexpected record type")}
}
return iter.Result[*types.PeerRecord]{Val: peerRec}
})
}

// withAddrsFromCache returns the best list of addrs for specified [peer.ID].
// It will consult cache ONLY if the addrs slice passed to it is empty.
func (r cachedRouter) withAddrsFromCache(queryOrigin string, pid peer.ID, addrs []types.Multiaddr) []types.Multiaddr {
Expand Down Expand Up @@ -116,20 +153,22 @@ type cacheFallbackIter struct {
current iter.Result[types.Record]
findPeersResult chan types.PeerRecord
router cachedRouter
queryOrigin string
ctx context.Context
cancel context.CancelFunc
ongoingLookups atomic.Int32
}

// NewCacheFallbackIter is a wrapper around a results iterator that will resolve peers with no addresses from cache and if no cached addresses, will look them up via FindPeers.
// It's a bit complex because it ensures we continue iterating without blocking on the FindPeers call.
func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context) *cacheFallbackIter {
func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context, queryOrigin string) *cacheFallbackIter {
// Create a cancellable context for this iterator
iterCtx, cancel := context.WithCancel(ctx)

iter := &cacheFallbackIter{
sourceIter: sourceIter,
router: router,
queryOrigin: queryOrigin,
ctx: iterCtx,
cancel: cancel,
findPeersResult: make(chan types.PeerRecord, 100), // Buffer to avoid drops in typical cases
Expand All @@ -148,7 +187,7 @@ func (it *cacheFallbackIter) Next() bool {
switch val.Val.GetSchema() {
case types.SchemaPeer:
if record, ok := val.Val.(*types.PeerRecord); ok {
record.Addrs = it.router.withAddrsFromCache(addrQueryOriginProviders, *record.ID, record.Addrs)
record.Addrs = it.router.withAddrsFromCache(it.queryOrigin, *record.ID, record.Addrs)
if len(record.Addrs) > 0 {
it.current = iter.Result[types.Record]{Val: record}
return true
Expand Down
Loading