Skip to content

Commit e71f50e

Browse files
hsanjuanlidelgammazero
authored
routing/http: add support for GetClosestPeers (IPIP-476) (#1021)
This adds for GetClosestPeers into the routing packages. Boxo does not implement the actual GetClosestPeers call, but it provides the facilities to provide this endpoint in routing/v1 servers and clients. GetClosestPeers finds the closest peers to a given DHT-Key. We only accept CIDs and Peer.IDs as keys (either as legacy Peer IDs or CID-formatted peer IDs). This helps setting guidelines for the intended usage of this method. Co-authored-by: Marcin Rataj <lidel@lidel.org> Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
1 parent 58ff268 commit e71f50e

9 files changed

Lines changed: 847 additions & 35 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:
1616

1717
### Added
1818

19+
- `routing/http`: `GET /routing/v1/dht/closest/peers/{key}` per [IPIP-476](https://github.com/ipfs/specs/pull/476)
20+
1921
### Changed
2022

2123
### Removed

examples/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ require (
7676
github.com/libp2p/go-libp2p-kad-dht v0.35.1 // indirect
7777
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
7878
github.com/libp2p/go-libp2p-record v0.3.1 // indirect
79-
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect
79+
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e // indirect
8080
github.com/libp2p/go-msgio v0.3.0 // indirect
8181
github.com/libp2p/go-netroute v0.3.0 // indirect
8282
github.com/libp2p/go-reuseport v0.4.0 // indirect

examples/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,8 @@ github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m
227227
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=
228228
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=
229229
github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E=
230-
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 h1:HdwZj9NKovMx0vqq6YNPTh6aaNzey5zHD7HeLJtq6fI=
231-
github.com/libp2p/go-libp2p-routing-helpers v0.7.5/go.mod h1:3YaxrwP0OBPDD7my3D0KxfR89FlcX/IEbxDEDfAmj98=
230+
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e h1:6DSfN9gsAmBa1iyAKwIuk9GlEga45iH8MBmuYAuXmpU=
231+
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e/go.mod h1:Q1VSaOawgsvaa3hGl/PejADIhl2deiqSEsQDpB3Ggss=
232232
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
233233
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
234234
github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0=

routing/http/client/client.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,3 +638,78 @@ func (c *Client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Recor
638638

639639
return nil
640640
}
641+
642+
// GetClosestPeers obtains the closest peers to the given key (CID or Peer ID).
643+
func (c *Client) GetClosestPeers(ctx context.Context, key cid.Cid) (peers iter.ResultIter[*types.PeerRecord], err error) {
644+
m := newMeasurement("GetClosestPeers")
645+
646+
// Build the base URL path
647+
u, err := gourl.JoinPath(c.baseURL, "routing/v1/dht/closest/peers", key.String())
648+
if err != nil {
649+
return nil, err
650+
}
651+
652+
// Create the HTTP request
653+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
654+
if err != nil {
655+
return nil, err
656+
}
657+
req.Header.Set("Accept", c.accepts)
658+
659+
m.host = req.Host
660+
start := c.clock.Now()
661+
resp, err := c.httpClient.Do(req)
662+
m.latency = c.clock.Since(start)
663+
m.err = err
664+
665+
if err != nil {
666+
m.record(ctx)
667+
return nil, err
668+
}
669+
670+
var skipBodyClose bool
671+
defer func() {
672+
if !skipBodyClose {
673+
resp.Body.Close()
674+
}
675+
}()
676+
677+
m.statusCode = resp.StatusCode
678+
if resp.StatusCode == http.StatusNotFound {
679+
m.record(ctx)
680+
return iter.FromSlice[iter.Result[*types.PeerRecord]](nil), nil
681+
}
682+
683+
if resp.StatusCode != http.StatusOK {
684+
err := httpError(resp.StatusCode, resp.Body)
685+
m.record(ctx)
686+
return nil, err
687+
}
688+
689+
respContentType := resp.Header.Get("Content-Type")
690+
mediaType, _, err := mime.ParseMediaType(respContentType)
691+
if err != nil {
692+
m.err = err
693+
m.record(ctx)
694+
return nil, fmt.Errorf("parsing Content-Type: %w", err)
695+
}
696+
697+
m.mediaType = mediaType
698+
699+
var it iter.ResultIter[*types.PeerRecord]
700+
switch mediaType {
701+
case mediaTypeJSON:
702+
parsedResp := &jsontypes.PeersResponse{}
703+
err = json.NewDecoder(resp.Body).Decode(parsedResp)
704+
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers)
705+
it = iter.ToResultIter(sliceIt)
706+
case mediaTypeNDJSON:
707+
skipBodyClose = true
708+
it = ndjson.NewPeerRecordsIter(resp.Body)
709+
default:
710+
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
711+
return nil, errors.New("unknown content type")
712+
}
713+
714+
return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
715+
}

routing/http/client/client_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in
4848
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
4949
}
5050

51+
func (m *mockContentRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) {
52+
args := m.Called(ctx, key)
53+
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
54+
}
55+
5156
func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
5257
args := m.Called(ctx, name)
5358
rec, _ := args.Get(0).(*ipns.Record)
@@ -836,6 +841,132 @@ func TestClient_EmptyResponses(t *testing.T) {
836841
}
837842
}
838843

