@@ -3,6 +3,7 @@ package main
33import (
44 "context"
55 "errors"
6+ "math"
67 "sync"
78 "time"
89
@@ -24,10 +25,12 @@ type router interface {
2425
2526type providersRouter interface {
2627 FindProviders (ctx context.Context , cid cid.Cid , limit int ) (iter.ResultIter [types.Record ], error )
28+ Provide (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error )
2729}
2830
2931type peersRouter interface {
3032 FindPeers (ctx context.Context , pid peer.ID , limit int ) (iter.ResultIter [* types.PeerRecord ], error )
33+ ProvidePeer (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error )
3134}
3235
3336type ipnsRouter interface {
@@ -50,13 +53,27 @@ func (r composableRouter) FindProviders(ctx context.Context, key cid.Cid, limit
5053 return r .providers .FindProviders (ctx , key , limit )
5154}
5255
56+ func (r composableRouter ) Provide (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
57+ if r .providers == nil {
58+ return 0 , nil
59+ }
60+ return r .providers .Provide (ctx , req )
61+ }
62+
5363func (r composableRouter ) FindPeers (ctx context.Context , pid peer.ID , limit int ) (iter.ResultIter [* types.PeerRecord ], error ) {
5464 if r .peers == nil {
5565 return iter .ToResultIter (iter .FromSlice ([]* types.PeerRecord {})), nil
5666 }
5767 return r .peers .FindPeers (ctx , pid , limit )
5868}
5969
70+ func (r composableRouter ) ProvidePeer (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
71+ if r .peers == nil {
72+ return 0 , nil
73+ }
74+ return r .peers .ProvidePeer (ctx , req )
75+ }
76+
6077func (r composableRouter ) GetIPNS (ctx context.Context , name ipns.Name ) (* ipns.Record , error ) {
6178 if r .ipns == nil {
6279 return nil , routing .ErrNotFound
@@ -71,11 +88,6 @@ func (r composableRouter) PutIPNS(ctx context.Context, name ipns.Name, record *i
7188 return r .ipns .PutIPNS (ctx , name , record )
7289}
7390
74- //lint:ignore SA1019 // ignore staticcheck
75- func (r composableRouter ) ProvideBitswap (ctx context.Context , req * server.BitswapWriteProvideRequest ) (time.Duration , error ) {
76- return 0 , routing .ErrNotSupported
77- }
78-
7991var _ server.ContentRouter = parallelRouter {}
8092
8193type parallelRouter struct {
@@ -206,6 +218,57 @@ func (mi *manyIter[T]) Close() error {
206218 return err
207219}
208220
221+ func (r parallelRouter ) Provide (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
222+ return provide (ctx , r .routers , func (ctx context.Context , r router ) (time.Duration , error ) {
223+ return r .Provide (ctx , req )
224+ })
225+ }
226+
227+ func (r parallelRouter ) ProvidePeer (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
228+ return provide (ctx , r .routers , func (ctx context.Context , r router ) (time.Duration , error ) {
229+ return r .ProvidePeer (ctx , req )
230+ })
231+ }
232+
233+ func provide (ctx context.Context , routers []router , call func (context.Context , router ) (time.Duration , error )) (time.Duration , error ) {
234+ switch len (routers ) {
235+ case 0 :
236+ return 0 , nil
237+ case 1 :
238+ return call (ctx , routers [0 ])
239+ }
240+
241+ ctx , cancel := context .WithCancel (ctx )
242+ defer cancel ()
243+
244+ var wg sync.WaitGroup
245+ resultsTTL := make ([]time.Duration , len (routers ))
246+ resultsErr := make ([]error , len (routers ))
247+ wg .Add (len (routers ))
248+ for i , ri := range routers {
249+ go func (ri router , i int ) {
250+ resultsTTL [i ], resultsErr [i ] = call (ctx , ri )
251+ wg .Done ()
252+ }(ri , i )
253+ }
254+ wg .Wait ()
255+
256+ var err error
257+ for _ , e := range resultsErr {
258+ err = errors .Join (err , e )
259+ }
260+
261+ // Choose lowest TTL to return.
262+ var ttl time.Duration = math .MaxInt64
263+ for _ , t := range resultsTTL {
264+ if t < ttl {
265+ ttl = t
266+ }
267+ }
268+
269+ return ttl , err
270+ }
271+
209272func (r parallelRouter ) GetIPNS (ctx context.Context , name ipns.Name ) (* ipns.Record , error ) {
210273 switch len (r .routers ) {
211274 case 0 :
@@ -296,11 +359,6 @@ func (r parallelRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipn
296359 return errs
297360}
298361
299- //lint:ignore SA1019 // ignore staticcheck
300- func (r parallelRouter ) ProvideBitswap (ctx context.Context , req * server.BitswapWriteProvideRequest ) (time.Duration , error ) {
301- return 0 , routing .ErrNotSupported
302- }
303-
304362var _ router = libp2pRouter {}
305363
306364type libp2pRouter struct {
@@ -316,6 +374,12 @@ func (d libp2pRouter) FindProviders(ctx context.Context, key cid.Cid, limit int)
316374 }), nil
317375}
318376
377+ func (d libp2pRouter ) Provide (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
378+ // NOTE: this router cannot provide further to the DHT, since we can only
379+ // announce CIDs that our own node has, which is not the case.
380+ return 0 , routing .ErrNotSupported
381+ }
382+
319383func (d libp2pRouter ) FindPeers (ctx context.Context , pid peer.ID , limit int ) (iter.ResultIter [* types.PeerRecord ], error ) {
320384 ctx , cancel := context .WithCancel (ctx )
321385 defer cancel ()
@@ -337,6 +401,10 @@ func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it
337401 return iter.ToResultIter [* types.PeerRecord ](iter.FromSlice [* types.PeerRecord ]([]* types.PeerRecord {rec })), nil
338402}
339403
404+ func (r libp2pRouter ) ProvidePeer (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
405+ return 0 , routing .ErrNotSupported
406+ }
407+
340408func (d libp2pRouter ) GetIPNS (ctx context.Context , name ipns.Name ) (* ipns.Record , error ) {
341409 ctx , cancel := context .WithCancel (ctx )
342410 defer cancel ()
@@ -409,6 +477,36 @@ func (d clientRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int)
409477 return d .Client .FindProviders (ctx , cid )
410478}
411479
480+ func (d clientRouter ) Provide (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
481+ return d .provide (func () (iter.ResultIter [* types.AnnouncementRecord ], error ) {
482+ return d .Client .ProvideRecords (ctx , req )
483+ })
484+ }
485+
412486func (d clientRouter ) FindPeers (ctx context.Context , pid peer.ID , limit int ) (iter.ResultIter [* types.PeerRecord ], error ) {
413487 return d .Client .FindPeers (ctx , pid )
414488}
489+
490+ func (d clientRouter ) ProvidePeer (ctx context.Context , req * types.AnnouncementRecord ) (time.Duration , error ) {
491+ return d .provide (func () (iter.ResultIter [* types.AnnouncementRecord ], error ) {
492+ return d .Client .ProvidePeerRecords (ctx , req )
493+ })
494+ }
495+
496+ func (d clientRouter ) provide (do func () (iter.ResultIter [* types.AnnouncementRecord ], error )) (time.Duration , error ) {
497+ resIter , err := do ()
498+ if err != nil {
499+ return 0 , err
500+ }
501+
502+ records , err := iter .ReadAllResults (resIter )
503+ if err != nil {
504+ return 0 , err
505+ }
506+
507+ if len (records ) != 1 {
508+ return 0 , errors .New ("invalid number of records returned" )
509+ }
510+
511+ return records [0 ].Payload .TTL , nil
512+ }
0 commit comments