From ad4877803c2ac25492893f7b92e4cddfcc610c47 Mon Sep 17 00:00:00 2001 From: sbackend Date: Thu, 12 Mar 2026 15:02:02 +0100 Subject: [PATCH 1/8] feat: rpc calls optimisation Added cache layer Remove redundant calls. --- cmd/bee/cmd/deploy.go | 1 + go.mod | 2 + pkg/node/chain.go | 3 +- pkg/node/node.go | 1 + pkg/transaction/monitor.go | 10 +- pkg/transaction/transaction.go | 25 ++- pkg/transaction/wrapped/cache/cache.go | 91 ++++++++++ pkg/transaction/wrapped/cache/cache_test.go | 185 ++++++++++++++++++++ pkg/transaction/wrapped/cache/keys.go | 7 + pkg/transaction/wrapped/cache/metrics.go | 96 ++++++++++ pkg/transaction/wrapped/wrapped.go | 29 ++- 11 files changed, 421 insertions(+), 29 deletions(-) create mode 100644 pkg/transaction/wrapped/cache/cache.go create mode 100644 pkg/transaction/wrapped/cache/cache_test.go create mode 100644 pkg/transaction/wrapped/cache/keys.go create mode 100644 pkg/transaction/wrapped/cache/metrics.go diff --git a/cmd/bee/cmd/deploy.go b/cmd/bee/cmd/deploy.go index c0e20cb63e7..c8b3a0bcd38 100644 --- a/cmd/bee/cmd/deploy.go +++ b/cmd/bee/cmd/deploy.go @@ -60,6 +60,7 @@ func (c *command) initDeployCmd() error { blocktime, true, c.config.GetUint64(optionNameMinimumGasTipCap), + blocktime-2, ) if err != nil { return err diff --git a/go.mod b/go.mod index cf602914109..e6f1b81cc92 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,8 @@ require ( resenje.org/web v0.4.3 ) +require github.com/kylelemons/godebug v1.1.0 // indirect + require ( github.com/BurntSushi/toml v1.3.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect diff --git a/pkg/node/chain.go b/pkg/node/chain.go index 78fc0a5448d..9cd3c60574e 100644 --- a/pkg/node/chain.go +++ b/pkg/node/chain.go @@ -51,6 +51,7 @@ func InitChain( pollingInterval time.Duration, chainEnabled bool, minimumGasTipCap uint64, + blockCacheTTl time.Duration, ) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) { backend := backendnoop.New(chainID) @@ -72,7 
+73,7 @@ func InitChain( logger.Info("connected to blockchain backend", "version", versionString) - backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap) + backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, blockCacheTTl) } backendChainID, err := backend.ChainID(ctx) diff --git a/pkg/node/node.go b/pkg/node/node.go index 50ed1a44b82..e75054a8f77 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -409,6 +409,7 @@ func NewBee( o.BlockTime, chainEnabled, o.MinimumGasTipCap, + o.BlockTime*85/100, ) if err != nil { return nil, fmt.Errorf("init chain: %w", err) diff --git a/pkg/transaction/monitor.go b/pkg/transaction/monitor.go index bbd4a50ec58..daaa1404441 100644 --- a/pkg/transaction/monitor.go +++ b/pkg/transaction/monitor.go @@ -214,16 +214,16 @@ func (tm *transactionMonitor) checkPending(block uint64) error { } } + oldNonce, err := tm.backend.NonceAt(tm.ctx, tm.sender, new(big.Int).SetUint64(block-tm.cancellationDepth)) + if err != nil { + return err + } + for nonceGroup := range tm.watchesByNonce { if _, ok := confirmedNonces[nonceGroup]; ok { continue } - oldNonce, err := tm.backend.NonceAt(tm.ctx, tm.sender, new(big.Int).SetUint64(block-tm.cancellationDepth)) - if err != nil { - return err - } - if nonceGroup < oldNonce { cancelledNonces = append(cancelledNonces, nonceGroup) } diff --git a/pkg/transaction/transaction.go b/pkg/transaction/transaction.go index cc88e7a3ab4..da2b2746fd5 100644 --- a/pkg/transaction/transaction.go +++ b/pkg/transaction/transaction.go @@ -154,9 +154,9 @@ func (t *transactionService) waitForAllPendingTx() error { return err } - pendingTxs = t.filterPendingTransactions(t.ctx, pendingTxs) + pending := t.filterPendingTransactions(t.ctx, pendingTxs) - for _, txHash := range pendingTxs { + for txHash := range pending { t.waitForPendingTx(txHash) } @@ -339,19 +339,15 @@ func (t *transactionService) nextNonce(ctx context.Context) (uint64, error) { return 0, err } - pendingTxs = 
t.filterPendingTransactions(t.ctx, pendingTxs) + pending := t.filterPendingTransactions(t.ctx, pendingTxs) // PendingNonceAt returns the nonce we should use, but we will // compare this to our pending tx list, therefore the -1. maxNonce := onchainNonce - 1 - for _, txHash := range pendingTxs { - trx, _, err := t.backend.TransactionByHash(ctx, txHash) - if err != nil { - t.logger.Error(err, "pending transaction not found", "tx", txHash) - return 0, err + for _, trx := range pending { + if trx != nil { + maxNonce = max(maxNonce, trx.Nonce()) } - - maxNonce = max(maxNonce, trx.Nonce()) } return maxNonce + 1, nil @@ -404,11 +400,12 @@ func (t *transactionService) PendingTransactions() ([]common.Hash, error) { // filterPendingTransactions will filter supplied transaction hashes removing those that are not pending anymore. // Removed transactions will be also removed from store. -func (t *transactionService) filterPendingTransactions(ctx context.Context, txHashes []common.Hash) []common.Hash { - result := make([]common.Hash, 0, len(txHashes)) +// Returns the pending transactions keyed by hash. 
+func (t *transactionService) filterPendingTransactions(ctx context.Context, txHashes []common.Hash) map[common.Hash]*types.Transaction { + result := make(map[common.Hash]*types.Transaction, len(txHashes)) for _, txHash := range txHashes { - _, isPending, err := t.backend.TransactionByHash(ctx, txHash) + trx, isPending, err := t.backend.TransactionByHash(ctx, txHash) // When error occurres consider transaction as pending (so this transaction won't be filtered out), // unless it was not found if err != nil { @@ -422,7 +419,7 @@ func (t *transactionService) filterPendingTransactions(ctx context.Context, txHa } if isPending { - result = append(result, txHash) + result[txHash] = trx } else { err := t.store.Delete(pendingTransactionKey(txHash)) if err != nil { diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go new file mode 100644 index 00000000000..008c6562ab5 --- /dev/null +++ b/pkg/transaction/wrapped/cache/cache.go @@ -0,0 +1,91 @@ +package cache + +import ( + "sync" + "time" + + "golang.org/x/sync/singleflight" +) + +type Loader[T any] func() (T, error) + +type ExpiringSingleFlightCache[T any] struct { + ttl time.Duration + + mu sync.RWMutex + value T + valid bool + expiresAt time.Time + + group singleflight.Group + key Key + metrics metricSet +} + +func NewExpiringSingleFlightCache[T any](ttl time.Duration, key Key) *ExpiringSingleFlightCache[T] { + return &ExpiringSingleFlightCache[T]{ + ttl: ttl, + key: key, + metrics: newMetricSet(string(key)), + } +} + +func (c *ExpiringSingleFlightCache[T]) Get(now time.Time) (T, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.valid && now.Before(c.expiresAt) { + return c.value, true + } + + var zero T + return zero, false +} + +func (c *ExpiringSingleFlightCache[T]) Set(value T, now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + + c.value = value + c.valid = true + c.expiresAt = now.Add(c.ttl) +} + +func (c *ExpiringSingleFlightCache[T]) Invalidate() { + c.mu.Lock() + 
defer c.mu.Unlock() + + c.valid = false + c.metrics.Invalidates.Inc() +} + +func (c *ExpiringSingleFlightCache[T]) GetOrLoad(now time.Time, loader Loader[T]) (T, error) { + if v, ok := c.Get(now); ok { + c.metrics.Hits.Inc() + return v, nil + } + + c.metrics.Misses.Inc() + + result, err, shared := c.group.Do(string(c.key), func() (any, error) { + c.metrics.Loads.Inc() + val, err := loader() + if err != nil { + c.metrics.LoadErrors.Inc() + return val, err + } + c.Set(val, time.Now()) + return val, nil + }) + + if shared { + c.metrics.SharedLoads.Inc() + } + + if err != nil { + var zero T + return zero, err + } + + return result.(T), nil +} diff --git a/pkg/transaction/wrapped/cache/cache_test.go b/pkg/transaction/wrapped/cache/cache_test.go new file mode 100644 index 00000000000..5de3e1700e8 --- /dev/null +++ b/pkg/transaction/wrapped/cache/cache_test.go @@ -0,0 +1,185 @@ +package cache + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const testKey Key = "test" + +func newTestCache(ttl time.Duration) *ExpiringSingleFlightCache[uint64] { + return &ExpiringSingleFlightCache[uint64]{ + ttl: ttl, + key: testKey, + metrics: newMetricSet(string(testKey)), + } +} + +func TestSetGet(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache(time.Second) + c.Set(42, now) + + val, ok := c.Get(now) + assert.True(t, ok) + assert.Equal(t, uint64(42), val) +} + +func TestTTLExpiry(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache(time.Second) + c.Set(42, now) + + _, ok := c.Get(now.Add(time.Second + time.Millisecond)) + assert.False(t, ok) +} + +func TestInvalidate(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache(5 * time.Second) + c.Set(42, now) + c.Invalidate() + + _, ok := c.Get(now) + assert.False(t, ok) +} + +func TestGetOrLoadMiss(t *testing.T) { + 
t.Parallel() + + c := newTestCache(time.Second) + var loadCount atomic.Int32 + + val, err := c.GetOrLoad(time.Now(), func() (uint64, error) { + loadCount.Add(1) + return 99, nil + }) + + require.NoError(t, err) + assert.Equal(t, uint64(99), val) + assert.Equal(t, int32(1), loadCount.Load()) + + cached, ok := c.Get(time.Now()) + assert.True(t, ok) + assert.Equal(t, uint64(99), cached) +} + +func TestGetOrLoadHit(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache(time.Second) + c.Set(42, now) + + var loadCount atomic.Int32 + val, err := c.GetOrLoad(now, func() (uint64, error) { + loadCount.Add(1) + return 99, nil + }) + + require.NoError(t, err) + assert.Equal(t, uint64(42), val) + assert.Equal(t, int32(0), loadCount.Load()) +} + +func TestGetOrLoadError(t *testing.T) { + t.Parallel() + + c := newTestCache(time.Second) + errLoad := errors.New("load failed") + + val, err := c.GetOrLoad(time.Now(), func() (uint64, error) { + return 0, errLoad + }) + + assert.ErrorIs(t, err, errLoad) + assert.Equal(t, uint64(0), val) + + _, ok := c.Get(time.Now()) + assert.False(t, ok) +} + +func TestGetOrLoadSingleflight(t *testing.T) { + t.Parallel() + + c := newTestCache(5 * time.Second) + var loadCount atomic.Int32 + gate := make(chan struct{}) + + const n = 10 + var wg sync.WaitGroup + results := make([]uint64, n) + errs := make([]error, n) + + wg.Add(n) + for i := range n { + go func(idx int) { + defer wg.Done() + results[idx], errs[idx] = c.GetOrLoad(time.Now(), func() (uint64, error) { + loadCount.Add(1) + <-gate + return 77, nil + }) + }(i) + } + + time.Sleep(50 * time.Millisecond) + close(gate) + wg.Wait() + + assert.Equal(t, int32(1), loadCount.Load()) + for i := range n { + assert.NoError(t, errs[i]) + assert.Equal(t, uint64(77), results[i]) + } +} + +func TestGetOrLoadReloadAfterExpiry(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache(time.Second) + c.Set(42, now) + + val, err := c.GetOrLoad(now.Add(time.Second+time.Millisecond), 
func() (uint64, error) { + return 100, nil + }) + + require.NoError(t, err) + assert.Equal(t, uint64(100), val) +} + +func TestMetrics(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache(time.Second) + + // miss + load + _, _ = c.GetOrLoad(now, func() (uint64, error) { return 42, nil }) + // hit + _, _ = c.GetOrLoad(now, func() (uint64, error) { return 0, nil }) + // invalidate + c.Invalidate() + // miss + load error + _, _ = c.GetOrLoad(now, func() (uint64, error) { return 0, errors.New("fail") }) + + assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.Hits)) + assert.Equal(t, float64(2), testutil.ToFloat64(c.metrics.Misses)) + assert.Equal(t, float64(2), testutil.ToFloat64(c.metrics.Loads)) + assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.LoadErrors)) + assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.Invalidates)) +} diff --git a/pkg/transaction/wrapped/cache/keys.go b/pkg/transaction/wrapped/cache/keys.go new file mode 100644 index 00000000000..a647fb0c78d --- /dev/null +++ b/pkg/transaction/wrapped/cache/keys.go @@ -0,0 +1,7 @@ +package cache + +type Key string + +const ( + BlockNumberKey Key = "block-number" +) diff --git a/pkg/transaction/wrapped/cache/metrics.go b/pkg/transaction/wrapped/cache/metrics.go new file mode 100644 index 00000000000..55594c7ac38 --- /dev/null +++ b/pkg/transaction/wrapped/cache/metrics.go @@ -0,0 +1,96 @@ +package cache + +import ( + m "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type metricSet struct { + Hits prometheus.Counter + Misses prometheus.Counter + Loads prometheus.Counter + SharedLoads prometheus.Counter + LoadErrors prometheus.Counter + Invalidates prometheus.Counter +} + +type Metrics struct { + BlockNumber metricSet + Unknown metricSet +} + +func newMetricSet(prefix string) metricSet { + subsystem := "eth_backend_cache" + return metricSet{ + Hits: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, 
+ Subsystem: subsystem, + Name: prefix + "_hits", + Help: prefix + " cache hits", + }), + Misses: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_misses", + Help: prefix + " cache misses", + }), + Loads: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_loads", + Help: prefix + " cache backend loads", + }), + SharedLoads: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_shared_loads", + Help: prefix + " cache shared loads", + }), + LoadErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_load_errors", + Help: prefix + " cache load errors", + }), + Invalidates: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_invalidations", + Help: prefix + " cache invalidations", + }), + } +} + +func NewMetrics() Metrics { + return Metrics{ + BlockNumber: newMetricSet("block_number"), + Unknown: newMetricSet("unknown"), + } +} + +func (mtr *Metrics) metricsForKey(key Key) *metricSet { + switch key { + case BlockNumberKey: + return &mtr.BlockNumber + default: + return &mtr.Unknown + } +} + +func (mtr *Metrics) Collectors() []prometheus.Collector { + return []prometheus.Collector{ + mtr.BlockNumber.Hits, + mtr.BlockNumber.Misses, + mtr.BlockNumber.Loads, + mtr.BlockNumber.SharedLoads, + mtr.BlockNumber.LoadErrors, + mtr.BlockNumber.Invalidates, + + mtr.Unknown.Hits, + mtr.Unknown.Misses, + mtr.Unknown.Loads, + mtr.Unknown.SharedLoads, + mtr.Unknown.LoadErrors, + mtr.Unknown.Invalidates, + } +} diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index 1d4e452aef3..7e46f19b092 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -8,12 +8,14 @@ import ( "context" "errors" "math/big" + 
"time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethersphere/bee/v2/pkg/transaction" "github.com/ethersphere/bee/v2/pkg/transaction/backend" + "github.com/ethersphere/bee/v2/pkg/transaction/wrapped/cache" ) var ( @@ -24,13 +26,19 @@ type wrappedBackend struct { backend backend.Geth metrics metrics minimumGasTipCap int64 + blockNumberCache *cache.ExpiringSingleFlightCache[uint64] } -func NewBackend(backend backend.Geth, minimumGasTipCap uint64) transaction.Backend { +func NewBackend( + backend backend.Geth, + minimumGasTipCap uint64, + blockNumberTTL time.Duration, +) transaction.Backend { return &wrappedBackend{ backend: backend, minimumGasTipCap: int64(minimumGasTipCap), metrics: newMetrics(), + blockNumberCache: cache.NewExpiringSingleFlightCache[uint64](blockNumberTTL, cache.BlockNumberKey), } } @@ -61,14 +69,17 @@ func (b *wrappedBackend) TransactionByHash(ctx context.Context, hash common.Hash } func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { - b.metrics.TotalRPCCalls.Inc() - b.metrics.BlockNumberCalls.Inc() - blockNumber, err := b.backend.BlockNumber(ctx) - if err != nil { - b.metrics.TotalRPCErrors.Inc() - return 0, err - } - return blockNumber, nil + return b.blockNumberCache.GetOrLoad(time.Now(), func() (uint64, error) { + b.metrics.TotalRPCCalls.Inc() + b.metrics.BlockNumberCalls.Inc() + + blockNumber, err := b.backend.BlockNumber(ctx) + if err != nil { + b.metrics.TotalRPCErrors.Inc() + return 0, err + } + return blockNumber, nil + }) } func (b *wrappedBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { From 68ecefbbec29a8ec703d8c44533dde87e919471a Mon Sep 17 00:00:00 2001 From: sbackend Date: Thu, 12 Mar 2026 23:41:58 +0100 Subject: [PATCH 2/8] fix: linter issues --- cmd/bee/cmd/db_test.go | 3 -- pkg/api/chunk_stream_test.go | 2 +- pkg/bmt/proof_test.go | 4 +-- pkg/file/buffer_test.go | 4 +-- 
pkg/file/pipeline/bmt/bmt_test.go | 2 +- pkg/postage/mock/service.go | 2 +- pkg/shed/index_test.go | 32 +++++++++++---------- pkg/storage/migration/steps_chain_test.go | 2 +- pkg/storage/storagetest/storage.go | 2 +- pkg/storer/internal/reserve/reserve_test.go | 4 +-- pkg/storer/mock/mockstorer.go | 2 +- pkg/storer/reserve_test.go | 3 +- pkg/storer/sample_test.go | 2 +- pkg/transaction/wrapped/cache/cache.go | 4 +++ pkg/transaction/wrapped/cache/cache_test.go | 4 +++ pkg/transaction/wrapped/cache/keys.go | 3 ++ pkg/transaction/wrapped/cache/metrics.go | 13 +++------ pkg/transaction/wrapped/fee_test.go | 2 ++ 18 files changed, 49 insertions(+), 41 deletions(-) diff --git a/cmd/bee/cmd/db_test.go b/cmd/bee/cmd/db_test.go index 7d32b5c3080..85593ab0fbe 100644 --- a/cmd/bee/cmd/db_test.go +++ b/cmd/bee/cmd/db_test.go @@ -214,9 +214,6 @@ func TestDBNuke_FLAKY(t *testing.T) { Logger: log.Noop, ReserveCapacity: storer.DefaultReserveCapacity, }, path.Join(dataDir, "localstore")) - if err != nil { - t.Fatal(err) - } defer db.Close() info, err = db.DebugInfo(ctx) diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go index 5ba62cdd76c..05a77e79ef3 100644 --- a/pkg/api/chunk_stream_test.go +++ b/pkg/api/chunk_stream_test.go @@ -37,7 +37,7 @@ func TestChunkUploadStream(t *testing.T) { ) t.Run("upload and verify", func(t *testing.T) { - chsToGet := []swarm.Chunk{} + chsToGet := make([]swarm.Chunk, 0, 5) for range 5 { ch := testingc.GenerateTestRandomChunk() diff --git a/pkg/bmt/proof_test.go b/pkg/bmt/proof_test.go index 5b3625acefb..4658cb353c7 100644 --- a/pkg/bmt/proof_test.go +++ b/pkg/bmt/proof_test.go @@ -26,7 +26,7 @@ func TestProofCorrectness(t *testing.T) { verifySegments := func(t *testing.T, exp []string, found [][]byte) { t.Helper() - var expSegments [][]byte + expSegments := make([][]byte, 0, len(exp)) for _, v := range exp { decoded, err := hex.DecodeString(v) if err != nil { @@ -154,7 +154,7 @@ func TestProofCorrectness(t *testing.T) { 
"745bae095b6ff5416b4a351a167f731db6d6f5924f30cd88d48e74261795d27b", } - var segments [][]byte + segments := make([][]byte, 0, len(segmentStrings)) for _, v := range segmentStrings { decoded, err := hex.DecodeString(v) if err != nil { diff --git a/pkg/file/buffer_test.go b/pkg/file/buffer_test.go index 71b374b7f4b..a41f9d4426b 100644 --- a/pkg/file/buffer_test.go +++ b/pkg/file/buffer_test.go @@ -114,10 +114,10 @@ func TestCopyBuffer(t *testing.T) { swarm.ChunkSize*17 + 3, } - testCases := []struct { + testCases := make([]struct { readBufferSize int dataSize int - }{} + }, 0, len(dataSizes)*len(readBufferSizes)) for i := range readBufferSizes { for j := range dataSizes { diff --git a/pkg/file/pipeline/bmt/bmt_test.go b/pkg/file/pipeline/bmt/bmt_test.go index 7fba1d763d0..09a9c965eee 100644 --- a/pkg/file/pipeline/bmt/bmt_test.go +++ b/pkg/file/pipeline/bmt/bmt_test.go @@ -52,7 +52,7 @@ func TestBmtWriter(t *testing.T) { mockChainWriter := mock.NewChainWriter() writer := bmt.NewBmtWriter(mockChainWriter) - var data []byte + data := make([]byte, 0, len(tc.data)+8) if !tc.noSpan { data = make([]byte, 8) diff --git a/pkg/postage/mock/service.go b/pkg/postage/mock/service.go index b4fffd07761..9584ffeeb1c 100644 --- a/pkg/postage/mock/service.go +++ b/pkg/postage/mock/service.go @@ -70,7 +70,7 @@ func (m *mockPostage) StampIssuers() []*postage.StampIssuer { m.issuerLock.Lock() defer m.issuerLock.Unlock() - issuers := make([]*postage.StampIssuer, 0) + issuers := make([]*postage.StampIssuer, 0, len(m.issuersMap)) for _, v := range m.issuersMap { issuers = append(issuers, v) } diff --git a/pkg/shed/index_test.go b/pkg/shed/index_test.go index 91eb657b649..fff3c14bde7 100644 --- a/pkg/shed/index_test.go +++ b/pkg/shed/index_test.go @@ -345,7 +345,7 @@ func TestIndex(t *testing.T) { } t.Run("not found", func(t *testing.T) { - items := make([]Item, len(want)) + items := make([]Item, len(want), len(want)+1) for i, w := range want { items[i] = Item{ Address: w.Address, @@ 
-373,28 +373,29 @@ func TestIndex_Iterate(t *testing.T) { t.Fatal(err) } - items := []Item{ - { + items := make([]Item, 0, 6) + items = append(items, + Item{ Address: []byte("iterate-hash-01"), Data: []byte("data80"), }, - { + Item{ Address: []byte("iterate-hash-03"), Data: []byte("data22"), }, - { + Item{ Address: []byte("iterate-hash-05"), Data: []byte("data41"), }, - { + Item{ Address: []byte("iterate-hash-02"), Data: []byte("data84"), }, - { + Item{ Address: []byte("iterate-hash-06"), Data: []byte("data1"), }, - } + ) batch := new(leveldb.Batch) for _, i := range items { err = index.PutInBatch(batch, i) @@ -555,28 +556,29 @@ func TestIndex_IterateReverse(t *testing.T) { t.Fatal(err) } - items := []Item{ - { + items := make([]Item, 0, 6) + items = append(items, + Item{ Address: []byte("iterate-hash-01"), Data: []byte("data80"), }, - { + Item{ Address: []byte("iterate-hash-03"), Data: []byte("data22"), }, - { + Item{ Address: []byte("iterate-hash-05"), Data: []byte("data41"), }, - { + Item{ Address: []byte("iterate-hash-02"), Data: []byte("data84"), }, - { + Item{ Address: []byte("iterate-hash-06"), Data: []byte("data1"), }, - } + ) batch := new(leveldb.Batch) for _, i := range items { err = index.PutInBatch(batch, i) diff --git a/pkg/storage/migration/steps_chain_test.go b/pkg/storage/migration/steps_chain_test.go index 327a41331d7..5bcc1cb6aeb 100644 --- a/pkg/storage/migration/steps_chain_test.go +++ b/pkg/storage/migration/steps_chain_test.go @@ -19,7 +19,7 @@ func TestNewStepsChain(t *testing.T) { store := inmemstore.New() populateStore(t, store, populateItemsCount) - stepsFn := make([]migration.StepFn, 0) + stepsFn := make([]migration.StepFn, 0, 10) // Create 10 step functions where each would remove single element, having value [0-10) for i := range 10 { diff --git a/pkg/storage/storagetest/storage.go b/pkg/storage/storagetest/storage.go index 9ee41971b43..7869227e43c 100644 --- a/pkg/storage/storagetest/storage.go +++ b/pkg/storage/storagetest/storage.go 
@@ -95,7 +95,7 @@ func (o *obj1) ID() string { return o.Id } func (obj1) Namespace() string { return "obj1" } func (o *obj1) Marshal() ([]byte, error) { - buf := make([]byte, 40) + buf := make([]byte, 40, 40+len(o.Buf)) copy(buf[:32], o.Id) binary.LittleEndian.PutUint64(buf[32:], o.SomeInt) buf = append(buf, o.Buf[:]...) diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 87f700a98c8..e0dbd6a4305 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -537,7 +537,7 @@ func TestEvict(t *testing.T) { ts := internal.NewInmemStorage() chunksPerBatch := 50 - var chunks []swarm.Chunk + chunks := make([]swarm.Chunk, 0, 3*chunksPerBatch) batches := []*postage.Batch{postagetesting.MustNewBatch(), postagetesting.MustNewBatch(), postagetesting.MustNewBatch()} evictBatch := batches[1] @@ -690,7 +690,7 @@ func TestEvictMaxCount(t *testing.T) { t.Fatal(err) } - var chunks []swarm.Chunk + chunks := make([]swarm.Chunk, 0, 20) batch := postagetesting.MustNewBatch() diff --git a/pkg/storer/mock/mockstorer.go b/pkg/storer/mock/mockstorer.go index efdce20bbf4..2d851355714 100644 --- a/pkg/storer/mock/mockstorer.go +++ b/pkg/storer/mock/mockstorer.go @@ -132,7 +132,7 @@ func (m *mockStorer) ListSessions(offset, limit int) ([]storer.SessionInfo, erro m.mu.Lock() defer m.mu.Unlock() - sessions := []storer.SessionInfo{} + sessions := make([]storer.SessionInfo, 0, len(m.activeSessions)) for _, v := range m.activeSessions { sessions = append(sessions, *v) } diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index 0e5ab3b9214..8be3be183c2 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -220,8 +220,8 @@ func TestEvictBatch(t *testing.T) { } ctx := context.Background() - var chunks []swarm.Chunk var chunksPerPO uint64 = 10 + chunks := make([]swarm.Chunk, 0, chunksPerPO*3) batches := []*postage.Batch{postagetesting.MustNewBatch(), 
postagetesting.MustNewBatch(), postagetesting.MustNewBatch()} evictBatch := batches[1] @@ -543,6 +543,7 @@ func TestSubscribeBin(t *testing.T) { chunksPerPO uint64 = 50 putter = storer.ReservePutter() ) + chunks = make([]swarm.Chunk, 0, chunksPerPO*2) for j := range 2 { for range chunksPerPO { diff --git a/pkg/storer/sample_test.go b/pkg/storer/sample_test.go index a73ebbcf96e..c1a34ddec73 100644 --- a/pkg/storer/sample_test.go +++ b/pkg/storer/sample_test.go @@ -26,7 +26,7 @@ func TestReserveSampler(t *testing.T) { const maxPO = 10 randChunks := func(baseAddr swarm.Address, timeVar uint64) []swarm.Chunk { - var chs []swarm.Chunk + chs := make([]swarm.Chunk, 0, chunkCountPerPO*maxPO) for po := range maxPO { for range chunkCountPerPO { ch := chunk.GenerateValidRandomChunkAt(t, baseAddr, po).WithBatch(3, 2, false) diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go index 008c6562ab5..28691ba9c9f 100644 --- a/pkg/transaction/wrapped/cache/cache.go +++ b/pkg/transaction/wrapped/cache/cache.go @@ -1,3 +1,7 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package cache import ( diff --git a/pkg/transaction/wrapped/cache/cache_test.go b/pkg/transaction/wrapped/cache/cache_test.go index 5de3e1700e8..a40dd8041c3 100644 --- a/pkg/transaction/wrapped/cache/cache_test.go +++ b/pkg/transaction/wrapped/cache/cache_test.go @@ -1,3 +1,7 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package cache import ( diff --git a/pkg/transaction/wrapped/cache/keys.go b/pkg/transaction/wrapped/cache/keys.go index a647fb0c78d..1397e99f15f 100644 --- a/pkg/transaction/wrapped/cache/keys.go +++ b/pkg/transaction/wrapped/cache/keys.go @@ -1,3 +1,6 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. package cache type Key string diff --git a/pkg/transaction/wrapped/cache/metrics.go b/pkg/transaction/wrapped/cache/metrics.go index 55594c7ac38..4764822c359 100644 --- a/pkg/transaction/wrapped/cache/metrics.go +++ b/pkg/transaction/wrapped/cache/metrics.go @@ -1,3 +1,7 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package cache import ( @@ -68,15 +72,6 @@ func NewMetrics() Metrics { } } -func (mtr *Metrics) metricsForKey(key Key) *metricSet { - switch key { - case BlockNumberKey: - return &mtr.BlockNumber - default: - return &mtr.Unknown - } -} - func (mtr *Metrics) Collectors() []prometheus.Collector { return []prometheus.Collector{ mtr.BlockNumber.Hits, diff --git a/pkg/transaction/wrapped/fee_test.go b/pkg/transaction/wrapped/fee_test.go index 41afb69f28f..d78728df00b 100644 --- a/pkg/transaction/wrapped/fee_test.go +++ b/pkg/transaction/wrapped/fee_test.go @@ -9,6 +9,7 @@ import ( "errors" "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/core/types" "github.com/ethersphere/bee/v2/pkg/transaction/backendmock" @@ -105,6 +106,7 @@ func TestSuggestedFeeAndTip(t *testing.T) { }), ), minimumGasTipCap, + 5*time.Second, ) gasFeeCap, gasTipCap, err := backend.SuggestedFeeAndTip(ctx, tc.gasPrice, tc.boostPercent) From 1ff4f755ef27325d10b900737ccb04795e68c37d Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 16 Mar 2026 11:53:00 +0100 Subject: [PATCH 3/8] fix: enable cache metrics --- pkg/transaction/wrapped/cache/cache.go | 12 ++++++++++++ pkg/transaction/wrapped/cache/keys.go | 2 +- pkg/transaction/wrapped/metrics.go | 4 +++- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go index 28691ba9c9f..0ca2b56d1e1 100644 --- 
a/pkg/transaction/wrapped/cache/cache.go +++ b/pkg/transaction/wrapped/cache/cache.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/singleflight" ) @@ -34,6 +35,17 @@ func NewExpiringSingleFlightCache[T any](ttl time.Duration, key Key) *ExpiringSi } } +func (c *ExpiringSingleFlightCache[T]) Collectors() []prometheus.Collector { + return []prometheus.Collector{ + c.metrics.Hits, + c.metrics.Misses, + c.metrics.Loads, + c.metrics.SharedLoads, + c.metrics.LoadErrors, + c.metrics.Invalidates, + } +} + func (c *ExpiringSingleFlightCache[T]) Get(now time.Time) (T, bool) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/pkg/transaction/wrapped/cache/keys.go b/pkg/transaction/wrapped/cache/keys.go index 1397e99f15f..15a9f7b7eb7 100644 --- a/pkg/transaction/wrapped/cache/keys.go +++ b/pkg/transaction/wrapped/cache/keys.go @@ -6,5 +6,5 @@ package cache type Key string const ( - BlockNumberKey Key = "block-number" + BlockNumberKey Key = "block_number" ) diff --git a/pkg/transaction/wrapped/metrics.go b/pkg/transaction/wrapped/metrics.go index a5cfaa554a0..cd9364696a8 100644 --- a/pkg/transaction/wrapped/metrics.go +++ b/pkg/transaction/wrapped/metrics.go @@ -126,5 +126,7 @@ func newMetrics() metrics { } func (b *wrappedBackend) Metrics() []prometheus.Collector { - return m.PrometheusCollectorsFromFields(b.metrics) + collectors := m.PrometheusCollectorsFromFields(b.metrics) + collectors = append(collectors, b.blockNumberCache.Collectors()...) 
+ return collectors } From 095f4afe171f0f37e94b43be561cfdfdb61283c7 Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 17 Mar 2026 14:44:56 +0100 Subject: [PATCH 4/8] fix: review issues --- pkg/node/chain.go | 4 ++-- pkg/node/node.go | 4 ++++ pkg/transaction/wrapped/cache/metrics.go | 7 ------- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/pkg/node/chain.go b/pkg/node/chain.go index 9cd3c60574e..c49213c2375 100644 --- a/pkg/node/chain.go +++ b/pkg/node/chain.go @@ -51,7 +51,7 @@ func InitChain( pollingInterval time.Duration, chainEnabled bool, minimumGasTipCap uint64, - blockCacheTTl time.Duration, + blockCacheTTL time.Duration, ) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) { backend := backendnoop.New(chainID) @@ -73,7 +73,7 @@ func InitChain( logger.Info("connected to blockchain backend", "version", versionString) - backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, blockCacheTTl) + backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, blockCacheTTL) } backendChainID, err := backend.ChainID(ctx) diff --git a/pkg/node/node.go b/pkg/node/node.go index e75054a8f77..8923f0767cd 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -399,6 +399,10 @@ func NewBee( } } + if o.BlockTime.Seconds() == 1 { + + } + chainBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err := InitChain( ctx, logger, diff --git a/pkg/transaction/wrapped/cache/metrics.go b/pkg/transaction/wrapped/cache/metrics.go index 4764822c359..26b2910a37a 100644 --- a/pkg/transaction/wrapped/cache/metrics.go +++ b/pkg/transaction/wrapped/cache/metrics.go @@ -65,13 +65,6 @@ func newMetricSet(prefix string) metricSet { } } -func NewMetrics() Metrics { - return Metrics{ - BlockNumber: newMetricSet("block_number"), - Unknown: newMetricSet("unknown"), - } -} - func (mtr *Metrics) Collectors() []prometheus.Collector { return []prometheus.Collector{ 
mtr.BlockNumber.Hits, From e3b1aa5f4d7b9fe6dc2ddace9206b69e6f45d81b Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 18 Mar 2026 13:51:32 +0100 Subject: [PATCH 5/8] fix: dead code --- pkg/node/node.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 8923f0767cd..e75054a8f77 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -399,10 +399,6 @@ func NewBee( } } - if o.BlockTime.Seconds() == 1 { - - } - chainBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err := InitChain( ctx, logger, From f8e304924fe09cd4e20fb0695227ed6006587e43 Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 25 Mar 2026 16:22:35 +0100 Subject: [PATCH 6/8] fix: update cache implementation --- cmd/bee/cmd/deploy.go | 2 +- pkg/node/chain.go | 4 +- pkg/node/node.go | 3 +- pkg/transaction/wrapped/cache/cache.go | 46 ++--- pkg/transaction/wrapped/cache/cache_test.go | 189 ++++++++++++-------- pkg/transaction/wrapped/cache/keys.go | 10 -- pkg/transaction/wrapped/cache/metrics.go | 9 - pkg/transaction/wrapped/fee_test.go | 1 + pkg/transaction/wrapped/wrapped.go | 64 +++++-- pkg/transaction/wrapped/wrapped_test.go | 174 ++++++++++++++++++ 10 files changed, 361 insertions(+), 141 deletions(-) delete mode 100644 pkg/transaction/wrapped/cache/keys.go create mode 100644 pkg/transaction/wrapped/wrapped_test.go diff --git a/cmd/bee/cmd/deploy.go b/cmd/bee/cmd/deploy.go index c8b3a0bcd38..46b0e46e22b 100644 --- a/cmd/bee/cmd/deploy.go +++ b/cmd/bee/cmd/deploy.go @@ -60,7 +60,7 @@ func (c *command) initDeployCmd() error { blocktime, true, c.config.GetUint64(optionNameMinimumGasTipCap), - blocktime-2, + 0, ) if err != nil { return err diff --git a/pkg/node/chain.go b/pkg/node/chain.go index c49213c2375..f63e70bbd14 100644 --- a/pkg/node/chain.go +++ b/pkg/node/chain.go @@ -51,7 +51,7 @@ func InitChain( pollingInterval time.Duration, chainEnabled bool, minimumGasTipCap uint64, - blockCacheTTL time.Duration, + blockCacheTTLPercent 
uint64, ) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) { backend := backendnoop.New(chainID) @@ -73,7 +73,7 @@ func InitChain( logger.Info("connected to blockchain backend", "version", versionString) - backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, blockCacheTTL) + backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockCacheTTLPercent) } backendChainID, err := backend.ChainID(ctx) diff --git a/pkg/node/node.go b/pkg/node/node.go index e75054a8f77..b9b21593bca 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -135,6 +135,7 @@ type Options struct { BlockchainRpcEndpoint string BlockProfile bool BlockTime time.Duration + BlockCacheTTLPercent uint64 BootnodeMode bool Bootnodes []string CacheCapacity uint64 @@ -409,7 +410,7 @@ func NewBee( o.BlockTime, chainEnabled, o.MinimumGasTipCap, - o.BlockTime*85/100, + o.BlockCacheTTLPercent, ) if err != nil { return nil, fmt.Errorf("init chain: %w", err) diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go index 0ca2b56d1e1..c553f33182a 100644 --- a/pkg/transaction/wrapped/cache/cache.go +++ b/pkg/transaction/wrapped/cache/cache.go @@ -1,37 +1,33 @@ -// Copyright 2025 The Swarm Authors. All rights reserved. +// Copyright 2026 The Swarm Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. 
package cache import ( + "context" "sync" "time" "github.com/prometheus/client_golang/prometheus" - "golang.org/x/sync/singleflight" + "resenje.org/singleflight" ) -type Loader[T any] func() (T, error) - +type Loader[T any] func() (T, time.Time, error) type ExpiringSingleFlightCache[T any] struct { - ttl time.Duration - mu sync.RWMutex value T - valid bool expiresAt time.Time - group singleflight.Group - key Key + group singleflight.Group[string, any] + key string metrics metricSet } -func NewExpiringSingleFlightCache[T any](ttl time.Duration, key Key) *ExpiringSingleFlightCache[T] { +func NewExpiringSingleFlightCache[T any](metricsPrefix string) *ExpiringSingleFlightCache[T] { return &ExpiringSingleFlightCache[T]{ - ttl: ttl, - key: key, - metrics: newMetricSet(string(key)), + key: metricsPrefix, + metrics: newMetricSet(metricsPrefix), } } @@ -42,7 +38,6 @@ func (c *ExpiringSingleFlightCache[T]) Collectors() []prometheus.Collector { c.metrics.Loads, c.metrics.SharedLoads, c.metrics.LoadErrors, - c.metrics.Invalidates, } } @@ -50,7 +45,7 @@ func (c *ExpiringSingleFlightCache[T]) Get(now time.Time) (T, bool) { c.mu.RLock() defer c.mu.RUnlock() - if c.valid && now.Before(c.expiresAt) { + if now.Before(c.expiresAt) { return c.value, true } @@ -58,24 +53,15 @@ func (c *ExpiringSingleFlightCache[T]) Get(now time.Time) (T, bool) { return zero, false } -func (c *ExpiringSingleFlightCache[T]) Set(value T, now time.Time) { +func (c *ExpiringSingleFlightCache[T]) Set(value T, expiresAt time.Time) { c.mu.Lock() defer c.mu.Unlock() c.value = value - c.valid = true - c.expiresAt = now.Add(c.ttl) -} - -func (c *ExpiringSingleFlightCache[T]) Invalidate() { - c.mu.Lock() - defer c.mu.Unlock() - - c.valid = false - c.metrics.Invalidates.Inc() + c.expiresAt = expiresAt } -func (c *ExpiringSingleFlightCache[T]) GetOrLoad(now time.Time, loader Loader[T]) (T, error) { +func (c *ExpiringSingleFlightCache[T]) GetOrLoad(ctx context.Context, now time.Time, loader Loader[T]) (T, error) { if 
v, ok := c.Get(now); ok { c.metrics.Hits.Inc() return v, nil @@ -83,14 +69,14 @@ func (c *ExpiringSingleFlightCache[T]) GetOrLoad(now time.Time, loader Loader[T] c.metrics.Misses.Inc() - result, err, shared := c.group.Do(string(c.key), func() (any, error) { + result, shared, err := c.group.Do(ctx, c.key, func(ctx context.Context) (any, error) { c.metrics.Loads.Inc() - val, err := loader() + val, expiresAt, err := loader() if err != nil { c.metrics.LoadErrors.Inc() return val, err } - c.Set(val, time.Now()) + c.Set(val, expiresAt) return val, nil }) diff --git a/pkg/transaction/wrapped/cache/cache_test.go b/pkg/transaction/wrapped/cache/cache_test.go index a40dd8041c3..684cd4729fc 100644 --- a/pkg/transaction/wrapped/cache/cache_test.go +++ b/pkg/transaction/wrapped/cache/cache_test.go @@ -1,14 +1,16 @@ -// Copyright 2025 The Swarm Authors. All rights reserved. +// Copyright 2026 The Swarm Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. 
package cache import ( + "context" "errors" "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/prometheus/client_golang/prometheus/testutil" @@ -16,13 +18,12 @@ import ( "github.com/stretchr/testify/require" ) -const testKey Key = "test" +const testMetricsPrefix = "test" -func newTestCache(ttl time.Duration) *ExpiringSingleFlightCache[uint64] { +func newTestCache() *ExpiringSingleFlightCache[uint64] { return &ExpiringSingleFlightCache[uint64]{ - ttl: ttl, - key: testKey, - metrics: newMetricSet(string(testKey)), + key: testMetricsPrefix, + metrics: newMetricSet(testMetricsPrefix), } } @@ -30,8 +31,9 @@ func TestSetGet(t *testing.T) { t.Parallel() now := time.Now() - c := newTestCache(time.Second) - c.Set(42, now) + expiresAt := now.Add(time.Second) + c := newTestCache() + c.Set(42, expiresAt) val, ok := c.Get(now) assert.True(t, ok) @@ -42,34 +44,22 @@ func TestTTLExpiry(t *testing.T) { t.Parallel() now := time.Now() - c := newTestCache(time.Second) + c := newTestCache() c.Set(42, now) _, ok := c.Get(now.Add(time.Second + time.Millisecond)) assert.False(t, ok) } -func TestInvalidate(t *testing.T) { - t.Parallel() - - now := time.Now() - c := newTestCache(5 * time.Second) - c.Set(42, now) - c.Invalidate() - - _, ok := c.Get(now) - assert.False(t, ok) -} - func TestGetOrLoadMiss(t *testing.T) { t.Parallel() - c := newTestCache(time.Second) + c := newTestCache() var loadCount atomic.Int32 - val, err := c.GetOrLoad(time.Now(), func() (uint64, error) { + val, err := c.GetOrLoad(context.Background(), time.Now(), func() (uint64, time.Time, error) { loadCount.Add(1) - return 99, nil + return 99, time.Now().Add(time.Second), nil }) require.NoError(t, err) @@ -83,30 +73,32 @@ func TestGetOrLoadMiss(t *testing.T) { func TestGetOrLoadHit(t *testing.T) { t.Parallel() + const expectedVal = uint64(42) now := time.Now() - c := newTestCache(time.Second) - c.Set(42, now) + expiresAt := now.Add(time.Second) + c := newTestCache() + c.Set(expectedVal, expiresAt) var 
loadCount atomic.Int32 - val, err := c.GetOrLoad(now, func() (uint64, error) { + val, err := c.GetOrLoad(context.Background(), now, func() (uint64, time.Time, error) { loadCount.Add(1) - return 99, nil + return expectedVal, now.Add(time.Second), nil }) require.NoError(t, err) - assert.Equal(t, uint64(42), val) + assert.Equal(t, expectedVal, val) assert.Equal(t, int32(0), loadCount.Load()) } func TestGetOrLoadError(t *testing.T) { t.Parallel() - c := newTestCache(time.Second) + c := newTestCache() errLoad := errors.New("load failed") - val, err := c.GetOrLoad(time.Now(), func() (uint64, error) { - return 0, errLoad + val, err := c.GetOrLoad(context.Background(), time.Now(), func() (uint64, time.Time, error) { + return 0, time.Time{}, errLoad }) assert.ErrorIs(t, err, errLoad) @@ -117,73 +109,120 @@ func TestGetOrLoadError(t *testing.T) { } func TestGetOrLoadSingleflight(t *testing.T) { - t.Parallel() - - c := newTestCache(5 * time.Second) - var loadCount atomic.Int32 - gate := make(chan struct{}) - - const n = 10 - var wg sync.WaitGroup - results := make([]uint64, n) - errs := make([]error, n) - - wg.Add(n) - for i := range n { - go func(idx int) { - defer wg.Done() - results[idx], errs[idx] = c.GetOrLoad(time.Now(), func() (uint64, error) { - loadCount.Add(1) - <-gate - return 77, nil - }) - }(i) - } - - time.Sleep(50 * time.Millisecond) - close(gate) - wg.Wait() - - assert.Equal(t, int32(1), loadCount.Load()) - for i := range n { - assert.NoError(t, errs[i]) - assert.Equal(t, uint64(77), results[i]) - } + const value = uint64(77) + synctest.Test(t, func(t *testing.T) { + c := newTestCache() + var loadCount atomic.Int32 + gate := make(chan struct{}) + + const n = 10 + var wg sync.WaitGroup + results := make([]uint64, n) + errs := make([]error, n) + + wg.Add(n) + for i := range n { + go func(idx int) { + defer wg.Done() + now := time.Now() + results[idx], errs[idx] = c.GetOrLoad(context.Background(), now, func() (uint64, time.Time, error) { + loadCount.Add(1) + 
<-gate + return value, now.Add(time.Second), nil + }) + }(i) + } + + time.Sleep(50 * time.Millisecond) + close(gate) + wg.Wait() + + assert.Equal(t, int32(1), loadCount.Load()) + for i := range n { + assert.NoError(t, errs[i]) + assert.Equal(t, value, results[i]) + } + }) } func TestGetOrLoadReloadAfterExpiry(t *testing.T) { t.Parallel() now := time.Now() - c := newTestCache(time.Second) + c := newTestCache() c.Set(42, now) - val, err := c.GetOrLoad(now.Add(time.Second+time.Millisecond), func() (uint64, error) { - return 100, nil + val, err := c.GetOrLoad(context.Background(), now.Add(time.Second+time.Millisecond), func() (uint64, time.Time, error) { + return 100, now.Add(time.Second + time.Millisecond), nil }) require.NoError(t, err) assert.Equal(t, uint64(100), val) } +func TestGetOrLoadContextCancellation(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + const expectedVal = 55 + c := newTestCache() + gate := make(chan struct{}) + + ctx1, cancel1 := context.WithCancel(context.Background()) + ctx2 := context.Background() + + var wg sync.WaitGroup + var result1, result2 uint64 + var err1, err2 error + + wg.Add(2) + go func() { + defer wg.Done() + result1, err1 = c.GetOrLoad(ctx1, time.Now(), func() (uint64, time.Time, error) { + <-gate + return expectedVal, time.Now().Add(time.Second), nil + }) + }() + + go func() { + defer wg.Done() + result2, err2 = c.GetOrLoad(ctx2, time.Now(), func() (uint64, time.Time, error) { + <-gate + return expectedVal, time.Now().Add(time.Second), nil + }) + }() + + time.Sleep(50 * time.Millisecond) + cancel1() + time.Sleep(50 * time.Millisecond) + close(gate) + wg.Wait() + + assert.ErrorIs(t, err1, context.Canceled) + assert.Equal(t, uint64(0), result1) + + require.NoError(t, err2) + assert.Equal(t, uint64(expectedVal), result2) + }) +} + func TestMetrics(t *testing.T) { t.Parallel() now := time.Now() - c := newTestCache(time.Second) + expiresAt := now.Add(time.Second) + c := newTestCache() + + ctx := context.Background() + + // 
miss + load error + _, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 0, time.Time{}, errors.New("fail") }) // miss + load - _, _ = c.GetOrLoad(now, func() (uint64, error) { return 42, nil }) + _, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 42, expiresAt, nil }) // hit - _, _ = c.GetOrLoad(now, func() (uint64, error) { return 0, nil }) - // invalidate - c.Invalidate() - // miss + load error - _, _ = c.GetOrLoad(now, func() (uint64, error) { return 0, errors.New("fail") }) + _, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 0, expiresAt, nil }) assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.Hits)) assert.Equal(t, float64(2), testutil.ToFloat64(c.metrics.Misses)) assert.Equal(t, float64(2), testutil.ToFloat64(c.metrics.Loads)) assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.LoadErrors)) - assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.Invalidates)) } diff --git a/pkg/transaction/wrapped/cache/keys.go b/pkg/transaction/wrapped/cache/keys.go deleted file mode 100644 index 15a9f7b7eb7..00000000000 --- a/pkg/transaction/wrapped/cache/keys.go +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright 2025 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. 
-package cache - -type Key string - -const ( - BlockNumberKey Key = "block_number" -) diff --git a/pkg/transaction/wrapped/cache/metrics.go b/pkg/transaction/wrapped/cache/metrics.go index 26b2910a37a..039a3ed7824 100644 --- a/pkg/transaction/wrapped/cache/metrics.go +++ b/pkg/transaction/wrapped/cache/metrics.go @@ -15,7 +15,6 @@ type metricSet struct { Loads prometheus.Counter SharedLoads prometheus.Counter LoadErrors prometheus.Counter - Invalidates prometheus.Counter } type Metrics struct { @@ -56,12 +55,6 @@ func newMetricSet(prefix string) metricSet { Name: prefix + "_load_errors", Help: prefix + " cache load errors", }), - Invalidates: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: prefix + "_invalidations", - Help: prefix + " cache invalidations", - }), } } @@ -72,13 +65,11 @@ func (mtr *Metrics) Collectors() []prometheus.Collector { mtr.BlockNumber.Loads, mtr.BlockNumber.SharedLoads, mtr.BlockNumber.LoadErrors, - mtr.BlockNumber.Invalidates, mtr.Unknown.Hits, mtr.Unknown.Misses, mtr.Unknown.Loads, mtr.Unknown.SharedLoads, mtr.Unknown.LoadErrors, - mtr.Unknown.Invalidates, } } diff --git a/pkg/transaction/wrapped/fee_test.go b/pkg/transaction/wrapped/fee_test.go index d78728df00b..33ddc31da88 100644 --- a/pkg/transaction/wrapped/fee_test.go +++ b/pkg/transaction/wrapped/fee_test.go @@ -107,6 +107,7 @@ func TestSuggestedFeeAndTip(t *testing.T) { ), minimumGasTipCap, 5*time.Second, + 90, ) gasFeeCap, gasTipCap, err := backend.SuggestedFeeAndTip(ctx, tc.gasPrice, tc.boostPercent) diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index 7e46f19b092..fe640020bff 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -18,27 +18,37 @@ import ( "github.com/ethersphere/bee/v2/pkg/transaction/wrapped/cache" ) +const DefaultBlockNumberTTLPercent = 85 + var ( _ transaction.Backend = (*wrappedBackend)(nil) ) type wrappedBackend struct { - 
backend backend.Geth - metrics metrics - minimumGasTipCap int64 - blockNumberCache *cache.ExpiringSingleFlightCache[uint64] + backend backend.Geth + metrics metrics + minimumGasTipCap int64 + blockTime time.Duration + blockNumberTTLPercent uint64 + blockNumberCache *cache.ExpiringSingleFlightCache[uint64] } func NewBackend( backend backend.Geth, minimumGasTipCap uint64, - blockNumberTTL time.Duration, + blockTime time.Duration, + blockNumberTTLPercent uint64, ) transaction.Backend { + if blockNumberTTLPercent == 0 || blockNumberTTLPercent >= 100 { + blockNumberTTLPercent = DefaultBlockNumberTTLPercent + } return &wrappedBackend{ - backend: backend, - minimumGasTipCap: int64(minimumGasTipCap), - metrics: newMetrics(), - blockNumberCache: cache.NewExpiringSingleFlightCache[uint64](blockNumberTTL, cache.BlockNumberKey), + backend: backend, + minimumGasTipCap: int64(minimumGasTipCap), + blockTime: blockTime, + blockNumberTTLPercent: blockNumberTTLPercent, + metrics: newMetrics(), + blockNumberCache: cache.NewExpiringSingleFlightCache[uint64]("block_number"), } } @@ -69,16 +79,44 @@ func (b *wrappedBackend) TransactionByHash(ctx context.Context, hash common.Hash } func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { - return b.blockNumberCache.GetOrLoad(time.Now(), func() (uint64, error) { + now := time.Now().UTC() + + return b.blockNumberCache.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { b.metrics.TotalRPCCalls.Inc() b.metrics.BlockNumberCalls.Inc() - blockNumber, err := b.backend.BlockNumber(ctx) + header, err := b.backend.HeaderByNumber(ctx, nil) if err != nil { b.metrics.TotalRPCErrors.Inc() - return 0, err + return 0, time.Time{}, err + } + if header == nil || header.Number == nil { + b.metrics.TotalRPCErrors.Inc() + return 0, time.Time{}, errors.New("latest block header unavailable") + } + + now = time.Now().UTC() + ttl := b.blockTime * time.Duration(b.blockNumberTTLPercent) / 100 + if ttl < 500*time.Millisecond { + ttl = 
time.Second + } + + expiresAt := time.Unix(int64(header.Time), 0).Add(ttl) + maxExpiresAt := now.Add(ttl) + if expiresAt.After(maxExpiresAt) { + // in case local clocks are behind the blockchain clocks + expiresAt = maxExpiresAt + } + + if !expiresAt.After(now) { + // in case local clocks are ahead of the blockchain clocks, or we got the block header right before a new block + retryTTL := b.blockTime / 10 + if retryTTL < 100*time.Millisecond { + retryTTL = time.Second + } + expiresAt = now.Add(retryTTL) } - return blockNumber, nil + return header.Number.Uint64(), expiresAt, nil }) } diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go new file mode 100644 index 00000000000..65731f77189 --- /dev/null +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -0,0 +1,174 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package wrapped + +import ( + "context" + "math/big" + "sync/atomic" + "testing" + "testing/synctest" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethersphere/bee/v2/pkg/transaction/backendmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testBlockTime = 5 * time.Second + testTTLPercent = 90 + testMinimumGasTipCap = 0 +) + +func TestBlockNumberUsesLatestHeaderCache(t *testing.T) { + t.Parallel() + + const expectedBlock = uint64(10) + var headerCalls atomic.Int32 + + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + assert.Nil(t, number) + return &types.Header{ + Number: big.NewInt(int64(expectedBlock)), + Time: uint64(time.Now().Add(time.Second).Unix()), + BaseFee: big.NewInt(1), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + first, err := 
backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, first) + + second, err := backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, second) + assert.Equal(t, int32(1), headerCalls.Load()) +} + +func TestBlockNumberNearExpiry(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + const ( + staleBlock = uint64(100) + freshBlock = uint64(101) + ) + var headerCalls atomic.Int32 + + now := time.Now() + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + if headerCalls.Load() == 1 { + return &types.Header{ + Number: big.NewInt(int64(staleBlock)), + Time: uint64(now.Add(-testBlockTime).Unix()), + }, nil + } + return &types.Header{ + Number: big.NewInt(int64(freshBlock)), + Time: uint64(now.Unix()), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + ctx := context.Background() + first, err := backend.BlockNumber(ctx) + require.NoError(t, err) + assert.Equal(t, staleBlock, first) + + time.Sleep(testBlockTime/10 + 100*time.Millisecond) + + second, err := backend.BlockNumber(ctx) + require.NoError(t, err) + assert.Equal(t, freshBlock, second) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + +func TestBlockNumberLocalClockBehind(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const expectedBlock = uint64(200) + var headerCalls atomic.Int32 + + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + return &types.Header{ + Number: big.NewInt(int64(expectedBlock)), + Time: uint64(time.Now().Add(30 * time.Second).Unix()), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + ttl := testBlockTime * testTTLPercent / 100 + + val, err := 
backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, val) + + time.Sleep(ttl + 100*time.Millisecond) + + _, err = backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + +func TestBlockNumberLocalClockAhead(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + + const expectedBlock = uint64(300) + var headerCalls atomic.Int32 + + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + return &types.Header{ + Number: big.NewInt(int64(expectedBlock)), + Time: uint64(time.Now().Add(-2 * testBlockTime).Unix()), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + val, err := backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, val) + + retryTTL := testBlockTime / 10 + time.Sleep(retryTTL + 100*time.Millisecond) + + _, err = backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} From e6945ef30ab793eb84d3044e9653bb3d76a654e9 Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 25 Mar 2026 23:51:45 +0100 Subject: [PATCH 7/8] fix: wrapped backend test --- pkg/transaction/wrapped/wrapped_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index 65731f77189..632c0742d4b 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -70,6 +70,7 @@ func TestBlockNumberNearExpiry(t *testing.T) { backend := NewBackend( backendmock.New( backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) if headerCalls.Load() == 1 { return &types.Header{ Number: big.NewInt(int64(staleBlock)), From 
a9fd8889d7bda1dfce144c8b4de2b01436c6c6f4 Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 30 Mar 2026 11:29:13 +0200 Subject: [PATCH 8/8] fix: make linter happy --- pkg/crypto/crypto_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/crypto/crypto_test.go b/pkg/crypto/crypto_test.go index 8a477730108..e1b5b78bb3a 100644 --- a/pkg/crypto/crypto_test.go +++ b/pkg/crypto/crypto_test.go @@ -22,6 +22,7 @@ func TestGenerateSecp256k1Key(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k1 == nil { t.Fatal("nil key") } @@ -29,10 +30,12 @@ func TestGenerateSecp256k1Key(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k2 == nil { t.Fatal("nil key") } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if bytes.Equal(k1.D.Bytes(), k2.D.Bytes()) { t.Fatal("two generated keys are equal") } @@ -45,6 +48,7 @@ func TestGenerateSecp256k1EDG(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k1 == nil { t.Fatal("nil key") } @@ -52,10 +56,11 @@ func TestGenerateSecp256k1EDG(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k2 == nil { t.Fatal("nil key") } - + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if bytes.Equal(k1.D.Bytes(), k2.D.Bytes()) { t.Fatal("two generated keys are equal") }