844+
func TestClient_GetClosestPeers(t *testing.T) {
845+
bitswapPeerRecord := makePeerRecord([]string{"transport-bitswap"})
846+
httpPeerRecord := makePeerRecord([]string{"transport-ipfs-gateway-http"})
847+
848+
peerRecords := []iter.Result[*types.PeerRecord]{
849+
{Val: &bitswapPeerRecord},
850+
{Val: &httpPeerRecord},
851+
}
852+
853+
key := peer.ToCid(*bitswapPeerRecord.ID)
854+
855+
cases := []struct {
856+
name string
857+
httpStatusCode int
858+
stopServer bool
859+
routerResult []iter.Result[*types.PeerRecord]
860+
routerErr error
861+
clientRequiresStreaming bool
862+
serverStreamingDisabled bool
863+
864+
expErrContains osErrContains
865+
expResult []iter.Result[*types.PeerRecord]
866+
expStreamingResponse bool
867+
expJSONResponse bool
868+
}{
869+
{
870+
name: "happy case",
871+
routerResult: peerRecords,
872+
expResult: peerRecords,
873+
expStreamingResponse: true,
874+
},
875+
{
876+
name: "server doesn't support streaming",
877+
routerResult: peerRecords,
878+
expResult: peerRecords,
879+
serverStreamingDisabled: true,
880+
expJSONResponse: true,
881+
},
882+
{
883+
name: "client requires streaming but server doesn't support it",
884+
serverStreamingDisabled: true,
885+
clientRequiresStreaming: true,
886+
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=400: no supported content types"},
887+
},
888+
{
889+
name: "returns an error if there's a non-200 response",
890+
httpStatusCode: 500,
891+
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=500"},
892+
},
893+
{
894+
name: "returns an error if the HTTP client returns a non-HTTP error",
895+
stopServer: true,
896+
expErrContains: osErrContains{
897+
expContains: "connect: connection refused",
898+
expContainsWin: "connectex: No connection could be made because the target machine actively refused it.",
899+
},
900+
},
901+
{
902+
name: "returns no providers if the HTTP server returns a 404 response",
903+
httpStatusCode: 404,
904+
expResult: nil,
905+
},
906+
}
907+
for _, c := range cases {
908+
t.Run(c.name, func(t *testing.T) {
909+
var clientOpts []Option
910+
var serverOpts []server.Option
911+
var onRespReceived []func(*http.Response)
912+
var onReqReceived []func(*http.Request)
913+
914+
if c.serverStreamingDisabled {
915+
serverOpts = append(serverOpts, server.WithStreamingResultsDisabled())
916+
}
917+
918+
if c.clientRequiresStreaming {
919+
clientOpts = append(clientOpts, WithStreamResultsRequired())
920+
onReqReceived = append(onReqReceived, func(r *http.Request) {
921+
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Accept"))
922+
})
923+
}
924+
925+
if c.expStreamingResponse {
926+
onRespReceived = append(onRespReceived, func(r *http.Response) {
927+
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
928+
})
929+
}
930+
931+
if c.expJSONResponse {
932+
onRespReceived = append(onRespReceived, func(r *http.Response) {
933+
assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type"))
934+
})
935+
}
936+
937+
deps := makeTestDeps(t, clientOpts, serverOpts)
938+
939+
deps.recordingHTTPClient.f = append(deps.recordingHTTPClient.f, onRespReceived...)
940+
deps.recordingHandler.f = append(deps.recordingHandler.f, onReqReceived...)
941+
942+
client := deps.client
943+
router := deps.router
944+
945+
ctx, cancel := context.WithCancel(context.Background())
946+
t.Cleanup(cancel)
947+
948+
if c.httpStatusCode != 0 {
949+
deps.server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
950+
w.WriteHeader(c.httpStatusCode)
951+
})
952+
}
953+
954+
if c.stopServer {
955+
deps.server.Close()
956+
}
957+
958+
routerResultIter := iter.FromSlice(c.routerResult)
959+
router.On("GetClosestPeers", mock.Anything, key).Return(routerResultIter, c.routerErr)
960+
961+
resultIter, err := client.GetClosestPeers(ctx, key)
962+
c.expErrContains.errContains(t, err)
963+
964+
results := iter.ReadAll(resultIter)
965+
assert.Equal(t, c.expResult, results)
966+
})
967+
}
968+
}
969+
839970
func TestNormalizeBaseURL(t *testing.T) {
840971
cases := []struct {
841972
name string

routing/http/contentrouter/contentrouter.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ var logger = logging.Logger("routing/http/contentrouter")
2323

2424
const ttl = 24 * time.Hour
2525

26+
// A Client provides HTTP Delegated Routing methods. See also [server.DelegatedRouter].
2627
type Client interface {
2728
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error)
2829
ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error)
2930
FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error)
3031
GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error)
3132
PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error
33+
// GetClosestPeers returns the DHT closest peers to the given key (CID or Peer ID).
34+
GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error)
3235
}
3336

