Skip to content

Commit bd45b83

Browse files
committed
routing/http/client: add support for GetClosestPeers
As the server part, this is heavily inspired in FindPeers.
1 parent 117edfe commit bd45b83

3 files changed

Lines changed: 238 additions & 0 deletions

File tree

routing/http/client/client.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99
"io"
1010
"mime"
1111
"net/http"
12+
"net/url"
1213
gourl "net/url"
1314
"slices"
15+
"strconv"
1416
"strings"
1517
"time"
1618

@@ -583,3 +585,91 @@ func (c *Client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Recor
583585

584586
return nil
585587
}
588+
589+
// GetClosestPeers obtains the closest peers to the given peer ID.
590+
func (c *Client) GetClosestPeers(ctx context.Context, peerID, closerThan peer.ID, count int) (peers iter.ResultIter[*types.PeerRecord], err error) {
591+
m := newMeasurement("GetClosestPeers")
592+
593+
// Build the base URL path
594+
u, err := gourl.JoinPath(c.baseURL, "routing/v1/dht/closest/peers", peer.ToCid(peerID).String())
595+
if err != nil {
596+
return nil, err
597+
}
598+
599+
// Add query parameters
600+
queryParams := make(url.Values)
601+
if closerThan != "" {
602+
queryParams.Set("closerThan", closerThan.String())
603+
}
604+
if count > 0 {
605+
queryParams.Set("count", strconv.Itoa(count))
606+
}
607+
u += "?" + queryParams.Encode()
608+
609+
// Create the HTTP request
610+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
611+
if err != nil {
612+
return nil, err
613+
}
614+
req.Header.Set("Accept", c.accepts)
615+
616+
m.host = req.Host
617+
start := c.clock.Now()
618+
resp, err := c.httpClient.Do(req)
619+
m.latency = c.clock.Since(start)
620+
m.err = err
621+
622+
if err != nil {
623+
m.record(ctx)
624+
return nil, err
625+
}
626+
627+
m.statusCode = resp.StatusCode
628+
if resp.StatusCode == http.StatusNotFound {
629+
resp.Body.Close()
630+
m.record(ctx)
631+
return iter.FromSlice[iter.Result[*types.PeerRecord]](nil), nil
632+
}
633+
634+
if resp.StatusCode != http.StatusOK {
635+
err := httpError(resp.StatusCode, resp.Body)
636+
resp.Body.Close()
637+
m.record(ctx)
638+
return nil, err
639+
}
640+
641+
respContentType := resp.Header.Get("Content-Type")
642+
mediaType, _, err := mime.ParseMediaType(respContentType)
643+
if err != nil {
644+
resp.Body.Close()
645+
m.err = err
646+
m.record(ctx)
647+
return nil, fmt.Errorf("parsing Content-Type: %w", err)
648+
}
649+
650+
m.mediaType = mediaType
651+
652+
var skipBodyClose bool
653+
defer func() {
654+
if !skipBodyClose {
655+
resp.Body.Close()
656+
}
657+
}()
658+
659+
var it iter.ResultIter[*types.PeerRecord]
660+
switch mediaType {
661+
case mediaTypeJSON:
662+
parsedResp := &jsontypes.PeersResponse{}
663+
err = json.NewDecoder(resp.Body).Decode(parsedResp)
664+
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers)
665+
it = iter.ToResultIter(sliceIt)
666+
case mediaTypeNDJSON:
667+
skipBodyClose = true
668+
it = ndjson.NewPeerRecordsIter(resp.Body)
669+
default:
670+
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
671+
return nil, errors.New("unknown content type")
672+
}
673+
674+
return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
675+
}

routing/http/client/client_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in
4747
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
4848
}
4949

50+
func (m *mockContentRouter) GetClosestPeers(ctx context.Context, peerID, closerThan peer.ID, count int) (iter.ResultIter[*types.PeerRecord], error) {
51+
args := m.Called(ctx, peerID, closerThan, count)
52+
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
53+
}
54+
5055
func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
5156
args := m.Called(ctx, name)
5257
rec, _ := args.Get(0).(*ipns.Record)
@@ -676,6 +681,146 @@ func TestClient_FindPeers(t *testing.T) {
676681
}
677682
}
678683

