From eebec222c2c3c86ce7e4c6cde4fd4f6062c56ae2 Mon Sep 17 00:00:00 2001 From: sbackend Date: Thu, 26 Mar 2026 00:09:37 +0100 Subject: [PATCH 1/5] chore: remove legacy code --- pkg/bzz/address.go | 15 -- pkg/hive/hive.go | 6 +- .../libp2p/internal/handshake/handshake.go | 36 +-- pkg/p2p/libp2p/libp2p.go | 43 ---- pkg/p2p/libp2p/peer.go | 39 +-- pkg/p2p/libp2p/version_test.go | 234 ------------------ pkg/p2p/p2p.go | 21 -- pkg/p2p/streamtest/streamtest.go | 6 - 8 files changed, 15 insertions(+), 385 deletions(-) delete mode 100644 pkg/p2p/libp2p/version_test.go diff --git a/pkg/bzz/address.go b/pkg/bzz/address.go index 89914ef66bf..d01c9122175 100644 --- a/pkg/bzz/address.go +++ b/pkg/bzz/address.go @@ -14,7 +14,6 @@ import ( "encoding/json" "errors" "fmt" - "slices" "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/v2/pkg/crypto" @@ -38,7 +37,6 @@ type Address struct { type addressJSON struct { Overlay string `json:"overlay"` - Underlay string `json:"underlay"` // For backward compatibility Underlays []string `json:"underlays"` Signature string `json:"signature"` Nonce string `json:"transaction"` @@ -145,16 +143,8 @@ func (a *Address) MarshalJSON() ([]byte, error) { if len(a.Underlays) == 0 { return nil, fmt.Errorf("no underlays for %s", a.Overlay) } - - // select the underlay address for backward compatibility - var underlay string - if v := SelectBestAdvertisedAddress(a.Underlays, nil); v != nil { - underlay = v.String() - } - return json.Marshal(&addressJSON{ Overlay: a.Overlay.String(), - Underlay: underlay, Underlays: a.underlaysAsStrings(), Signature: base64.StdEncoding.EncodeToString(a.Signature), Nonce: common.Bytes2Hex(a.Nonce), @@ -175,11 +165,6 @@ func (a *Address) UnmarshalJSON(b []byte) error { a.Overlay = addr - // append the underlay for backward compatibility - if !slices.Contains(v.Underlays, v.Underlay) { - v.Underlays = append(v.Underlays, v.Underlay) - } - multiaddrs, err := parseMultiaddrs(v.Underlays) if err != nil { return err diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index 8f8572bcb34..1f49fe2808c 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -50,7 +50,7 @@ var ( ) type Service struct { - streamer p2p.Bee260CompatibilityStreamer + streamer p2p.Streamer addressBook addressbook.GetPutter addPeersHandler func(...swarm.Address) networkID uint64 @@ -67,7 +67,7 @@ type Service struct { overlay swarm.Address } -func New(streamer p2p.Bee260CompatibilityStreamer, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, overlay swarm.Address, logger log.Logger) *Service { +func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, overlay swarm.Address, logger log.Logger) *Service { svc := &Service{ streamer: streamer, logger: logger.WithName(loggerName).Register(), @@ -196,8 +196,6 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa continue } - advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(s.streamer.IsBee260(peer), advertisableUnderlays) - peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{ Overlay: addr.Overlay.Bytes(), Underlay: bzz.SerializeUnderlays(advertisableUnderlays), diff --git a/pkg/p2p/libp2p/internal/handshake/handshake.go b/pkg/p2p/libp2p/internal/handshake/handshake.go index e88dc02ef2b..32ab4b38908 100644 --- a/pkg/p2p/libp2p/internal/handshake/handshake.go +++ b/pkg/p2p/libp2p/internal/handshake/handshake.go @@ -67,20 +67,6 @@ type Addresser interface { AdvertizableAddrs() ([]ma.Multiaddr, error) } -type Option struct { - bee260compatibility bool -} - -// WithBee260Compatibility option ensures that only one underlay address is -// passed to the peer in p2p protocol messages, so that nodes with version 2.6.0 -// and older can deserialize it. This option can be safely removed when bee -// version 2.6.0 is deprecated. -func WithBee260Compatibility(yes bool) func(*Option) { - return func(o *Option) { - o.bee260compatibility = yes - } -} - // Service can perform initiate or handle a handshake between peers. type Service struct { signer crypto.Signer @@ -144,21 +130,14 @@ func (s *Service) SetPicker(n p2p.Picker) { } // Handshake initiates a handshake with a peer. -func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiaddrs []ma.Multiaddr, opts ...func(*Option)) (i *Info, err error) { +func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiaddrs []ma.Multiaddr) (i *Info, err error) { loggerV1 := s.logger.V(1).Register() - o := new(Option) - for _, set := range opts { - set(o) - } - ctx, cancel := context.WithTimeout(ctx, handshakeTimeout) defer cancel() w, r := protobuf.NewWriterAndReader(stream) - peerMultiaddrs = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs) - if err := w.WriteMsgWithContext(ctx, &pb.Syn{ ObservedUnderlay: bzz.SerializeUnderlays(peerMultiaddrs), }); err != nil { @@ -212,8 +191,6 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd return a.Equal(b) }) - advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays) - bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlays, s.overlay, s.networkID, s.nonce) if err != nil { return nil, err @@ -258,14 +235,9 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd } // Handle handles an incoming handshake from a peer. -func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs []ma.Multiaddr, opts ...func(*Option)) (i *Info, err error) { +func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs []ma.Multiaddr) (i *Info, err error) { loggerV1 := s.logger.V(1).Register() - o := new(Option) - for _, set := range opts { - set(o) - } - ctx, cancel := context.WithTimeout(ctx, handshakeTimeout) defer cancel() @@ -310,8 +282,6 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs return a.Equal(b) }) - advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays) - bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlays, s.overlay, s.networkID, s.nonce) if err != nil { return nil, err @@ -319,8 +289,6 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs welcomeMessage := s.GetWelcomeMessage() - peerMultiaddrs = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs) - if err := w.WriteMsgWithContext(ctx, &pb.SynAck{ Syn: &pb.Syn{ ObservedUnderlay: bzz.SerializeUnderlays(peerMultiaddrs), diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 9b696cd26e2..de354e7b44b 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -609,13 +609,10 @@ func (s *Service) handleIncoming(stream network.Stream) { } } - bee260Compat := s.bee260BackwardCompatibility(peerID) - i, err := s.handshakeService.Handle( s.ctx, handshakeStream, observedAddrs, - handshake.WithBee260Compatibility(bee260Compat), ) if err != nil { s.logger.Debug("stream handler: handshake: handle failed", "peer_id", peerID, "error", err) @@ -1065,13 +1062,10 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b } } - bee260Compat := s.bee260BackwardCompatibility(peerID) - i, err := s.handshakeService.Handshake( s.ctx, handshakeStream, observedAddrs, - handshake.WithBee260Compatibility(bee260Compat), ) if err != nil { _ = handshakeStream.Reset() @@ -1470,45 +1464,8 @@ func (s *Service) peerMultiaddrs(ctx context.Context, peerID libp2ppeer.ID) ([]m return buildFullMAs(mas, peerID) } -// IsBee260 implements p2p.Bee260CompatibilityStreamer interface. -// It checks if a peer is running Bee version older than 2.7.0. -func (s *Service) IsBee260(overlay swarm.Address) bool { - peerID, found := s.peers.peerID(overlay) - if !found { - return false - } - return s.bee260BackwardCompatibility(peerID) -} - var version270 = *semver.Must(semver.NewVersion("2.7.0")) -func (s *Service) bee260BackwardCompatibility(peerID libp2ppeer.ID) bool { - if compat, found := s.peers.bee260(peerID); found { - return compat - } - - userAgent := s.peerUserAgent(s.ctx, peerID) - p := strings.SplitN(userAgent, " ", 2) - if len(p) != 2 { - return false - } - version := strings.TrimPrefix(p[0], "bee/") - v, err := semver.NewVersion(version) - if err != nil { - return false - } - - // Compare major.minor.patch only (ignore pre-release) - // This way 2.7.0-rc12 is treated as >= 2.7.0 - vCore, err := semver.NewVersion(fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)) - if err != nil { - return false - } - result := vCore.LessThan(version270) - s.peers.setBee260(peerID, result) - return result -} - // appendSpace adds a leading space character if the string is not empty. // It is useful for constructing log messages with conditional substrings. func appendSpace(s string) string { diff --git a/pkg/p2p/libp2p/peer.go b/pkg/p2p/libp2p/peer.go index 232fbf9b103..d52c23a1209 100644 --- a/pkg/p2p/libp2p/peer.go +++ b/pkg/p2p/libp2p/peer.go @@ -18,13 +18,12 @@ import ( ) type peerRegistry struct { - overlayToPeerID map[string]libp2ppeer.ID // map overlay address to underlay peer id - overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address - full map[libp2ppeer.ID]bool // map to track whether a node is full or light node (true=full) - bee260Compatibility map[libp2ppeer.ID]bool // map to track bee260 backward compatibility - connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification - streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc - mu sync.RWMutex + overlayToPeerID map[string]libp2ppeer.ID // map overlay address to underlay peer id + overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address + full map[libp2ppeer.ID]bool // map to track whether a node is full or light node (true=full) + connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification + streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc + mu sync.RWMutex //nolint:misspell disconnecter disconnecter // peerRegistry notifies libp2p on peer disconnection @@ -37,12 +36,11 @@ type disconnecter interface { func newPeerRegistry() *peerRegistry { return &peerRegistry{ - overlayToPeerID: make(map[string]libp2ppeer.ID), - overlays: make(map[libp2ppeer.ID]swarm.Address), - full: make(map[libp2ppeer.ID]bool), - bee260Compatibility: make(map[libp2ppeer.ID]bool), - connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}), - streams: make(map[libp2ppeer.ID]map[network.Stream]context.CancelFunc), + overlayToPeerID: make(map[string]libp2ppeer.ID), + overlays: make(map[libp2ppeer.ID]swarm.Address), + full: make(map[libp2ppeer.ID]bool), + connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}), + streams: make(map[libp2ppeer.ID]map[network.Stream]context.CancelFunc), Notifiee: new(network.NoopNotifiee), } @@ -83,7 +81,6 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) { } delete(r.streams, peerID) delete(r.full, peerID) - delete(r.bee260Compatibility, peerID) r.mu.Unlock() r.disconnecter.disconnected(overlay) @@ -179,19 +176,6 @@ func (r *peerRegistry) fullnode(peerID libp2ppeer.ID) (bool, bool) { return full, found } -func (r *peerRegistry) bee260(peerID libp2ppeer.ID) (compat, found bool) { - r.mu.RLock() - defer r.mu.RUnlock() - compat, found = r.bee260Compatibility[peerID] - return compat, found -} - -func (r *peerRegistry) setBee260(peerID libp2ppeer.ID, compat bool) { - r.mu.Lock() - defer r.mu.Unlock() - r.bee260Compatibility[peerID] = compat -} - func (r *peerRegistry) isConnected(peerID libp2ppeer.ID, remoteAddr ma.Multiaddr) (swarm.Address, bool) { if remoteAddr == nil { return swarm.ZeroAddress, false @@ -233,7 +217,6 @@ func (r *peerRegistry) remove(overlay swarm.Address) (found, full bool, peerID l delete(r.streams, peerID) full = r.full[peerID] delete(r.full, peerID) - delete(r.bee260Compatibility, peerID) r.mu.Unlock() return found, full, peerID diff --git a/pkg/p2p/libp2p/version_test.go b/pkg/p2p/libp2p/version_test.go deleted file mode 100644 index 15e0ce3eba6..00000000000 --- a/pkg/p2p/libp2p/version_test.go +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2026 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package libp2p - -import ( - "context" - "testing" - - "github.com/ethersphere/bee/v2/pkg/crypto" - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/statestore/mock" - "github.com/ethersphere/bee/v2/pkg/swarm" - libp2ppeer "github.com/libp2p/go-libp2p/core/peer" -) - -func TestBee260BackwardCompatibility(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - userAgent string - want bool - }{ - // Versions < 2.7.0 should require backward compatibility - { - name: "version 2.6.0", - userAgent: "bee/2.6.0 go1.22.0 linux/amd64", - want: true, - }, - { - name: "version 2.6.5", - userAgent: "bee/2.6.5 go1.22.0 linux/amd64", - want: true, - }, - { - name: "version 2.5.0", - userAgent: "bee/2.5.0 go1.21.0 linux/amd64", - want: true, - }, - { - name: "version 2.6.0-beta1", - userAgent: "bee/2.6.0-beta1 go1.22.0 linux/amd64", - want: true, - }, - // Versions >= 2.7.0 should NOT require backward compatibility - { - name: "version 2.7.0", - userAgent: "bee/2.7.0 go1.23.0 linux/amd64", - want: false, - }, - { - name: "version 2.8.0", - userAgent: "bee/2.8.0 go1.23.0 linux/amd64", - want: false, - }, - { - name: "version 3.0.0", - userAgent: "bee/3.0.0 go1.25.0 linux/amd64", - want: false, - }, - // Pre-release versions >= 2.7.0 should NOT require backward compatibility - // This is the critical fix: 2.7.0-rcX should be treated as >= 2.7.0 - { - name: "version 2.7.0-rc1", - userAgent: "bee/2.7.0-rc1 go1.23.0 linux/amd64", - want: false, - }, - { - name: "version 2.7.0-rc12", - userAgent: "bee/2.7.0-rc12-b39629d5-dirty go1.25.6 linux/amd64", - want: false, - }, - { - name: "version 2.7.0-beta1", - userAgent: "bee/2.7.0-beta1 go1.23.0 linux/amd64", - want: false, - }, - { - name: "version 2.8.0-rc1", - userAgent: "bee/2.8.0-rc1 go1.24.0 linux/amd64", - want: false, - }, - { - name: "version 2.9.0-beta1", - userAgent: "bee/2.9.0-beta1 go1.24.0 linux/amd64", - want: false, - }, - // Edge cases that should return false (not requiring backward compat) - { - name: "empty user agent", - userAgent: "", - want: false, - }, - { - name: "malformed user agent missing space", - userAgent: "bee/2.6.0", - want: false, - }, - { - name: "non-bee user agent", - userAgent: "other/1.0.0 go1.22.0 linux/amd64", - want: false, - }, - { - name: "invalid version format", - userAgent: "bee/invalid go1.22.0 linux/amd64", - want: false, - }, - { - name: "default libp2p user agent", - userAgent: "github.com/libp2p/go-libp2p", - want: false, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - // Create a service with minimal configuration - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - swarmKey, err := crypto.GenerateSecp256k1Key() - if err != nil { - t.Fatal(err) - } - - overlay := swarm.RandAddress(t) - addr := ":0" - networkID := uint64(1) - - statestore := mock.NewStateStore() - defer statestore.Close() - - s, err := New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, nil, statestore, nil, log.Noop, nil, Options{}) - if err != nil { - t.Fatal(err) - } - defer s.Close() - - // Create a random test peer ID - we only need any valid libp2p peer ID - // The peerstore lookup will be mocked by setting the AgentVersion directly - libp2pPeerID, err := libp2ppeer.Decode("16Uiu2HAm3g4hXfCWTDhPBq3KkqpV3wGkPVgMJY3Jt8gGTYWiTWNZ") - if err != nil { - t.Fatal(err) - } - - // Set the user agent in the peerstore if provided - if tc.userAgent != "" { - if err := s.host.Peerstore().Put(libp2pPeerID, "AgentVersion", tc.userAgent); err != nil { - t.Fatal(err) - } - } - - // Test the backward compatibility check - got := s.bee260BackwardCompatibility(libp2pPeerID) - if got != tc.want { - t.Errorf("bee260BackwardCompatibility() = %v, want %v (userAgent: %q)", got, tc.want, tc.userAgent) - } - }) - } -} - -func TestBee260Cache(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - swarmKey, err := crypto.GenerateSecp256k1Key() - if err != nil { - t.Fatal(err) - } - - overlay := swarm.RandAddress(t) - addr := ":0" - networkID := uint64(1) - - statestore := mock.NewStateStore() - defer statestore.Close() - - s, err := New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, nil, statestore, nil, log.Noop, nil, Options{}) - if err != nil { - t.Fatal(err) - } - defer s.Close() - - libp2pPeerID, err := libp2ppeer.Decode("16Uiu2HAm3g4hXfCWTDhPBq3KkqpV3wGkPVgMJY3Jt8gGTYWiTWNZ") - if err != nil { - t.Fatal(err) - } - - // 1. Set user agent to 2.6.0 (compat = true) - if err := s.host.Peerstore().Put(libp2pPeerID, "AgentVersion", "bee/2.6.0 go1.22.0 linux/amd64"); err != nil { - t.Fatal(err) - } - - // 2. First call should calculate and cache it - if !s.bee260BackwardCompatibility(libp2pPeerID) { - t.Fatal("expected true for 2.6.0") - } - - // 3. Verify it's in the cache - compat, found := s.peers.bee260(libp2pPeerID) - if !found { - t.Fatal("expected value to be in cache") - } - if !compat { - t.Fatal("expected cached value to be true") - } - - // 4. Change user agent in peerstore to 2.7.0 (compat = false) - // If caching works, bee260BackwardCompatibility should still return true - if err := s.host.Peerstore().Put(libp2pPeerID, "AgentVersion", "bee/2.7.0 go1.23.0 linux/amd64"); err != nil { - t.Fatal(err) - } - - if !s.bee260BackwardCompatibility(libp2pPeerID) { - t.Fatal("expected true (cached value) even if peerstore changed") - } - - // 5. Clear cache (manually for testing) - s.peers.mu.Lock() - delete(s.peers.bee260Compatibility, libp2pPeerID) - s.peers.mu.Unlock() - - // 6. Now it should re-calculate and return false for 2.7.0 - if s.bee260BackwardCompatibility(libp2pPeerID) { - t.Fatal("expected false for 2.7.0 after cache clear") - } -} diff --git a/pkg/p2p/p2p.go b/pkg/p2p/p2p.go index ca725f73dd2..b709ffa5113 100644 --- a/pkg/p2p/p2p.go +++ b/pkg/p2p/p2p.go @@ -141,12 +141,6 @@ type Streamer interface { NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error) } -// Bee260CompatibilityStreamer is able to create a new Stream and check if a peer is running Bee 2.6.0. -type Bee260CompatibilityStreamer interface { - NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error) - IsBee260(address swarm.Address) bool -} - type StreamerDisconnecter interface { Streamer Disconnecter @@ -243,18 +237,3 @@ func (e *ChunkDeliveryError) Error() string { func NewChunkDeliveryError(msg string) error { return &ChunkDeliveryError{msg: msg} } - -// FilterBee260CompatibleUnderlays select a single underlay to pass if -// bee260compatibility is true. Otherwise it passes the unmodified underlays -// slice. This function can be safely removed when bee version 2.6.0 is -// deprecated. -func FilterBee260CompatibleUnderlays(bee260compatibility bool, underlays []ma.Multiaddr) []ma.Multiaddr { - if !bee260compatibility { - return underlays - } - underlay := bzz.SelectBestAdvertisedAddress(underlays, nil) - if underlay == nil { - return underlays - } - return []ma.Multiaddr{underlay} -} diff --git a/pkg/p2p/streamtest/streamtest.go b/pkg/p2p/streamtest/streamtest.go index be491c3d5eb..3265f6ea566 100644 --- a/pkg/p2p/streamtest/streamtest.go +++ b/pkg/p2p/streamtest/streamtest.go @@ -223,12 +223,6 @@ func (r *Recorder) WaitRecords(t *testing.T, addr swarm.Address, proto, version, return recs } -// IsBee260 implements p2p.Bee260CompatibilityStreamer interface. -// It always returns false. -func (r *Recorder) IsBee260(overlay swarm.Address) bool { - return false -} - type Record struct { in *record out *record From af6c38e0bccb23a6e0fdae4674c4e619b874f8ef Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 30 Mar 2026 10:51:03 +0200 Subject: [PATCH 2/5] chore: remove legacy code for underlay addresses --- pkg/bzz/underlay.go | 22 +----------------- pkg/bzz/underlay_test.go | 49 ---------------------------------------- 2 files changed, 1 insertion(+), 70 deletions(-) diff --git a/pkg/bzz/underlay.go b/pkg/bzz/underlay.go index fdddf21a404..38e83c2a4a8 100644 --- a/pkg/bzz/underlay.go +++ b/pkg/bzz/underlay.go @@ -20,15 +20,7 @@ import ( const underlayListPrefix byte = 0x99 // SerializeUnderlays serializes a slice of multiaddrs into a single byte slice. -// If the slice contains exactly one address, the standard, backward-compatible -// multiaddr format is used. For zero or more than one address, a custom list format -// prefixed with a magic byte is utilized. func SerializeUnderlays(addrs []multiaddr.Multiaddr) []byte { - // Backward compatibility if exactly one address is present. - if len(addrs) == 1 { - return addrs[0].Bytes() - } - // For 0 or 2+ addresses, the custom list format with the prefix is used. // The format is: [prefix_byte][varint_len_1][addr_1_bytes]... var buf bytes.Buffer @@ -49,19 +41,7 @@ func DeserializeUnderlays(data []byte) ([]multiaddr.Multiaddr, error) { if len(data) == 0 { return nil, errors.New("cannot deserialize empty byte slice") } - - // If the data begins with the magic prefix, it is handled as a list. - if data[0] == underlayListPrefix { - return deserializeList(data[1:]) - } - - // Otherwise, the data is handled as a single, backward-compatible multiaddr. - addr, err := multiaddr.NewMultiaddrBytes(data) - if err != nil { - return nil, fmt.Errorf("failed to parse as single multiaddr: %w", err) - } - // The result is returned as a single-element slice for a consistent return type. - return []multiaddr.Multiaddr{addr}, nil + return deserializeList(data[1:]) } // deserializeList handles the parsing of the custom list format. diff --git a/pkg/bzz/underlay_test.go b/pkg/bzz/underlay_test.go index 5bc4e34c19d..2e50cedd0ac 100644 --- a/pkg/bzz/underlay_test.go +++ b/pkg/bzz/underlay_test.go @@ -29,19 +29,6 @@ func TestSerializeUnderlays(t *testing.T) { } }) - t.Run("single address list", func(t *testing.T) { - addrs := []multiaddr.Multiaddr{dnsSwarmAddr} - serialized := bzz.SerializeUnderlays(addrs) - expected := dnsSwarmAddr.Bytes() // Should be legacy format without prefix - - if !bytes.Equal(serialized, expected) { - t.Errorf("expected single address to serialize to legacy format %x, got %x", expected, serialized) - } - if serialized[0] == bzz.UnderlayListPrefix { - t.Error("single address serialization should not have the list prefix") - } - }) - t.Run("empty list", func(t *testing.T) { addrs := []multiaddr.Multiaddr{} serialized := bzz.SerializeUnderlays(addrs) @@ -205,42 +192,6 @@ func TestSerializeUnderlaysDeserializeUnderlays(t *testing.T) { }) } -func TestLegacyCompatibility(t *testing.T) { - ip4TCPAddr := mustNewMultiaddr(t, "/ip4/1.2.3.4/tcp/5678/p2p/QmWqeeHEqG2db37JsuKUxyJ2JF8LtVJMGohKVT8h3aeCVH") - p2pAddr := mustNewMultiaddr(t, "/ip4/65.108.66.216/tcp/16341/p2p/QmVuCJ3M96c7vwv4MQBv7WY1HWQacyCEHvM99R8MUDj95d") - dnsSwarmAddr := mustNewMultiaddr(t, "/dnsaddr/mainnet.ethswarm.org") - - t.Run("legacy parser fails on new list format", func(t *testing.T) { - addrs := []multiaddr.Multiaddr{ip4TCPAddr, p2pAddr, dnsSwarmAddr} - listBytes := bzz.SerializeUnderlays(addrs) // This will have the prefix - _, err := multiaddr.NewMultiaddrBytes(listBytes) - if err == nil { - t.Error("expected legacy NewMultiaddrBytes to fail on list format, but it succeeded") - } - }) - - t.Run("legacy parser succeeds on new single-addr format", func(t *testing.T) { - addrs := []multiaddr.Multiaddr{dnsSwarmAddr} - singleBytes := bzz.SerializeUnderlays(addrs) // This will NOT have the prefix - _, err := multiaddr.NewMultiaddrBytes(singleBytes) - if err != nil { - t.Errorf("expected legacy NewMultiaddrBytes to succeed on single-addr format, but it failed: %v", err) - } - }) - - t.Run("new parser succeeds on legacy format", func(t *testing.T) { - singleBytes := p2pAddr.Bytes() - deserialized, err := bzz.DeserializeUnderlays(singleBytes) - if err != nil { - t.Fatalf("Deserialize failed on legacy bytes: %v", err) - } - expected := []multiaddr.Multiaddr{p2pAddr} - if !reflect.DeepEqual(expected, deserialized) { - t.Errorf("expected %v, got %v", expected, deserialized) - } - }) -} - func mustNewMultiaddr(tb testing.TB, s string) multiaddr.Multiaddr { tb.Helper() From 71d07fdb17ec03cc564487ed5cf6789dea48b9e0 Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 30 Mar 2026 11:21:54 +0200 Subject: [PATCH 3/5] chore: noop old migrations and remove legacy underlay test --- pkg/bzz/underlay_test.go | 11 - pkg/p2p/libp2p/libp2p.go | 3 - pkg/statestore/storeadapter/migration.go | 72 +----- pkg/storer/internal/reserve/olditems.go | 180 -------------- .../internal/stampindex/oldstampindex.go | 115 --------- pkg/storer/migration/refCntSize.go | 144 +---------- pkg/storer/migration/refCntSize_test.go | 52 ---- pkg/storer/migration/resetEpochTimestamp.go | 19 +- .../migration/resetEpochTimestamp_test.go | 52 ---- pkg/storer/migration/step_02.go | 52 +--- pkg/storer/migration/step_02_test.go | 77 ------ pkg/storer/migration/step_04.go | 48 +--- pkg/storer/migration/step_04_test.go | 98 -------- pkg/storer/migration/step_05.go | 48 +--- pkg/storer/migration/step_05_test.go | 107 -------- pkg/storer/migration/step_06.go | 228 +----------------- pkg/storer/migration/step_06_test.go | 194 --------------- 17 files changed, 46 insertions(+), 1454 deletions(-) delete mode 100644 pkg/storer/internal/reserve/olditems.go delete mode 100644 pkg/storer/internal/stampindex/oldstampindex.go delete mode 100644 pkg/storer/migration/refCntSize_test.go delete mode 100644 pkg/storer/migration/resetEpochTimestamp_test.go delete mode 100644 pkg/storer/migration/step_02_test.go delete mode 100644 pkg/storer/migration/step_04_test.go delete mode 100644 pkg/storer/migration/step_05_test.go delete mode 100644 pkg/storer/migration/step_06_test.go diff --git a/pkg/bzz/underlay_test.go b/pkg/bzz/underlay_test.go index 2e50cedd0ac..7caaf6fd64a 100644 --- a/pkg/bzz/underlay_test.go +++ b/pkg/bzz/underlay_test.go @@ -65,17 +65,6 @@ func TestDeserializeUnderlays(t *testing.T) { } }) - t.Run("single legacy multiaddr", func(t *testing.T) { - singleBytes := wssAddr.Bytes() - deserialized, err := bzz.DeserializeUnderlays(singleBytes) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(deserialized) != 1 || !deserialized[0].Equal(wssAddr) { - t.Errorf("expected [%v], got %v", wssAddr, deserialized) - } - }) - t.Run("empty byte slice", func(t *testing.T) { _, err := bzz.DeserializeUnderlays([]byte{}) if err == nil { diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index de354e7b44b..410ed90a60f 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -20,7 +20,6 @@ import ( "time" ocprom "contrib.go.opencensus.io/exporter/prometheus" - "github.com/coreos/go-semver/semver" "github.com/ethersphere/bee/v2" "github.com/ethersphere/bee/v2/pkg/addressbook" "github.com/ethersphere/bee/v2/pkg/bzz" @@ -1464,8 +1463,6 @@ func (s *Service) peerMultiaddrs(ctx context.Context, peerID libp2ppeer.ID) ([]m return buildFullMAs(mas, peerID) } -var version270 = *semver.Must(semver.NewVersion("2.7.0")) - // appendSpace adds a leading space character if the string is not empty. // It is useful for constructing log messages with conditional substrings. func appendSpace(s string) string { diff --git a/pkg/statestore/storeadapter/migration.go b/pkg/statestore/storeadapter/migration.go index 2444e8b06c6..2008c810e99 100644 --- a/pkg/statestore/storeadapter/migration.go +++ b/pkg/statestore/storeadapter/migration.go @@ -5,69 +5,23 @@ package storeadapter import ( - "strings" - - "github.com/ethersphere/bee/v2/pkg/puller" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/migration" ) -func allSteps(st storage.Store) migration.Steps { +// allSteps lists all state store migration steps. +// All steps are now NOOPs since all nodes have already run these migrations, +// and new nodes start with an empty database. +func allSteps(_ storage.Store) migration.Steps { + noop := func() error { return nil } return map[uint64]migration.StepFn{ - 1: epochMigration(st), - 2: deletePrefix(st, puller.IntervalPrefix), - 3: deletePrefix(st, puller.IntervalPrefix), - 4: deletePrefix(st, "blocklist"), - 5: deletePrefix(st, "batchstore"), - 6: deletePrefix(st, puller.IntervalPrefix), - 7: deletePrefix(st, puller.IntervalPrefix), - 8: deletePrefix(st, puller.IntervalPrefix), - } -} - -func deletePrefix(s storage.Store, prefix string) migration.StepFn { - return func() error { - store := &StateStorerAdapter{s} - return store.Iterate(prefix, func(key, val []byte) (stop bool, err error) { - return false, store.Delete(string(key)) - }) - } -} - -func epochMigration(s storage.Store) migration.StepFn { - - return func() error { - - var deleteEntries = []string{ - "statestore_schema", - "tags", - puller.IntervalPrefix, - "kademlia-counters", - "addressbook", - "batch", - } - - return s.Iterate(storage.Query{ - Factory: func() storage.Item { return &rawItem{&proxyItem{obj: []byte(nil)}} }, - }, func(res storage.Result) (stop bool, err error) { - if strings.HasPrefix(res.ID, stateStoreNamespace) { - return false, nil - } - for _, e := range deleteEntries { - if strings.HasPrefix(res.ID, e) { - _ = s.Delete(&rawItem{&proxyItem{key: res.ID}}) - return false, nil - } - } - - item := res.Entry.(*rawItem) - item.key = res.ID - item.ns = stateStoreNamespace - if err := s.Put(item); err != nil { - return true, err - } - _ = s.Delete(&rawItem{&proxyItem{key: res.ID}}) - return false, nil - }) + 1: noop, + 2: noop, + 3: noop, + 4: noop, + 5: noop, + 6: noop, + 7: noop, + 8: noop, } } diff --git a/pkg/storer/internal/reserve/olditems.go b/pkg/storer/internal/reserve/olditems.go deleted file mode 100644 index 37bcdd9cc75..00000000000 --- a/pkg/storer/internal/reserve/olditems.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package reserve - -import ( - "encoding/binary" - "path" - - "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/swarm" -) - -// BatchRadiusItemV1 allows iteration of the chunks with respect to bin and batchID. -// Used for batch evictions of certain bins. -type BatchRadiusItemV1 struct { - Bin uint8 - BatchID []byte - Address swarm.Address - BinID uint64 -} - -func (b *BatchRadiusItemV1) Namespace() string { - return "batchRadius" -} - -func (b *BatchRadiusItemV1) ID() string { - return string(b.BatchID) + string(b.Bin) + b.Address.ByteString() -} - -func (b *BatchRadiusItemV1) String() string { - return path.Join(b.Namespace(), b.ID()) -} - -func (b *BatchRadiusItemV1) Clone() storage.Item { - if b == nil { - return nil - } - return &BatchRadiusItemV1{ - Bin: b.Bin, - BatchID: copyBytes(b.BatchID), - Address: b.Address.Clone(), - BinID: b.BinID, - } -} - -const batchRadiusItemSizeV1 = 1 + swarm.HashSize + swarm.HashSize + 8 - -func (b *BatchRadiusItemV1) Marshal() ([]byte, error) { - - if b.Address.IsZero() { - return nil, errMarshalInvalidAddress - } - - buf := make([]byte, batchRadiusItemSizeV1) - - i := 0 - - buf[i] = b.Bin - i += 1 - - copy(buf[i:i+swarm.HashSize], b.BatchID) - i += swarm.HashSize - - copy(buf[i:i+swarm.HashSize], b.Address.Bytes()) - i += swarm.HashSize - - binary.BigEndian.PutUint64(buf[i:i+8], b.BinID) - - return buf, nil -} - -func (b *BatchRadiusItemV1) Unmarshal(buf []byte) error { - - if len(buf) != batchRadiusItemSizeV1 { - return errUnmarshalInvalidSize - } - - i := 0 - b.Bin = buf[i] - i += 1 - - b.BatchID = copyBytes(buf[i : i+swarm.HashSize]) - i += swarm.HashSize - - b.Address = swarm.NewAddress(buf[i : i+swarm.HashSize]).Clone() - i += swarm.HashSize - - b.BinID = binary.BigEndian.Uint64(buf[i : i+8]) - - return nil -} - -// ChunkBinItemV1 allows for iterating on ranges of bin and binIDs for chunks. -// BinIDs come in handy when syncing the reserve contents with other peers. -type ChunkBinItemV1 struct { - Bin uint8 - BinID uint64 - Address swarm.Address - BatchID []byte - ChunkType swarm.ChunkType -} - -func (c *ChunkBinItemV1) Namespace() string { - return "chunkBin" -} - -func (c *ChunkBinItemV1) ID() string { - return binIDToString(c.Bin, c.BinID) -} - -func (c *ChunkBinItemV1) String() string { - return path.Join(c.Namespace(), c.ID()) -} - -func (c *ChunkBinItemV1) Clone() storage.Item { - if c == nil { - return nil - } - return &ChunkBinItemV1{ - Bin: c.Bin, - BinID: c.BinID, - Address: c.Address.Clone(), - BatchID: copyBytes(c.BatchID), - ChunkType: c.ChunkType, - } -} - -const chunkBinItemSizeV1 = 1 + 8 + swarm.HashSize + swarm.HashSize + 1 - -func (c *ChunkBinItemV1) Marshal() ([]byte, error) { - - if c.Address.IsZero() { - return nil, errMarshalInvalidAddress - } - - buf := make([]byte, chunkBinItemSizeV1) - i := 0 - - buf[i] = c.Bin - i += 1 - - binary.BigEndian.PutUint64(buf[i:i+8], c.BinID) - i += 8 - - copy(buf[i:i+swarm.HashSize], c.Address.Bytes()) - i += swarm.HashSize - - copy(buf[i:i+swarm.HashSize], c.BatchID) - i += swarm.HashSize - - buf[i] = uint8(c.ChunkType) - - return buf, nil -} - -func (c *ChunkBinItemV1) Unmarshal(buf []byte) error { - - if len(buf) != chunkBinItemSizeV1 { - return errUnmarshalInvalidSize - } - - i := 0 - c.Bin = buf[i] - i += 1 - - c.BinID = binary.BigEndian.Uint64(buf[i : i+8]) - i += 8 - - c.Address = swarm.NewAddress(buf[i : i+swarm.HashSize]).Clone() - i += swarm.HashSize - - c.BatchID = copyBytes(buf[i : i+swarm.HashSize]) - i += swarm.HashSize - - c.ChunkType = swarm.ChunkType(buf[i]) - - return nil -} diff --git a/pkg/storer/internal/stampindex/oldstampindex.go b/pkg/storer/internal/stampindex/oldstampindex.go deleted file mode 100644 index de24a33c21b..00000000000 --- a/pkg/storer/internal/stampindex/oldstampindex.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package stampindex - -import ( - "encoding/binary" - "fmt" - - "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storage/storageutil" - "github.com/ethersphere/bee/v2/pkg/storer/internal" - "github.com/ethersphere/bee/v2/pkg/swarm" -) - -// ItemV1 is an store.Item that represents data relevant to stamp. -type ItemV1 struct { - // Keys. - namespace []byte // The namespace of other related item. - BatchID []byte - StampIndex []byte - - // Values. - StampTimestamp []byte - ChunkAddress swarm.Address - ChunkIsImmutable bool -} - -// ID implements the storage.Item interface. -func (i ItemV1) ID() string { - return fmt.Sprintf("%s/%s/%s", string(i.namespace), string(i.BatchID), string(i.StampIndex)) -} - -// Namespace implements the storage.Item interface. -func (i ItemV1) Namespace() string { - return "stampIndex" -} - -// Marshal implements the storage.Item interface. -func (i ItemV1) Marshal() ([]byte, error) { - switch { - case len(i.namespace) == 0: - return nil, errStampItemMarshalScopeInvalid - case len(i.BatchID) != swarm.HashSize: - return nil, errStampItemMarshalBatchIDInvalid - case len(i.StampIndex) != swarm.StampIndexSize: - return nil, errStampItemMarshalBatchIndexInvalid - } - - buf := make([]byte, 8+len(i.namespace)+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize) - - l := 0 - binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.namespace))) - l += 8 - copy(buf[l:l+len(i.namespace)], i.namespace) - l += len(i.namespace) - copy(buf[l:l+swarm.HashSize], i.BatchID) - l += swarm.HashSize - copy(buf[l:l+swarm.StampIndexSize], i.StampIndex) - l += swarm.StampIndexSize - copy(buf[l:l+swarm.StampTimestampSize], i.StampTimestamp) - l += swarm.StampTimestampSize - copy(buf[l:l+swarm.HashSize], internal.AddressBytesOrZero(i.ChunkAddress)) - return buf, nil -} - -// Unmarshal implements the storage.Item interface. -func (i *ItemV1) Unmarshal(bytes []byte) error { - if len(bytes) < 8 { - return errStampItemUnmarshalInvalidSize - } - nsLen := int(binary.LittleEndian.Uint64(bytes)) - if len(bytes) != 8+nsLen+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize { - return errStampItemUnmarshalInvalidSize - } - - ni := new(ItemV1) - l := 8 - ni.namespace = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...) - l += nsLen - ni.BatchID = append(make([]byte, 0, swarm.HashSize), bytes[l:l+swarm.HashSize]...) - l += swarm.HashSize - ni.StampIndex = append(make([]byte, 0, swarm.StampIndexSize), bytes[l:l+swarm.StampIndexSize]...) - l += swarm.StampIndexSize - ni.StampTimestamp = append(make([]byte, 0, swarm.StampTimestampSize), bytes[l:l+swarm.StampTimestampSize]...) - l += swarm.StampTimestampSize - ni.ChunkAddress = internal.AddressOrZero(bytes[l : l+swarm.HashSize]) - *i = *ni - return nil -} - -// Clone implements the storage.Item interface. -func (i *ItemV1) Clone() storage.Item { - if i == nil { - return nil - } - return &ItemV1{ - namespace: append([]byte(nil), i.namespace...), - BatchID: append([]byte(nil), i.BatchID...), - StampIndex: append([]byte(nil), i.StampIndex...), - StampTimestamp: append([]byte(nil), i.StampTimestamp...), - ChunkAddress: i.ChunkAddress.Clone(), - ChunkIsImmutable: i.ChunkIsImmutable, - } -} - -// String implements the fmt.Stringer interface. -func (i ItemV1) String() string { - return storageutil.JoinFields(i.Namespace(), i.ID()) -} - -func (i *ItemV1) SetNamespace(ns []byte) { - i.namespace = ns -} diff --git a/pkg/storer/migration/refCntSize.go b/pkg/storer/migration/refCntSize.go index 2075c50effb..1e1e1803022 100644 --- a/pkg/storer/migration/refCntSize.go +++ b/pkg/storer/migration/refCntSize.go @@ -5,150 +5,16 @@ package migration import ( - "context" - "encoding/binary" - "errors" - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sharky" "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storage/storageutil" - "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" - "github.com/ethersphere/bee/v2/pkg/swarm" -) - -const oldRretrievalIndexItemSize = swarm.HashSize + 8 + sharky.LocationSize + 1 - -var _ storage.Item = (*OldRetrievalIndexItem)(nil) - -var ( - // errMarshalInvalidRetrievalIndexAddress is returned if the RetrievalIndexItem address is zero during marshaling. - errMarshalInvalidRetrievalIndexAddress = errors.New("marshal RetrievalIndexItem: address is zero") - // errMarshalInvalidRetrievalIndexLocation is returned if the RetrievalIndexItem location is invalid during marshaling. - errMarshalInvalidRetrievalIndexLocation = errors.New("marshal RetrievalIndexItem: location is invalid") - // errUnmarshalInvalidRetrievalIndexSize is returned during unmarshaling if the passed buffer is not the expected size. - errUnmarshalInvalidRetrievalIndexSize = errors.New("unmarshal RetrievalIndexItem: invalid size") - // errUnmarshalInvalidRetrievalIndexLocationBytes is returned during unmarshaling if the location buffer is invalid. - errUnmarshalInvalidRetrievalIndexLocationBytes = errors.New("unmarshal RetrievalIndexItem: invalid location bytes") ) -// OldRetrievalIndexItem is the index which gives us the sharky location from the swarm.Address. -// The RefCnt stores the reference of each time a Put operation is issued on this Address. -type OldRetrievalIndexItem struct { - Address swarm.Address - Timestamp uint64 - Location sharky.Location - RefCnt uint8 -} - -func (r *OldRetrievalIndexItem) ID() string { return r.Address.ByteString() } - -func (OldRetrievalIndexItem) Namespace() string { return "retrievalIdx" } - -// Stored in bytes as: -// |--Address(32)--|--Timestamp(8)--|--Location(7)--|--RefCnt(1)--| -func (r *OldRetrievalIndexItem) Marshal() ([]byte, error) { - if r.Address.IsZero() { - return nil, errMarshalInvalidRetrievalIndexAddress - } - - locBuf, err := r.Location.MarshalBinary() - if err != nil { - return nil, errMarshalInvalidRetrievalIndexLocation - } - - buf := make([]byte, oldRretrievalIndexItemSize) - copy(buf, r.Address.Bytes()) - binary.LittleEndian.PutUint64(buf[swarm.HashSize:], r.Timestamp) - copy(buf[swarm.HashSize+8:], locBuf) - buf[oldRretrievalIndexItemSize-1] = r.RefCnt - return buf, nil -} - -func (r *OldRetrievalIndexItem) Unmarshal(buf []byte) error { - if len(buf) != oldRretrievalIndexItemSize { - return errUnmarshalInvalidRetrievalIndexSize - } - - loc := new(sharky.Location) - if err := loc.UnmarshalBinary(buf[swarm.HashSize+8:]); err != nil { - return errUnmarshalInvalidRetrievalIndexLocationBytes - } - - ni := new(OldRetrievalIndexItem) - ni.Address = swarm.NewAddress(append(make([]byte, 0, swarm.HashSize), buf[:swarm.HashSize]...)) - ni.Timestamp = binary.LittleEndian.Uint64(buf[swarm.HashSize:]) - ni.Location = *loc - ni.RefCnt = buf[oldRretrievalIndexItemSize-1] - *r = *ni - return nil -} - -func (r *OldRetrievalIndexItem) Clone() storage.Item { - if r == nil { - return nil - } - return &OldRetrievalIndexItem{ - Address: r.Address.Clone(), - Timestamp: r.Timestamp, - Location: r.Location, - RefCnt: r.RefCnt, - } -} - -func (r OldRetrievalIndexItem) String() string { - return storageutil.JoinFields(r.Namespace(), r.ID()) -} - -func RefCountSizeInc(s storage.BatchStore, logger log.Logger) func() error { +// RefCountSizeInc was a migration step that replaced chunkstore items +// to increase refCnt capacity from uint8 to uint32. +// It is now a NOOP since all nodes have already run this migration, +// and new nodes start with an empty database. +func RefCountSizeInc(_ storage.BatchStore, _ log.Logger) func() error { return func() error { - - logger := logger.WithName("migration-RefCountSizeInc").Register() - - logger.Info("starting migration of replacing chunkstore items to increase refCnt capacity") - - var itemsToDelete []*OldRetrievalIndexItem - - err := s.Iterate( - storage.Query{ - Factory: func() storage.Item { return &OldRetrievalIndexItem{} }, - }, - func(res storage.Result) (bool, error) { - item := res.Entry.(*OldRetrievalIndexItem) - itemsToDelete = append(itemsToDelete, item) - return false, nil - }, - ) - if err != nil { - return err - } - - for i := 0; i < len(itemsToDelete); i += 10000 { - end := min(i+10000, len(itemsToDelete)) - - b := s.Batch(context.Background()) - for _, item := range itemsToDelete[i:end] { - - //create new - err = b.Put(&chunkstore.RetrievalIndexItem{ - Address: item.Address, - Timestamp: item.Timestamp, - Location: item.Location, - RefCnt: uint32(item.RefCnt), - }) - if err != nil { - return err - } - } - - err = b.Commit() - if err != nil { - return err - } - } - - logger.Info("migration complete") - return nil } } diff --git a/pkg/storer/migration/refCntSize_test.go b/pkg/storer/migration/refCntSize_test.go deleted file mode 100644 index d5e5424283e..00000000000 --- a/pkg/storer/migration/refCntSize_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package migration_test - -import ( - "math/rand" - "testing" - - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/storage/inmemstore" - "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" - localmigration "github.com/ethersphere/bee/v2/pkg/storer/migration" - "github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/stretchr/testify/assert" -) - -func Test_RefCntSize(t *testing.T) { - t.Parallel() - - stepFn := localmigration.RefCountSizeInc - store := inmemstore.New() - - // simulate old cacheEntryItem with some random bytes. - oldItems := make([]*localmigration.OldRetrievalIndexItem, 0, 10) - for range 10 { - entry := &localmigration.OldRetrievalIndexItem{ - Address: swarm.RandAddress(t), - Timestamp: uint64(rand.Int()), - Location: sharky.Location{Shard: uint8(rand.Int()), Slot: uint32(rand.Int()), Length: uint16(rand.Int())}, - RefCnt: uint8(rand.Int()), - } - oldItems = append(oldItems, entry) - err := store.Put(entry) - assert.NoError(t, err) - } - - assert.NoError(t, stepFn(store, log.Noop)()) - - // check if all entries are migrated. - for _, entry := range oldItems { - cEntry := &chunkstore.RetrievalIndexItem{Address: entry.Address} - err := store.Get(cEntry) - assert.NoError(t, err) - assert.Equal(t, entry.Address, cEntry.Address) - assert.Equal(t, entry.Timestamp, cEntry.Timestamp) - assert.Equal(t, entry.Location, cEntry.Location) - assert.Equal(t, uint32(entry.RefCnt), cEntry.RefCnt) - } -} diff --git a/pkg/storer/migration/resetEpochTimestamp.go b/pkg/storer/migration/resetEpochTimestamp.go index 11c42ba7616..4cbd93e741d 100644 --- a/pkg/storer/migration/resetEpochTimestamp.go +++ b/pkg/storer/migration/resetEpochTimestamp.go @@ -4,19 +4,14 @@ package migration -import ( - "context" +import "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - "github.com/ethersphere/bee/v2/pkg/storer/internal/reserve" - "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" -) - -// resetReserveEpochTimestamp is a migration that resets the epoch timestamp of the reserve -// so that peers in the network can resync chunks. -func resetReserveEpochTimestamp(st transaction.Storage) func() error { +// resetReserveEpochTimestamp was a migration that reset the epoch timestamp +// of the reserve so that peers in the network could resync chunks. +// It is now a NOOP since all nodes have already run this migration, +// and new nodes start with an empty database. +func resetReserveEpochTimestamp(_ transaction.Storage) func() error { return func() error { - return st.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Delete(&reserve.EpochItem{}) - }) + return nil } } diff --git a/pkg/storer/migration/resetEpochTimestamp_test.go b/pkg/storer/migration/resetEpochTimestamp_test.go deleted file mode 100644 index f965da458c2..00000000000 --- a/pkg/storer/migration/resetEpochTimestamp_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package migration_test - -import ( - "context" - "testing" - "time" - - "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/storage/inmemstore" - "github.com/ethersphere/bee/v2/pkg/storer/internal/reserve" - "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - localmigration "github.com/ethersphere/bee/v2/pkg/storer/migration" - "github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/ethersphere/bee/v2/pkg/util/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_ResetEpochTimestamp(t *testing.T) { - t.Parallel() - - sharkyDir := t.TempDir() - sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) - assert.NoError(t, err) - store := inmemstore.New() - storage := transaction.NewStorage(sharkyStore, store) - testutil.CleanupCloser(t, storage) - - err = storage.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Put(&reserve.EpochItem{Timestamp: uint64(time.Now().Second())}) - }) - require.NoError(t, err) - - has, err := storage.IndexStore().Has(&reserve.EpochItem{}) - require.NoError(t, err) - if !has { - t.Fatal("epoch item should exist") - } - - err = localmigration.ResetEpochTimestamp(storage)() - require.NoError(t, err) - - has, err = storage.IndexStore().Has(&reserve.EpochItem{}) - require.NoError(t, err) - if has { - t.Fatal("epoch item should be deleted") - } -} diff --git a/pkg/storer/migration/step_02.go b/pkg/storer/migration/step_02.go index 7e2a087f51c..781bb9f06f4 100644 --- a/pkg/storer/migration/step_02.go +++ b/pkg/storer/migration/step_02.go @@ -4,53 +4,13 @@ package migration -import ( - "context" - "time" - - storage "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storer/internal/cache" - "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - "github.com/ethersphere/bee/v2/pkg/swarm" -) - -// step_02 migrates the cache to the new format. -// the old cacheEntry item has the same key, but the value is different. So only -// a Put is needed. -func step_02(st transaction.Storage) func() error { +import "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" +// step_02 was a migration step that migrated the cache to a new format. +// It is now a NOOP since all nodes have already run this migration, +// and new nodes start with an empty database. +func step_02(_ transaction.Storage) func() error { return func() error { - - trx, done := st.NewTransaction(context.Background()) - defer done() - - var entries []*cache.CacheEntryItem - err := trx.IndexStore().Iterate( - storage.Query{ - Factory: func() storage.Item { return &cache.CacheEntryItem{} }, - ItemProperty: storage.QueryItemID, - }, - func(res storage.Result) (bool, error) { - entry := &cache.CacheEntryItem{ - Address: swarm.NewAddress([]byte(res.ID)), - AccessTimestamp: time.Now().UnixNano(), - } - entries = append(entries, entry) - return false, nil - }, - ) - if err != nil { - return err - } - - for _, entry := range entries { - err := trx.IndexStore().Put(entry) - if err != nil { - return err - } - } - - return trx.Commit() + return nil } - } diff --git a/pkg/storer/migration/step_02_test.go b/pkg/storer/migration/step_02_test.go deleted file mode 100644 index c1c741ac2fa..00000000000 --- a/pkg/storer/migration/step_02_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package migration_test - -import ( - "context" - "crypto/rand" - "testing" - - "github.com/stretchr/testify/assert" - - storage "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storer/internal" - "github.com/ethersphere/bee/v2/pkg/storer/internal/cache" - "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - localmigration "github.com/ethersphere/bee/v2/pkg/storer/migration" - "github.com/ethersphere/bee/v2/pkg/swarm" -) - -type testEntry struct { - address swarm.Address -} - -func (e *testEntry) Namespace() string { return "cacheEntry" } - -func (e *testEntry) ID() string { return e.address.ByteString() } - -func (e *testEntry) Marshal() ([]byte, error) { - buf := make([]byte, 32*3) - _, _ = rand.Read(buf) - return buf, nil -} - -func (e *testEntry) Unmarshal(buf []byte) error { - return nil -} - -func (e *testEntry) Clone() storage.Item { - return &testEntry{ - address: e.address, - } -} - -func (e testEntry) String() string { - return "testEntry" -} - -func Test_Step_02(t *testing.T) { - t.Parallel() - - stepFn := localmigration.Step_02 - store := internal.NewInmemStorage() - - // simulate old cacheEntryItem with some random bytes. - addrs := make([]*testEntry, 0, 10) - for range 10 { - entry := &testEntry{address: swarm.RandAddress(t)} - addrs = append(addrs, entry) - err := store.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Put(entry) - }) - assert.NoError(t, err) - } - - assert.NoError(t, stepFn(store)()) - - // check if all entries are migrated. - for _, entry := range addrs { - cEntry := &cache.CacheEntryItem{Address: entry.address} - err := store.IndexStore().Get(cEntry) - assert.NoError(t, err) - assert.Equal(t, entry.address, cEntry.Address) - assert.Greater(t, cEntry.AccessTimestamp, int64(0)) - } -} diff --git a/pkg/storer/migration/step_04.go b/pkg/storer/migration/step_04.go index 481b6744d83..bcfa495408e 100644 --- a/pkg/storer/migration/step_04.go +++ b/pkg/storer/migration/step_04.go @@ -5,53 +5,15 @@ package migration import ( - "context" - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - "github.com/ethersphere/bee/v2/pkg/swarm" ) -// step_04 is the fourth step of the migration. It forces a sharky recovery to -// be run on the localstore. -func step_04( - sharkyBasePath string, - sharkyNoOfShards int, - st transaction.Storage, - logger log.Logger, -) func() error { +// step_04 was a migration step that forced a sharky recovery on the localstore. +// It is now a NOOP since all nodes have already run this migration, +// and new nodes start with an empty database. +func step_04(_ string, _ int, _ transaction.Storage, _ log.Logger) func() error { return func() error { - // for in-mem store, skip this step - if sharkyBasePath == "" { - return nil - } - logger := logger.WithName("migration-step-04").Register() - - logger.Info("starting sharky recovery") - sharkyRecover, err := sharky.NewRecovery(sharkyBasePath, sharkyNoOfShards, swarm.SocMaxChunkSize) - if err != nil { - return err - } - - c := chunkstore.IterateLocations(context.Background(), st.IndexStore()) - - for res := range c { - if res.Err != nil { - return res.Err - } - - if err := sharkyRecover.Add(res.Location); err != nil { - return err - } - } - - if err := sharkyRecover.Save(); err != nil { - return err - } - logger.Info("finished sharky recovery") - - return sharkyRecover.Close() + return nil } } diff --git a/pkg/storer/migration/step_04_test.go b/pkg/storer/migration/step_04_test.go deleted file mode 100644 index fcf54225af4..00000000000 --- a/pkg/storer/migration/step_04_test.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package migration_test - -import ( - "context" - "io/fs" - "os" - "path/filepath" - "testing" - - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/storage/inmemstore" - chunktest "github.com/ethersphere/bee/v2/pkg/storage/testing" - "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" - "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - localmigration "github.com/ethersphere/bee/v2/pkg/storer/migration" - "github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/stretchr/testify/assert" -) - -type dirFS struct { - basedir string -} - -func (d *dirFS) Open(path string) (fs.File, error) { - return os.OpenFile(filepath.Join(d.basedir, path), os.O_RDWR|os.O_CREATE, 0644) -} - -func Test_Step_04(t *testing.T) { - t.Parallel() - - sharkyDir := t.TempDir() - sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) - assert.NoError(t, err) - store := inmemstore.New() - storage := transaction.NewStorage(sharkyStore, store) - - stepFn := localmigration.Step_04(sharkyDir, 1, storage, log.Noop) - - chunks := chunktest.GenerateTestRandomChunks(10) - - for _, ch := range chunks { - err = storage.Run(context.Background(), func(s transaction.Store) error { - return s.ChunkStore().Put(context.Background(), ch) - }) - assert.NoError(t, err) - } - - for _, ch := range chunks[:2] { - err = storage.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Delete(&chunkstore.RetrievalIndexItem{Address: ch.Address()}) - }) - assert.NoError(t, err) - } - - err = storage.Close() - assert.NoError(t, err) - - assert.NoError(t, stepFn()) - - sharkyStore, err = sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) - assert.NoError(t, err) - - store2 := transaction.NewStorage(sharkyStore, store) - - // check that the chunks are still there - for _, ch := range chunks[2:] { - _, err := store2.ChunkStore().Get(context.Background(), ch.Address()) - assert.NoError(t, err) - } - - err = sharkyStore.Close() - assert.NoError(t, err) - - // check that the sharky files are there - f, err := os.Open(filepath.Join(sharkyDir, "free_000")) - assert.NoError(t, err) - - buf := make([]byte, 2) - _, err = f.Read(buf) - assert.NoError(t, err) - - for i := range 10 { - if i < 2 { - // if the chunk is deleted, the bit is set to 1 - assert.Greater(t, buf[i/8]&(1<<(i%8)), byte(0)) - } else { - // if the chunk is not deleted, the bit is 0 - assert.Equal(t, byte(0), buf[i/8]&(1<<(i%8))) - } - } - - assert.NoError(t, f.Close()) -} diff --git a/pkg/storer/migration/step_05.go b/pkg/storer/migration/step_05.go index 94b23d9ef67..90492210124 100644 --- a/pkg/storer/migration/step_05.go +++ b/pkg/storer/migration/step_05.go @@ -5,53 +5,15 @@ package migration import ( - "context" - "fmt" - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - "github.com/ethersphere/bee/v2/pkg/storer/internal/upload" ) -// step_05 is a migration step that removes all upload items from the store. -func step_05(st transaction.Storage, logger log.Logger) func() error { +// step_05 was a migration step that removed all upload items from the store. +// It is now a NOOP since all nodes have already run this migration, +// and new nodes start with an empty database. +func step_05(_ transaction.Storage, _ log.Logger) func() error { return func() error { - - logger := logger.WithName("migration-step-05").Register() - - logger.Info("start removing upload items") - - itemC := make(chan storage.Item) - errC := make(chan error) - go func() { - for item := range itemC { - err := st.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Delete(item) - }) - if err != nil { - errC <- fmt.Errorf("delete upload item: %w", err) - return - } - } - close(errC) - }() - - err := upload.IterateAll(st.IndexStore(), func(u storage.Item) (bool, error) { - select { - case itemC <- u: - case err := <-errC: - return true, err - } - return false, nil - }) - close(itemC) - if err != nil { - return err - } - - logger.Info("finished removing upload items") - return <-errC + return nil } - } diff --git a/pkg/storer/migration/step_05_test.go b/pkg/storer/migration/step_05_test.go deleted file mode 100644 index aeacd310f3e..00000000000 --- a/pkg/storer/migration/step_05_test.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package migration_test - -import ( - "context" - "testing" - - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storage/leveldbstore" - chunktest "github.com/ethersphere/bee/v2/pkg/storage/testing" - - "github.com/ethersphere/bee/v2/pkg/storer/internal" - "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - "github.com/ethersphere/bee/v2/pkg/storer/internal/upload" - localmigration "github.com/ethersphere/bee/v2/pkg/storer/migration" - "github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/stretchr/testify/assert" -) - -func Test_Step_05(t *testing.T) { - t.Parallel() - - sharkyDir := t.TempDir() - sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) - assert.NoError(t, err) - - lstore, err := leveldbstore.New("", nil) - assert.NoError(t, err) - - store := transaction.NewStorage(sharkyStore, lstore) - t.Cleanup(func() { - err := store.Close() - if err != nil { - t.Fatalf("Close(): unexpected closing storer: %v", err) - } - }) - - ctx := context.Background() - - wantCount := func(t *testing.T, st storage.Reader, want int) { - t.Helper() - count := 0 - err := upload.IterateAll(st, func(_ storage.Item) (bool, error) { - count++ - return false, nil - }) - if err != nil { - t.Fatalf("iterate upload items: %v", err) - } - if count != want { - t.Fatalf("expected %d upload items, got %d", want, count) - } - } - - var tag upload.TagItem - err = store.Run(context.Background(), func(s transaction.Store) error { - tag, err = upload.NextTag(s.IndexStore()) - return err - }) - if err != nil { - t.Fatalf("create tag: %v", err) - } - - var putter internal.PutterCloserWithReference - err = store.Run(context.Background(), func(s transaction.Store) error { - putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) - return err - }) - if err != nil { - t.Fatalf("create putter: %v", err) - } - - chunks := chunktest.GenerateTestRandomChunks(10) - - err = store.Run(context.Background(), func(s transaction.Store) error { - for _, ch := range chunks { - err := putter.Put(ctx, s, ch) - if err != nil { - return err - } - } - return nil - }) - if err != nil { - t.Fatalf("put chunk: %v", err) - } - - err = store.Run(ctx, func(s transaction.Store) error { - return putter.Close(s.IndexStore(), swarm.RandAddress(t)) - }) - if err != nil { - t.Fatalf("close putter: %v", err) - } - - wantCount(t, store.IndexStore(), 10) - - err = localmigration.Step_05(store, log.Noop)() - if err != nil { - t.Fatalf("step 05: %v", err) - } - wantCount(t, store.IndexStore(), 0) -} diff --git a/pkg/storer/migration/step_06.go b/pkg/storer/migration/step_06.go index 4160e198b4c..a976e8c1e1c 100644 --- a/pkg/storer/migration/step_06.go +++ b/pkg/storer/migration/step_06.go @@ -5,234 +5,16 @@ package migration import ( - "bytes" - "context" - "errors" - "fmt" - "runtime" - "sync/atomic" - "time" - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp" - "github.com/ethersphere/bee/v2/pkg/storer/internal/reserve" - "github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - "golang.org/x/sync/errgroup" ) -// step_06 is a migration step that adds a stampHash to all BatchRadiusItems, ChunkBinItems and StampIndexItems. -func step_06(st transaction.Storage, logger log.Logger) func() error { +// step_06 was a migration step that added a stampHash to all +// BatchRadiusItems, ChunkBinItems and StampIndexItems. +// It is now a NOOP since all nodes have already run this migration, +// and new nodes start with an empty database. +func step_06(_ transaction.Storage, _ log.Logger) func() error { return func() error { - logger := logger.WithName("migration-step-06").Register() - - logger.Info("start adding stampHash to BatchRadiusItems, ChunkBinItems and StampIndexItems") - - seenCount, doneCount, err := addStampHash(logger, st) - if err != nil { - return fmt.Errorf("add stamp hash migration: %w", err) - } - logger.Info("finished migrating items", "seen", seenCount, "migrated", doneCount) return nil } } - -func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, error) { - - preBatchRadiusCnt, err := st.IndexStore().Count(&reserve.BatchRadiusItemV1{}) - if err != nil { - return 0, 0, err - } - - preChunkBinCnt, err := st.IndexStore().Count(&reserve.ChunkBinItemV1{}) - if err != nil { - return 0, 0, err - } - - if preBatchRadiusCnt != preChunkBinCnt { - return 0, 0, fmt.Errorf("pre-migration check: index counts do not match, %d vs %d", preBatchRadiusCnt, preChunkBinCnt) - } - - // Delete epoch timestamp - err = st.Run(context.Background(), func(s transaction.Store) error { - return s.IndexStore().Delete(&reserve.EpochItem{}) - }) - if err != nil { - return 0, 0, err - } - - itemC := make(chan *reserve.BatchRadiusItemV1) - - errC := make(chan error, 1) - doneC := make(chan any) - defer close(doneC) - defer close(errC) - - var eg errgroup.Group - eg.SetLimit(runtime.NumCPU()) - - var doneCount atomic.Int64 - var seenCount int64 - - go func() { - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - for { - select { - case <-ticker.C: - logger.Info("still migrating items...") - case <-doneC: - return - } - } - }() - - go func() { - _ = st.IndexStore().Iterate(storage.Query{ - Factory: func() storage.Item { return new(reserve.BatchRadiusItemV1) }, - }, func(result storage.Result) (bool, error) { - seenCount++ - item := result.Entry.(*reserve.BatchRadiusItemV1) - select { - case itemC <- item: - case err := <-errC: - return true, err - } - return false, nil - }) - close(itemC) - }() - - for item := range itemC { - batchRadiusItemV1 := item - eg.Go(func() error { - err := st.Run(context.Background(), func(s transaction.Store) error { - idxStore := s.IndexStore() - stamp, err := chunkstamp.LoadWithBatchID(idxStore, "reserve", batchRadiusItemV1.Address, batchRadiusItemV1.BatchID) - if err != nil { - return err - } - stampHash, err := stamp.Hash() - if err != nil { - return err - } - - // Since the ID format has changed, we should delete the old item and put a new one with the new ID format. - err = idxStore.Delete(batchRadiusItemV1) - if err != nil { - return err - } - err = idxStore.Put(&reserve.BatchRadiusItem{ - StampHash: stampHash, - Bin: batchRadiusItemV1.Bin, - BatchID: batchRadiusItemV1.BatchID, - Address: batchRadiusItemV1.Address, - BinID: batchRadiusItemV1.BinID, - }) - if err != nil { - return err - } - - chunkBinItemV1 := &reserve.ChunkBinItemV1{ - Bin: batchRadiusItemV1.Bin, - BinID: batchRadiusItemV1.BinID, - } - err = idxStore.Get(chunkBinItemV1) - if err != nil { - return err - } - - // same id. Will replace. - err = idxStore.Put(&reserve.ChunkBinItem{ - StampHash: stampHash, - Bin: chunkBinItemV1.Bin, - BinID: chunkBinItemV1.BinID, - Address: chunkBinItemV1.Address, - BatchID: chunkBinItemV1.BatchID, - ChunkType: chunkBinItemV1.ChunkType, - }) - if err != nil { - return err - } - - // same id. Will replace. - stampIndexItem := &stampindex.Item{ - StampHash: stampHash, - BatchID: chunkBinItemV1.BatchID, - StampIndex: stamp.Index(), - StampTimestamp: stamp.Timestamp(), - ChunkAddress: chunkBinItemV1.Address, - } - stampIndexItem.SetScope([]byte("reserve")) - err = idxStore.Put(stampIndexItem) - if err != nil { - return err - } - doneCount.Add(1) - return nil - }) - if err != nil { - errC <- err - return err - } - return nil - }) - } - - err = eg.Wait() - if err != nil { - return 0, 0, err - } - - postBatchRadiusCnt, err := st.IndexStore().Count(&reserve.BatchRadiusItem{}) - if err != nil { - return 0, 0, err - } - - postChunkBinCnt, err := st.IndexStore().Count(&reserve.ChunkBinItem{}) - if err != nil { - return 0, 0, err - } - - if postBatchRadiusCnt != postChunkBinCnt || preBatchRadiusCnt != postBatchRadiusCnt || preChunkBinCnt != postChunkBinCnt { - return 0, 0, fmt.Errorf("post-migration check: index counts do not match, %d vs %d. It's recommended that the nuke cmd is run to reset the node", postBatchRadiusCnt, postChunkBinCnt) - } - - err = st.IndexStore().Iterate(storage.Query{ - Factory: func() storage.Item { return new(reserve.ChunkBinItem) }, - }, func(result storage.Result) (bool, error) { - item := result.Entry.(*reserve.ChunkBinItem) - - batchRadiusItem := &reserve.BatchRadiusItem{BatchID: item.BatchID, Bin: item.Bin, Address: item.Address, StampHash: item.StampHash} - if err := st.IndexStore().Get(batchRadiusItem); err != nil { - return false, fmt.Errorf("batch radius item get: %w", err) - } - - stamp, err := chunkstamp.LoadWithBatchID(st.IndexStore(), "reserve", item.Address, item.BatchID) - if err != nil { - return false, fmt.Errorf("stamp item get: %w", err) - } - - stampIndex, err := stampindex.Load(st.IndexStore(), "reserve", stamp) - if err != nil { - return false, fmt.Errorf("stamp index get: %w", err) - } - - if !bytes.Equal(item.StampHash, batchRadiusItem.StampHash) { - return false, fmt.Errorf("batch radius item stamp hash, %x vs %x", item.StampHash, batchRadiusItem.StampHash) - } - - if !bytes.Equal(item.StampHash, stampIndex.StampHash) { - return false, fmt.Errorf("stamp index item stamp hash, %x vs %x", item.StampHash, stampIndex.StampHash) - } - - return false, nil - }) - - if err != nil { - return 0, 0, errors.New("post-migration check: items fields not match. It's recommended that the nuke cmd is run to reset the node") - } - - return seenCount, doneCount.Load(), nil -} diff --git a/pkg/storer/migration/step_06_test.go b/pkg/storer/migration/step_06_test.go deleted file mode 100644 index b5a5d3ecc7a..00000000000 --- a/pkg/storer/migration/step_06_test.go +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package migration_test - -import ( - "context" - "testing" - - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storage/leveldbstore" - chunktest "github.com/ethersphere/bee/v2/pkg/storage/testing" - "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp" - "github.com/ethersphere/bee/v2/pkg/storer/internal/reserve" - "github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex" - "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - localmigration "github.com/ethersphere/bee/v2/pkg/storer/migration" - "github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type oldAndNewItem[K storage.Item, V storage.Item] struct { - old K - new V -} - -func Test_Step_06(t *testing.T) { - t.Parallel() - - sharkyDir := t.TempDir() - sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) - require.NoError(t, err) - - lstore, err := leveldbstore.New("", nil) - require.NoError(t, err) - - store := transaction.NewStorage(sharkyStore, lstore) - t.Cleanup(func() { - err := store.Close() - require.NoError(t, err) - }) - - chunks := chunktest.GenerateTestRandomChunks(100) - ctx := context.Background() - - batchRadiusItems := make(map[string]oldAndNewItem[*reserve.BatchRadiusItemV1, *reserve.BatchRadiusItem]) - chunkBinItems := make(map[string]oldAndNewItem[*reserve.ChunkBinItemV1, *reserve.ChunkBinItem]) - stampIndexItems := make(map[string]oldAndNewItem[*stampindex.ItemV1, *stampindex.Item]) - - for i, ch := range chunks { - err = store.Run(ctx, func(s transaction.Store) error { - b := &reserve.BatchRadiusItemV1{ - Bin: uint8(i), - BatchID: ch.Stamp().BatchID(), - Address: ch.Address(), - BinID: uint64(i), - } - err := s.IndexStore().Put(b) - if err != nil { - return err - } - batchRadiusItems[string(b.BatchID)+string(b.Bin)+b.Address.ByteString()] = oldAndNewItem[*reserve.BatchRadiusItemV1, *reserve.BatchRadiusItem]{old: b, new: nil} - - c := &reserve.ChunkBinItemV1{ - Bin: uint8(i), - BinID: uint64(i), - Address: ch.Address(), - BatchID: ch.Stamp().BatchID(), - ChunkType: swarm.ChunkTypeSingleOwner, - } - err = s.IndexStore().Put(c) - if err != nil { - return err - } - chunkBinItems[c.ID()] = oldAndNewItem[*reserve.ChunkBinItemV1, *reserve.ChunkBinItem]{old: c, new: nil} - - sIdxItem := &stampindex.ItemV1{ - BatchID: ch.Stamp().BatchID(), - StampIndex: ch.Stamp().Index(), - StampTimestamp: ch.Stamp().Timestamp(), - ChunkAddress: ch.Address(), - ChunkIsImmutable: true, - } - sIdxItem.SetNamespace([]byte("reserve")) - err = s.IndexStore().Put(sIdxItem) - if err != nil { - return err - } - - stampIndexItems[sIdxItem.ID()] = oldAndNewItem[*stampindex.ItemV1, *stampindex.Item]{old: sIdxItem, new: nil} - - return chunkstamp.Store(s.IndexStore(), "reserve", ch) - }) - require.NoError(t, err) - } - - require.NoError(t, err) - err = localmigration.Step_06(store, log.Noop)() - require.NoError(t, err) - - has, err := store.IndexStore().Has(&reserve.EpochItem{}) - if has { - t.Fatal("epoch item should be deleted") - } - require.NoError(t, err) - - checkBatchRadiusItems(t, store.IndexStore(), len(chunks), batchRadiusItems) - checkChunkBinItems(t, store.IndexStore(), len(chunks), chunkBinItems) - checkStampIndex(t, store.IndexStore(), len(chunks), stampIndexItems) -} - -func checkBatchRadiusItems(t *testing.T, s storage.Reader, wantCount int, m map[string]oldAndNewItem[*reserve.BatchRadiusItemV1, *reserve.BatchRadiusItem]) { - t.Helper() - count := 0 - - err := s.Iterate(storage.Query{ - Factory: func() storage.Item { return new(reserve.BatchRadiusItem) }, - }, func(result storage.Result) (bool, error) { - count++ - b := result.Entry.(*reserve.BatchRadiusItem) - id := string(b.BatchID) + string(b.Bin) + b.Address.ByteString() - found, ok := m[id] - require.True(t, ok) - found.new = b - m[id] = found - return false, nil - }) - require.NoError(t, err) - assert.Equal(t, wantCount, count) - - for _, v := range m { - assert.Equal(t, v.old.Bin, v.new.Bin) - assert.Equal(t, v.old.BatchID, v.new.BatchID) - assert.Equal(t, v.old.Address, v.new.Address) - assert.Equal(t, v.old.BinID, v.new.BinID) - assert.NotEqual(t, swarm.EmptyAddress.Bytes(), v.new.StampHash) - } -} - -func checkChunkBinItems(t *testing.T, s storage.Reader, wantCount int, m map[string]oldAndNewItem[*reserve.ChunkBinItemV1, *reserve.ChunkBinItem]) { - t.Helper() - count := 0 - err := s.Iterate(storage.Query{ - Factory: func() storage.Item { return new(reserve.ChunkBinItem) }, - }, func(result storage.Result) (bool, error) { - count++ - b := result.Entry.(*reserve.ChunkBinItem) - found, ok := m[b.ID()] - require.True(t, ok) - found.new = b - m[b.ID()] = found - return false, nil - }) - require.NoError(t, err) - assert.Equal(t, wantCount, count) - for _, v := range m { - assert.Equal(t, v.old.Bin, v.new.Bin) - assert.Equal(t, v.old.BatchID, v.new.BatchID) - assert.Equal(t, v.old.Address, v.new.Address) - assert.Equal(t, v.old.BinID, v.new.BinID) - assert.Equal(t, v.old.ChunkType, v.new.ChunkType) - assert.NotEqual(t, swarm.EmptyAddress.Bytes(), v.new.StampHash) - } -} - -func checkStampIndex(t *testing.T, s storage.Reader, wantCount int, m map[string]oldAndNewItem[*stampindex.ItemV1, *stampindex.Item]) { - t.Helper() - count := 0 - err := s.Iterate(storage.Query{ - Factory: func() storage.Item { return new(stampindex.Item) }, - }, func(result storage.Result) (bool, error) { - count++ - b := result.Entry.(*stampindex.Item) - found, ok := m[b.ID()] - require.True(t, ok) - found.new = b - m[b.ID()] = found - return false, nil - }) - require.NoError(t, err) - assert.Equal(t, wantCount, count) - for _, v := range m { - assert.Equal(t, v.old.Namespace(), v.new.Namespace()) - assert.Equal(t, v.old.BatchID, v.new.BatchID) - assert.Equal(t, v.old.StampIndex, v.new.StampIndex) - assert.Equal(t, v.old.StampTimestamp, v.new.StampTimestamp) - assert.Equal(t, v.old.ChunkAddress, v.new.ChunkAddress) - assert.NotEqual(t, swarm.EmptyAddress.Bytes(), v.new.StampHash) - } -} From 2ee43af44e048a342ef3c13d9cfc31a676403aef Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 31 Mar 2026 11:02:27 +0200 Subject: [PATCH 4/5] chore: remove underlayListPrefix prefix --- pkg/bzz/address_test.go | 2 +- pkg/bzz/export_test.go | 7 ------ pkg/bzz/underlay.go | 14 ++--------- pkg/bzz/underlay_test.go | 54 ++++++++-------------------------------- 4 files changed, 13 insertions(+), 64 deletions(-) delete mode 100644 pkg/bzz/export_test.go diff --git a/pkg/bzz/address_test.go b/pkg/bzz/address_test.go index 8d038010e25..799d4a222f5 100644 --- a/pkg/bzz/address_test.go +++ b/pkg/bzz/address_test.go @@ -40,7 +40,7 @@ func TestBzzAddress(t *testing.T) { t.Fatal(err) } - bzzAddress2, err := bzz.ParseAddress(node1ma.Bytes(), overlay.Bytes(), bzzAddress.Signature, nonce, true, 3) + bzzAddress2, err := bzz.ParseAddress(bzz.SerializeUnderlays([]multiaddr.Multiaddr{node1ma}), overlay.Bytes(), bzzAddress.Signature, nonce, true, 3) if err != nil { t.Fatal(err) } diff --git a/pkg/bzz/export_test.go b/pkg/bzz/export_test.go deleted file mode 100644 index 531121edc4f..00000000000 --- a/pkg/bzz/export_test.go +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2025 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package bzz - -const UnderlayListPrefix = underlayListPrefix diff --git a/pkg/bzz/underlay.go b/pkg/bzz/underlay.go index 38e83c2a4a8..2602a81339f 100644 --- a/pkg/bzz/underlay.go +++ b/pkg/bzz/underlay.go @@ -13,19 +13,11 @@ import ( "github.com/multiformats/go-varint" ) -// underlayListPrefix is a magic byte designated for identifying a serialized list of multiaddrs. -// A value of 0x99 (153) was chosen as it is not a defined multiaddr protocol code. -// This ensures that a failure is triggered by the original multiaddr.NewMultiaddrBytes function, -// which expects a valid protocol code at the start of the data. -const underlayListPrefix byte = 0x99 - // SerializeUnderlays serializes a slice of multiaddrs into a single byte slice. func SerializeUnderlays(addrs []multiaddr.Multiaddr) []byte { // For 0 or 2+ addresses, the custom list format with the prefix is used. - // The format is: [prefix_byte][varint_len_1][addr_1_bytes]... + // The format is: [varint_len_1][addr_1_bytes]... var buf bytes.Buffer - buf.WriteByte(underlayListPrefix) - for _, addr := range addrs { addrBytes := addr.Bytes() buf.Write(varint.ToUvarint(uint64(len(addrBytes)))) @@ -35,13 +27,11 @@ func SerializeUnderlays(addrs []multiaddr.Multiaddr) []byte { } // DeserializeUnderlays deserializes a byte slice into a slice of multiaddrs. -// The data format is automatically detected as either a single legacy multiaddr -// or a list of multiaddrs (identified by underlayListPrefix), and is parsed accordingly. func DeserializeUnderlays(data []byte) ([]multiaddr.Multiaddr, error) { if len(data) == 0 { return nil, errors.New("cannot deserialize empty byte slice") } - return deserializeList(data[1:]) + return deserializeList(data) } // deserializeList handles the parsing of the custom list format. diff --git a/pkg/bzz/underlay_test.go b/pkg/bzz/underlay_test.go index 7caaf6fd64a..0a18f1789fa 100644 --- a/pkg/bzz/underlay_test.go +++ b/pkg/bzz/underlay_test.go @@ -15,35 +15,19 @@ import ( ) func TestSerializeUnderlays(t *testing.T) { - ip4TCPAddr := mustNewMultiaddr(t, "/ip4/127.0.0.1/tcp/80/p2p/QmWqeeHEqG2db37JsuKUxyJ2JF8LtVJMGohKVT8h3aeCVH") - p2pAddr := mustNewMultiaddr(t, "/ip4/65.108.66.216/tcp/16341/p2p/QmVuCJ3M96c7vwv4MQBv7WY1HWQacyCEHvM99R8MUDj95d") - wssAddr := mustNewMultiaddr(t, "/ip4/127.0.0.1/tcp/443/wss/p2p/QmWqeeHEqG2db37JsuKUxyJ2JF8LtVJMGohKVT8h3aeCVH") - dnsSwarmAddr := mustNewMultiaddr(t, "/dnsaddr/mainnet.ethswarm.org") - - t.Run("multiple addresses list", func(t *testing.T) { - addrs := []multiaddr.Multiaddr{ip4TCPAddr, p2pAddr, wssAddr, dnsSwarmAddr} - serialized := bzz.SerializeUnderlays(addrs) - - if serialized[0] != bzz.UnderlayListPrefix { - t.Errorf("expected prefix %x for multiple addresses, got %x", bzz.UnderlayListPrefix, serialized[0]) - } - }) - t.Run("empty list", func(t *testing.T) { addrs := []multiaddr.Multiaddr{} serialized := bzz.SerializeUnderlays(addrs) - expected := []byte{bzz.UnderlayListPrefix} - if !bytes.Equal(serialized, expected) { - t.Errorf("expected %x for empty list, got %x", expected, serialized) + if len(serialized) != 0 { + t.Errorf("expected empty serialized addresses") } }) t.Run("nil list", func(t *testing.T) { var addrs []multiaddr.Multiaddr = nil serialized := bzz.SerializeUnderlays(addrs) - expected := []byte{bzz.UnderlayListPrefix} - if !bytes.Equal(serialized, expected) { - t.Errorf("expected %x for nil list, got %x", expected, serialized) + if len(serialized) != 0 { + t.Errorf("expected empty serialized addresses") } }) } @@ -72,25 +56,12 @@ func TestDeserializeUnderlays(t *testing.T) { } }) - t.Run("list with only prefix", func(t *testing.T) { - deserialized, err := bzz.DeserializeUnderlays([]byte{bzz.UnderlayListPrefix}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if len(deserialized) != 0 { - t.Errorf("expected empty slice, got %v", deserialized) - } - }) - t.Run("serialize deserialize empty list", func(t *testing.T) { addrs := []multiaddr.Multiaddr{} serialized := bzz.SerializeUnderlays(addrs) - if !bytes.Equal(serialized, []byte{bzz.UnderlayListPrefix}) { - t.Errorf("expected %v, got %v", []byte{bzz.UnderlayListPrefix}, serialized) - } deserialized, err := bzz.DeserializeUnderlays(serialized) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if err == nil { + t.Fatalf("expected error") } if len(deserialized) != 0 { t.Errorf("expected empty slice, got %v", deserialized) @@ -99,12 +70,9 @@ func TestDeserializeUnderlays(t *testing.T) { t.Run("serialize deserialize nil list", func(t *testing.T) { serialized := bzz.SerializeUnderlays(nil) - if !bytes.Equal(serialized, []byte{bzz.UnderlayListPrefix}) { - t.Errorf("expected %v, got %v", []byte{bzz.UnderlayListPrefix}, serialized) - } deserialized, err := bzz.DeserializeUnderlays(serialized) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if err == nil { + t.Fatalf("expected error") } if len(deserialized) != 0 { t.Errorf("expected empty slice, got %v", deserialized) @@ -114,7 +82,6 @@ func TestDeserializeUnderlays(t *testing.T) { t.Run("corrupted list - length too long", func(t *testing.T) { maBytes := ip4TCPAddr.Bytes() var buf bytes.Buffer - buf.WriteByte(bzz.UnderlayListPrefix) buf.Write(varint.ToUvarint(uint64(len(maBytes) + 5))) // Write a length that is too long buf.Write(maBytes) @@ -127,7 +94,6 @@ func TestDeserializeUnderlays(t *testing.T) { t.Run("corrupted list - invalid multiaddr bytes", func(t *testing.T) { invalidAddrBytes := []byte{0xde, 0xad, 0xbe, 0xef} var buf bytes.Buffer - buf.WriteByte(bzz.UnderlayListPrefix) buf.Write(varint.ToUvarint(uint64(len(invalidAddrBytes)))) buf.Write(invalidAddrBytes) @@ -172,8 +138,8 @@ func TestSerializeUnderlaysDeserializeUnderlays(t *testing.T) { addrs := []multiaddr.Multiaddr{} serialized := bzz.SerializeUnderlays(addrs) deserialized, err := bzz.DeserializeUnderlays(serialized) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if err == nil { + t.Fatalf("expected error") } if len(deserialized) != 0 { t.Errorf("expected empty slice from round trip, got %v", deserialized) From 68613ec4db0e31d0337665f2af793510047e880d Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 1 Apr 2026 14:25:19 +0200 Subject: [PATCH 5/5] fix: restore underlay prefix and tests for backward compatibility with 2.7.0 version --- pkg/bzz/export_test.go | 7 +++ pkg/bzz/underlay.go | 22 +++++++- pkg/bzz/underlay_test.go | 54 +++++++++++++++---- .../libp2p/internal/handshake/handshake.go | 30 ++++++++++- 4 files changed, 99 insertions(+), 14 deletions(-) create mode 100644 pkg/bzz/export_test.go diff --git a/pkg/bzz/export_test.go b/pkg/bzz/export_test.go new file mode 100644 index 00000000000..531121edc4f --- /dev/null +++ b/pkg/bzz/export_test.go @@ -0,0 +1,7 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package bzz + +const UnderlayListPrefix = underlayListPrefix diff --git a/pkg/bzz/underlay.go b/pkg/bzz/underlay.go index 2602a81339f..1d4dc2e3bda 100644 --- a/pkg/bzz/underlay.go +++ b/pkg/bzz/underlay.go @@ -13,11 +13,19 @@ import ( "github.com/multiformats/go-varint" ) +// underlayListPrefix is a magic byte designated for identifying a serialized list of multiaddrs. +// A value of 0x99 (153) was chosen as it is not a defined multiaddr protocol code. +// This ensures that a failure is triggered by the original multiaddr.NewMultiaddrBytes function, +// which expects a valid protocol code at the start of the data. +const underlayListPrefix byte = 0x99 + // SerializeUnderlays serializes a slice of multiaddrs into a single byte slice. func SerializeUnderlays(addrs []multiaddr.Multiaddr) []byte { // For 0 or 2+ addresses, the custom list format with the prefix is used. - // The format is: [varint_len_1][addr_1_bytes]... + // The format is: [prefix_byte][varint_len_1][addr_1_bytes]... var buf bytes.Buffer + buf.WriteByte(underlayListPrefix) + for _, addr := range addrs { addrBytes := addr.Bytes() buf.Write(varint.ToUvarint(uint64(len(addrBytes)))) @@ -31,7 +39,17 @@ func DeserializeUnderlays(data []byte) ([]multiaddr.Multiaddr, error) { if len(data) == 0 { return nil, errors.New("cannot deserialize empty byte slice") } - return deserializeList(data) + if data[0] == underlayListPrefix { + return deserializeList(data[1:]) + } + + // Otherwise, the data is handled as a single, backward-compatible multiaddr. + addr, err := multiaddr.NewMultiaddrBytes(data) + if err != nil { + return nil, fmt.Errorf("failed to parse as single multiaddr: %w", err) + } + // The result is returned as a single-element slice for a consistent return type. + return []multiaddr.Multiaddr{addr}, nil } // deserializeList handles the parsing of the custom list format. diff --git a/pkg/bzz/underlay_test.go b/pkg/bzz/underlay_test.go index 0a18f1789fa..7caaf6fd64a 100644 --- a/pkg/bzz/underlay_test.go +++ b/pkg/bzz/underlay_test.go @@ -15,19 +15,35 @@ import ( ) func TestSerializeUnderlays(t *testing.T) { + ip4TCPAddr := mustNewMultiaddr(t, "/ip4/127.0.0.1/tcp/80/p2p/QmWqeeHEqG2db37JsuKUxyJ2JF8LtVJMGohKVT8h3aeCVH") + p2pAddr := mustNewMultiaddr(t, "/ip4/65.108.66.216/tcp/16341/p2p/QmVuCJ3M96c7vwv4MQBv7WY1HWQacyCEHvM99R8MUDj95d") + wssAddr := mustNewMultiaddr(t, "/ip4/127.0.0.1/tcp/443/wss/p2p/QmWqeeHEqG2db37JsuKUxyJ2JF8LtVJMGohKVT8h3aeCVH") + dnsSwarmAddr := mustNewMultiaddr(t, "/dnsaddr/mainnet.ethswarm.org") + + t.Run("multiple addresses list", func(t *testing.T) { + addrs := []multiaddr.Multiaddr{ip4TCPAddr, p2pAddr, wssAddr, dnsSwarmAddr} + serialized := bzz.SerializeUnderlays(addrs) + + if serialized[0] != bzz.UnderlayListPrefix { + t.Errorf("expected prefix %x for multiple addresses, got %x", bzz.UnderlayListPrefix, serialized[0]) + } + }) + t.Run("empty list", func(t *testing.T) { addrs := []multiaddr.Multiaddr{} serialized := bzz.SerializeUnderlays(addrs) - if len(serialized) != 0 { - t.Errorf("expected empty serialized addresses") + expected := []byte{bzz.UnderlayListPrefix} + if !bytes.Equal(serialized, expected) { + t.Errorf("expected %x for empty list, got %x", expected, serialized) } }) t.Run("nil list", func(t *testing.T) { var addrs []multiaddr.Multiaddr = nil serialized := bzz.SerializeUnderlays(addrs) - if len(serialized) != 0 { - t.Errorf("expected empty serialized addresses") + expected := []byte{bzz.UnderlayListPrefix} + if !bytes.Equal(serialized, expected) { + t.Errorf("expected %x for nil list, got %x", expected, serialized) } }) } @@ -56,12 +72,25 @@ func TestDeserializeUnderlays(t *testing.T) { } }) + t.Run("list with only prefix", func(t *testing.T) { + deserialized, err := bzz.DeserializeUnderlays([]byte{bzz.UnderlayListPrefix}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(deserialized) != 0 { + t.Errorf("expected empty slice, got %v", deserialized) + } + }) + t.Run("serialize deserialize empty list", func(t *testing.T) { addrs := []multiaddr.Multiaddr{} serialized := bzz.SerializeUnderlays(addrs) + if !bytes.Equal(serialized, []byte{bzz.UnderlayListPrefix}) { + t.Errorf("expected %v, got %v", []byte{bzz.UnderlayListPrefix}, serialized) + } deserialized, err := bzz.DeserializeUnderlays(serialized) - if err == nil { - t.Fatalf("expected error") + if err != nil { + t.Fatalf("unexpected error: %v", err) } if len(deserialized) != 0 { t.Errorf("expected empty slice, got %v", deserialized) @@ -70,9 +99,12 @@ func TestDeserializeUnderlays(t *testing.T) { t.Run("serialize deserialize nil list", func(t *testing.T) { serialized := bzz.SerializeUnderlays(nil) + if !bytes.Equal(serialized, []byte{bzz.UnderlayListPrefix}) { + t.Errorf("expected %v, got %v", []byte{bzz.UnderlayListPrefix}, serialized) + } deserialized, err := bzz.DeserializeUnderlays(serialized) - if err == nil { - t.Fatalf("expected error") + if err != nil { + t.Fatalf("unexpected error: %v", err) } if len(deserialized) != 0 { t.Errorf("expected empty slice, got %v", deserialized) @@ -82,6 +114,7 @@ func TestDeserializeUnderlays(t *testing.T) { t.Run("corrupted list - length too long", func(t *testing.T) { maBytes := ip4TCPAddr.Bytes() var buf bytes.Buffer + buf.WriteByte(bzz.UnderlayListPrefix) buf.Write(varint.ToUvarint(uint64(len(maBytes) + 5))) // Write a length that is too long buf.Write(maBytes) @@ -94,6 +127,7 @@ func TestDeserializeUnderlays(t *testing.T) { t.Run("corrupted list - invalid multiaddr bytes", func(t *testing.T) { invalidAddrBytes := []byte{0xde, 0xad, 0xbe, 0xef} var buf bytes.Buffer + buf.WriteByte(bzz.UnderlayListPrefix) buf.Write(varint.ToUvarint(uint64(len(invalidAddrBytes)))) buf.Write(invalidAddrBytes) @@ -138,8 +172,8 @@ func TestSerializeUnderlaysDeserializeUnderlays(t *testing.T) { addrs := []multiaddr.Multiaddr{} serialized := bzz.SerializeUnderlays(addrs) deserialized, err := bzz.DeserializeUnderlays(serialized) - if err == nil { - t.Fatalf("expected error") + if err != nil { + t.Fatalf("unexpected error: %v", err) } if len(deserialized) != 0 { t.Errorf("expected empty slice from round trip, got %v", deserialized) diff --git a/pkg/p2p/libp2p/internal/handshake/handshake.go b/pkg/p2p/libp2p/internal/handshake/handshake.go index 32ab4b38908..8d599d65c9c 100644 --- a/pkg/p2p/libp2p/internal/handshake/handshake.go +++ b/pkg/p2p/libp2p/internal/handshake/handshake.go @@ -137,9 +137,11 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd defer cancel() w, r := protobuf.NewWriterAndReader(stream) + synUnderlay := bzz.SerializeUnderlays(peerMultiaddrs) + s.logger.Debug("handshake outbound syn underlay", "payload_len", len(synUnderlay), "first_byte", firstByteString(synUnderlay), "payload_prefix", payloadPrefix(synUnderlay)) if err := w.WriteMsgWithContext(ctx, &pb.Syn{ - ObservedUnderlay: bzz.SerializeUnderlays(peerMultiaddrs), + ObservedUnderlay: synUnderlay, }); err != nil { return nil, fmt.Errorf("write syn message: %w", err) } @@ -151,6 +153,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd observedUnderlays, err := bzz.DeserializeUnderlays(resp.Syn.ObservedUnderlay) if err != nil { + s.logger.Debug("handshake invalid synack observed underlay payload", "payload_len", len(resp.Syn.ObservedUnderlay), "first_byte", firstByteString(resp.Syn.ObservedUnderlay), "payload_prefix", payloadPrefix(resp.Syn.ObservedUnderlay), "error", err) return nil, ErrInvalidSyn } @@ -252,6 +255,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs observedUnderlays, err := bzz.DeserializeUnderlays(syn.ObservedUnderlay) if err != nil { + s.logger.Debug("handshake invalid inbound syn observed underlay payload", "payload_len", len(syn.ObservedUnderlay), "first_byte", firstByteString(syn.ObservedUnderlay), "payload_prefix", payloadPrefix(syn.ObservedUnderlay), "error", err) return nil, ErrInvalidSyn } @@ -289,9 +293,12 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs welcomeMessage := s.GetWelcomeMessage() + synAckObservedUnderlay := bzz.SerializeUnderlays(peerMultiaddrs) + s.logger.Debug("handshake outbound synack observed underlay", "payload_len", len(synAckObservedUnderlay), "first_byte", firstByteString(synAckObservedUnderlay), "payload_prefix", payloadPrefix(synAckObservedUnderlay)) + if err := w.WriteMsgWithContext(ctx, &pb.SynAck{ Syn: &pb.Syn{ - ObservedUnderlay: bzz.SerializeUnderlays(peerMultiaddrs), + ObservedUnderlay: synAckObservedUnderlay, }, Ack: &pb.Ack{ Address: &pb.BzzAddress{ @@ -366,8 +373,27 @@ func (s *Service) GetWelcomeMessage() string { func (s *Service) parseCheckAck(ack *pb.Ack) (*bzz.Address, error) { bzzAddress, err := bzz.ParseAddress(ack.Address.Underlay, ack.Address.Overlay, ack.Address.Signature, ack.Nonce, s.validateOverlay, s.networkID) if err != nil { + s.logger.Debug("handshake invalid ack address payload", "underlay_len", len(ack.Address.Underlay), "underlay_first_byte", firstByteString(ack.Address.Underlay), "underlay_prefix", payloadPrefix(ack.Address.Underlay), "overlay_len", len(ack.Address.Overlay), "error", err) return nil, ErrInvalidAck } return bzzAddress, nil } + +func firstByteString(data []byte) string { + if len(data) == 0 { + return "none" + } + return fmt.Sprintf("0x%02x", data[0]) +} + +func payloadPrefix(data []byte) string { + if len(data) == 0 { + return "" + } + n := 16 + if len(data) < n { + n = len(data) + } + return fmt.Sprintf("%x", data[:n]) +}