Skip to content
6 changes: 4 additions & 2 deletions pkg/hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
)

type Service struct {
streamer p2p.Streamer
streamer p2p.Bee260CompatibilityStreamer
addressBook addressbook.GetPutter
addPeersHandler func(...swarm.Address)
networkID uint64
Expand All @@ -67,7 +67,7 @@ type Service struct {
overlay swarm.Address
}

func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, overlay swarm.Address, logger log.Logger) *Service {
func New(streamer p2p.Bee260CompatibilityStreamer, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, overlay swarm.Address, logger log.Logger) *Service {
svc := &Service{
streamer: streamer,
logger: logger.WithName(loggerName).Register(),
Expand Down Expand Up @@ -196,6 +196,8 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
continue
}

advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(s.streamer.IsBee260(peer), advertisableUnderlays)

peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{
Overlay: addr.Overlay.Bytes(),
Underlay: bzz.SerializeUnderlays(advertisableUnderlays),
Expand Down
23 changes: 4 additions & 19 deletions pkg/p2p/libp2p/internal/handshake/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd

w, r := protobuf.NewWriterAndReader(stream)

peerMultiaddrs = filterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)
peerMultiaddrs = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)

if err := w.WriteMsgWithContext(ctx, &pb.Syn{
ObservedUnderlay: bzz.SerializeUnderlays(peerMultiaddrs),
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd
return a.Equal(b)
})

advertisableUnderlays = filterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)
advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)

bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlays, s.overlay, s.networkID, s.nonce)
if err != nil {
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs
return a.Equal(b)
})

advertisableUnderlays = filterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)
advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)

bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlays, s.overlay, s.networkID, s.nonce)
if err != nil {
Expand All @@ -315,7 +315,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs

welcomeMessage := s.GetWelcomeMessage()

peerMultiaddrs = filterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)
peerMultiaddrs = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)

if err := w.WriteMsgWithContext(ctx, &pb.SynAck{
Syn: &pb.Syn{
Expand Down Expand Up @@ -395,18 +395,3 @@ func (s *Service) parseCheckAck(ack *pb.Ack) (*bzz.Address, error) {

return bzzAddress, nil
}

// filterBee260CompatibleUnderlays select a single underlay to pass if
// bee260compatibility is true. Otherwise it passes the unmodified underlays
// slice. This function can be safely removed when bee version 2.6.0 is
// deprecated.
func filterBee260CompatibleUnderlays(bee260compatibility bool, underlays []ma.Multiaddr) []ma.Multiaddr {
if !bee260compatibility {
return underlays
}
underlay := bzz.SelectBestAdvertisedAddress(underlays, nil)
if underlay == nil {
return underlays
}
return []ma.Multiaddr{underlay}
}
15 changes: 15 additions & 0 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,9 +1475,23 @@ func (s *Service) peerMultiaddrs(ctx context.Context, peerID libp2ppeer.ID) ([]m
return buildFullMAs(waitPeerAddrs(waitPeersCtx, s.host.Peerstore(), peerID), peerID)
}

// IsBee260 implements p2p.Bee260CompatibilityStreamer interface.
// It checks if a peer is running Bee version older than 2.7.0.
func (s *Service) IsBee260(overlay swarm.Address) bool {
peerID, found := s.peers.peerID(overlay)
if !found {
return false
}
return s.bee260BackwardCompatibility(peerID)
}

var version270 = *semver.Must(semver.NewVersion("2.7.0"))

func (s *Service) bee260BackwardCompatibility(peerID libp2ppeer.ID) bool {
if compat, found := s.peers.bee260(peerID); found {
return compat
}

userAgent := s.peerUserAgent(s.ctx, peerID)
p := strings.SplitN(userAgent, " ", 2)
if len(p) != 2 {
Expand All @@ -1496,6 +1510,7 @@ func (s *Service) bee260BackwardCompatibility(peerID libp2ppeer.ID) bool {
return false
}
result := vCore.LessThan(version270)
s.peers.setBee260(peerID, result)
return result
}

Expand Down
39 changes: 28 additions & 11 deletions pkg/p2p/libp2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
)

type peerRegistry struct {
underlays map[string]libp2ppeer.ID // map overlay address to underlay peer id
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
full map[libp2ppeer.ID]bool // map to track whether a node is full or light node (true=full)
connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification
streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc
mu sync.RWMutex
underlays map[string]libp2ppeer.ID // map overlay address to underlay peer id
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
full map[libp2ppeer.ID]bool // map to track whether a node is full or light node (true=full)
bee260Compatibility map[libp2ppeer.ID]bool // map to track bee260 backward compatibility
connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification
streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc
mu sync.RWMutex

//nolint:misspell
disconnecter disconnecter // peerRegistry notifies libp2p on peer disconnection
Expand All @@ -36,11 +37,12 @@ type disconnecter interface {

func newPeerRegistry() *peerRegistry {
return &peerRegistry{
underlays: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]swarm.Address),
full: make(map[libp2ppeer.ID]bool),
connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}),
streams: make(map[libp2ppeer.ID]map[network.Stream]context.CancelFunc),
underlays: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]swarm.Address),
full: make(map[libp2ppeer.ID]bool),
bee260Compatibility: make(map[libp2ppeer.ID]bool),
connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}),
streams: make(map[libp2ppeer.ID]map[network.Stream]context.CancelFunc),