684+
func TestClient_GetClosestPeers(t *testing.T) {
685+
bitswapPeerRecord := makePeerRecord([]string{"transport-bitswap"})
686+
httpPeerRecord := makePeerRecord([]string{"transport-ipfs-gateway-http"})
687+
688+
peerRecords := []iter.Result[*types.PeerRecord]{
689+
{Val: &bitswapPeerRecord},
690+
{Val: &httpPeerRecord},
691+
}
692+
693+
pid := *bitswapPeerRecord.ID
694+
695+
cases := []struct {
696+
name string
697+
httpStatusCode int
698+
stopServer bool
699+
routerResult []iter.Result[*types.PeerRecord]
700+
routerErr error
701+
clientRequiresStreaming bool
702+
serverStreamingDisabled bool
703+
closerThan peer.ID
704+
count int
705+
706+
expErrContains osErrContains
707+
expResult []iter.Result[*types.PeerRecord]
708+
expStreamingResponse bool
709+
expJSONResponse bool
710+
}{
711+
{
712+
name: "happy case",
713+
routerResult: peerRecords,
714+
expResult: peerRecords,
715+
expStreamingResponse: true,
716+
},
717+
{
718+
name: "server doesn't support streaming",
719+
routerResult: peerRecords,
720+
expResult: peerRecords,
721+
serverStreamingDisabled: true,
722+
expJSONResponse: true,
723+
},
724+
{
725+
name: "client requires streaming but server doesn't support it",
726+
serverStreamingDisabled: true,
727+
clientRequiresStreaming: true,
728+
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=400: no supported content types"},
729+
},
730+
{
731+
name: "returns an error if there's a non-200 response",
732+
httpStatusCode: 500,
733+
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=500"},
734+
},
735+
{
736+
name: "returns an error if the HTTP client returns a non-HTTP error",
737+
stopServer: true,
738+
expErrContains: osErrContains{
739+
expContains: "connect: connection refused",
740+
expContainsWin: "connectex: No connection could be made because the target machine actively refused it.",
741+
},
742+
},
743+
{
744+
name: "returns no providers if the HTTP server returns a 404 response",
745+
httpStatusCode: 404,
746+
expResult: nil,
747+
},
748+
{
749+
name: "passes count and closerThan along",
750+
expStreamingResponse: true,
751+
routerResult: peerRecords,
752+
expResult: peerRecords,
753+
count: 10,
754+
closerThan: pid,
755+
},
756+
}
757+
for _, c := range cases {
758+
t.Run(c.name, func(t *testing.T) {
759+
var clientOpts []Option
760+
var serverOpts []server.Option
761+
var onRespReceived []func(*http.Response)
762+
var onReqReceived []func(*http.Request)
763+
764+
if c.serverStreamingDisabled {
765+
serverOpts = append(serverOpts, server.WithStreamingResultsDisabled())
766+
}
767+
768+
if c.clientRequiresStreaming {
769+
clientOpts = append(clientOpts, WithStreamResultsRequired())
770+
onReqReceived = append(onReqReceived, func(r *http.Request) {
771+
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Accept"))
772+
})
773+
}
774+
775+
if c.expStreamingResponse {
776+
onRespReceived = append(onRespReceived, func(r *http.Response) {
777+
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
778+
})
779+
}
780+
781+
if c.expJSONResponse {
782+
onRespReceived = append(onRespReceived, func(r *http.Response) {
783+
assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type"))
784+
})
785+
}
786+
787+
deps := makeTestDeps(t, clientOpts, serverOpts)
788+
789+
deps.recordingHTTPClient.f = append(deps.recordingHTTPClient.f, onRespReceived...)
790+
deps.recordingHandler.f = append(deps.recordingHandler.f, onReqReceived...)
791+
792+
client := deps.client
793+
router := deps.router
794+
795+
ctx, cancel := context.WithCancel(context.Background())
796+
t.Cleanup(cancel)
797+
798+
if c.httpStatusCode != 0 {
799+
deps.server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
800+
w.WriteHeader(c.httpStatusCode)
801+
})
802+
}
803+
804+
if c.stopServer {
805+
deps.server.Close()
806+
}
807+
808+
count := c.count
809+
if count == 0 {
810+
count = 20
811+
}
812+
routerResultIter := iter.FromSlice(c.routerResult)
813+
router.On("GetClosestPeers", mock.Anything, pid, c.closerThan, count).Return(routerResultIter, c.routerErr)
814+
815+
resultIter, err := client.GetClosestPeers(ctx, pid, c.closerThan, c.count)
816+
c.expErrContains.errContains(t, err)
817+
818+
results := iter.ReadAll(resultIter)
819+
assert.Equal(t, c.expResult, results)
820+
})
821+
}
822+
}
823+
679824
func TestNormalizeBaseURL(t *testing.T) {
680825
cases := []struct {
681826
name string

routing/http/contentrouter/contentrouter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ var logger = logging.Logger("routing/http/contentrouter")
2323

2424
const ttl = 24 * time.Hour
2525

26+
// A Client provides HTTP Content Routing methods. See also [server.ContentRouter].
2627
type Client interface {
2728
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error)
2829
ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error)
@@ -59,6 +60,8 @@ func WithMaxProvideBatchSize(max int) option {
5960
}
6061
}
6162

63+
// NewContentRoutingClient returns a client that conforms to the
64+
// ContentRouting interfaces.
6265
func NewContentRoutingClient(c Client, opts ...option) *contentRouter {
6366
cr := &contentRouter{
6467
client: c,

0 commit comments

Comments
 (0)