3437
type contentRouter struct {
@@ -37,12 +40,17 @@ type contentRouter struct {
3740
maxProvideBatchSize int
3841
}
3942

43+
type DHTRouter interface {
44+
GetClosestPeers(context.Context, cid.Cid) (<-chan peer.AddrInfo, error)
45+
}
46+
4047
var (
4148
_ routing.ContentRouting = (*contentRouter)(nil)
4249
_ routing.PeerRouting = (*contentRouter)(nil)
4350
_ routing.ValueStore = (*contentRouter)(nil)
4451
_ routinghelpers.ProvideManyRouter = (*contentRouter)(nil)
4552
_ routinghelpers.ReadyAbleRouter = (*contentRouter)(nil)
53+
_ DHTRouter = (*contentRouter)(nil)
4654
)
4755

4856
type option func(c *contentRouter)
@@ -59,6 +67,8 @@ func WithMaxProvideBatchSize(max int) option {
5967
}
6068
}
6169

70+
// NewContentRoutingClient returns a client that conforms to the
71+
// ContentRouting interfaces.
6272
func NewContentRoutingClient(c Client, opts ...option) *contentRouter {
6373
cr := &contentRouter{
6474
client: c,
@@ -300,3 +310,44 @@ func (c *contentRouter) SearchValue(ctx context.Context, key string, opts ...rou
300310

301311
return ch, nil
302312
}
313+
314+
func (c *contentRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (<-chan peer.AddrInfo, error) {
315+
iter, err := c.client.GetClosestPeers(ctx, key)
316+
if err != nil {
317+
return nil, err
318+
}
319+
infos := make(chan peer.AddrInfo)
320+
go func() {
321+
defer iter.Close()
322+
defer close(infos)
323+
for iter.Next() {
324+
res := iter.Val()
325+
if res.Err != nil {
326+
logger.Warnf("error iterating peer responses: %s", res.Err)
327+
continue
328+
}
329+
330+
var addrs []multiaddr.Multiaddr
331+
for _, a := range res.Val.Addrs {
332+
addrs = append(addrs, a.Multiaddr)
333+
}
334+
335+
// If there are no addresses there's nothing of value to return
336+
if len(addrs) == 0 {
337+
continue
338+
}
339+
340+
select {
341+
case <-ctx.Done():
342+
logger.Warnf("aborting GetClosestPeers: %s", ctx.Err())
343+
return
344+
case infos <- peer.AddrInfo{
345+
ID: *res.Val.ID,
346+
Addrs: addrs,
347+
}:
348+
}
349+
}
350+
}()
351+
352+
return infos, nil
353+
}

0 commit comments

Comments
 (0)