Notifiee: new(network.NoopNotifiee),
}
Expand Down Expand Up @@ -81,6 +83,7 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
}
delete(r.streams, peerID)
delete(r.full, peerID)
delete(r.bee260Compatibility, peerID)
r.mu.Unlock()
r.disconnecter.disconnected(overlay)

Expand Down Expand Up @@ -176,6 +179,19 @@ func (r *peerRegistry) fullnode(peerID libp2ppeer.ID) (bool, bool) {
return full, found
}

func (r *peerRegistry) bee260(peerID libp2ppeer.ID) (compat, found bool) {
r.mu.RLock()
defer r.mu.RUnlock()
compat, found = r.bee260Compatibility[peerID]
return compat, found
}

func (r *peerRegistry) setBee260(peerID libp2ppeer.ID, compat bool) {
r.mu.Lock()
defer r.mu.Unlock()
r.bee260Compatibility[peerID] = compat
}

func (r *peerRegistry) isConnected(peerID libp2ppeer.ID, remoteAddr ma.Multiaddr) (swarm.Address, bool) {
if remoteAddr == nil {
return swarm.ZeroAddress, false
Expand Down Expand Up @@ -217,6 +233,7 @@ func (r *peerRegistry) remove(overlay swarm.Address) (found, full bool, peerID l
delete(r.streams, peerID)
full = r.full[peerID]
delete(r.full, peerID)
delete(r.bee260Compatibility, peerID)
r.mu.Unlock()

return found, full, peerID
Expand Down
69 changes: 69 additions & 0 deletions pkg/p2p/libp2p/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,72 @@ func TestBee260BackwardCompatibility(t *testing.T) {
})
}
}

func TestBee260Cache(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

swarmKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}

overlay := swarm.RandAddress(t)
addr := ":0"
networkID := uint64(1)

statestore := mock.NewStateStore()
defer statestore.Close()

s, err := New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, nil, statestore, nil, log.Noop, nil, Options{})
if err != nil {
t.Fatal(err)
}
defer s.Close()

libp2pPeerID, err := libp2ppeer.Decode("16Uiu2HAm3g4hXfCWTDhPBq3KkqpV3wGkPVgMJY3Jt8gGTYWiTWNZ")
if err != nil {
t.Fatal(err)
}

// 1. Set user agent to 2.6.0 (compat = true)
if err := s.host.Peerstore().Put(libp2pPeerID, "AgentVersion", "bee/2.6.0 go1.22.0 linux/amd64"); err != nil {
t.Fatal(err)
}

// 2. First call should calculate and cache it
if !s.bee260BackwardCompatibility(libp2pPeerID) {
t.Fatal("expected true for 2.6.0")
}

// 3. Verify it's in the cache
compat, found := s.peers.bee260(libp2pPeerID)
if !found {
t.Fatal("expected value to be in cache")
}
if !compat {
t.Fatal("expected cached value to be true")
}

// 4. Change user agent in peerstore to 2.7.0 (compat = false)
// If caching works, bee260BackwardCompatibility should still return true
if err := s.host.Peerstore().Put(libp2pPeerID, "AgentVersion", "bee/2.7.0 go1.23.0 linux/amd64"); err != nil {
t.Fatal(err)
}

if !s.bee260BackwardCompatibility(libp2pPeerID) {
t.Fatal("expected true (cached value) even if peerstore changed")
}

// 5. Clear cache (manually for testing)
s.peers.mu.Lock()
delete(s.peers.bee260Compatibility, libp2pPeerID)
s.peers.mu.Unlock()

// 6. Now it should re-calculate and return false for 2.7.0
if s.bee260BackwardCompatibility(libp2pPeerID) {
t.Fatal("expected false for 2.7.0 after cache clear")
}
}
21 changes: 21 additions & 0 deletions pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ type Streamer interface {
NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error)
}

// Bee260CompatibilityStreamer is able to create a new Stream and check if a peer is running Bee 2.6.0.
type Bee260CompatibilityStreamer interface {
NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error)
IsBee260(address swarm.Address) bool
}

type StreamerDisconnecter interface {
Streamer
Disconnecter
Expand Down Expand Up @@ -237,3 +243,18 @@ func (e *ChunkDeliveryError) Error() string {
func NewChunkDeliveryError(msg string) error {
return &ChunkDeliveryError{msg: msg}
}

// FilterBee260CompatibleUnderlays select a single underlay to pass if
// bee260compatibility is true. Otherwise it passes the unmodified underlays
// slice. This function can be safely removed when bee version 2.6.0 is
// deprecated.
func FilterBee260CompatibleUnderlays(bee260compatibility bool, underlays []ma.Multiaddr) []ma.Multiaddr {
if !bee260compatibility {
return underlays
}
underlay := bzz.SelectBestAdvertisedAddress(underlays, nil)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we could only return TCP. It is prefered, but 2.6 doesn't support other transports, so we can send TCP only.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

if underlay == nil {
return underlays
}
return []ma.Multiaddr{underlay}
}
6 changes: 6 additions & 0 deletions pkg/p2p/streamtest/streamtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ func (r *Recorder) WaitRecords(t *testing.T, addr swarm.Address, proto, version,
return recs
}

// IsBee260 implements p2p.Bee260CompatibilityStreamer interface.
// It always returns false.
func (r *Recorder) IsBee260(overlay swarm.Address) bool {
return false
}

type Record struct {
in *record
out *record
Expand Down
Loading