diff --git a/cmd/bee/cmd/db_test.go b/cmd/bee/cmd/db_test.go index 7d32b5c3080..85593ab0fbe 100644 --- a/cmd/bee/cmd/db_test.go +++ b/cmd/bee/cmd/db_test.go @@ -214,9 +214,6 @@ func TestDBNuke_FLAKY(t *testing.T) { Logger: log.Noop, ReserveCapacity: storer.DefaultReserveCapacity, }, path.Join(dataDir, "localstore")) - if err != nil { - t.Fatal(err) - } defer db.Close() info, err = db.DebugInfo(ctx) diff --git a/cmd/bee/cmd/deploy.go b/cmd/bee/cmd/deploy.go index c0e20cb63e7..46b0e46e22b 100644 --- a/cmd/bee/cmd/deploy.go +++ b/cmd/bee/cmd/deploy.go @@ -60,6 +60,7 @@ func (c *command) initDeployCmd() error { blocktime, true, c.config.GetUint64(optionNameMinimumGasTipCap), + 0, ) if err != nil { return err diff --git a/go.mod b/go.mod index 0d848b83d8a..74477824a1b 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,8 @@ require ( resenje.org/web v0.4.3 ) +require github.com/kylelemons/godebug v1.1.0 // indirect + require ( github.com/BurntSushi/toml v1.3.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go index 5ba62cdd76c..05a77e79ef3 100644 --- a/pkg/api/chunk_stream_test.go +++ b/pkg/api/chunk_stream_test.go @@ -37,7 +37,7 @@ func TestChunkUploadStream(t *testing.T) { ) t.Run("upload and verify", func(t *testing.T) { - chsToGet := []swarm.Chunk{} + chsToGet := make([]swarm.Chunk, 0, 5) for range 5 { ch := testingc.GenerateTestRandomChunk() diff --git a/pkg/bmt/proof_test.go b/pkg/bmt/proof_test.go index 5b3625acefb..4658cb353c7 100644 --- a/pkg/bmt/proof_test.go +++ b/pkg/bmt/proof_test.go @@ -26,7 +26,7 @@ func TestProofCorrectness(t *testing.T) { verifySegments := func(t *testing.T, exp []string, found [][]byte) { t.Helper() - var expSegments [][]byte + expSegments := make([][]byte, 0, len(exp)) for _, v := range exp { decoded, err := hex.DecodeString(v) if err != nil { @@ -154,7 +154,7 @@ func TestProofCorrectness(t *testing.T) { "745bae095b6ff5416b4a351a167f731db6d6f5924f30cd88d48e74261795d27b", } - var segments [][]byte + segments := make([][]byte, 0, len(segmentStrings)) for _, v := range segmentStrings { decoded, err := hex.DecodeString(v) if err != nil { diff --git a/pkg/crypto/crypto_test.go b/pkg/crypto/crypto_test.go index 8a477730108..e1b5b78bb3a 100644 --- a/pkg/crypto/crypto_test.go +++ b/pkg/crypto/crypto_test.go @@ -22,6 +22,7 @@ func TestGenerateSecp256k1Key(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k1 == nil { t.Fatal("nil key") } @@ -29,10 +30,12 @@ func TestGenerateSecp256k1Key(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k2 == nil { t.Fatal("nil key") } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if bytes.Equal(k1.D.Bytes(), k2.D.Bytes()) { t.Fatal("two generated keys are equal") } @@ -45,6 +48,7 @@ func TestGenerateSecp256k1EDG(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k1 == nil { t.Fatal("nil key") } @@ -52,10 +56,11 @@ func TestGenerateSecp256k1EDG(t *testing.T) { if err != nil { t.Fatal(err) } + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if k2 == nil { t.Fatal("nil key") } - + //nolint:staticcheck // SA5011 false positive: t.Fatal terminates test if bytes.Equal(k1.D.Bytes(), k2.D.Bytes()) { t.Fatal("two generated keys are equal") } diff --git a/pkg/file/buffer_test.go b/pkg/file/buffer_test.go index 71b374b7f4b..a41f9d4426b 100644 --- a/pkg/file/buffer_test.go +++ b/pkg/file/buffer_test.go @@ -114,10 +114,10 @@ func TestCopyBuffer(t *testing.T) { swarm.ChunkSize*17 + 3, } - testCases := []struct { + testCases := make([]struct { readBufferSize int dataSize int - }{} + }, 0, len(dataSizes)*len(readBufferSizes)) for i := range readBufferSizes { for j := range dataSizes { diff --git a/pkg/file/pipeline/bmt/bmt_test.go b/pkg/file/pipeline/bmt/bmt_test.go index 7fba1d763d0..09a9c965eee 100644 --- a/pkg/file/pipeline/bmt/bmt_test.go +++ b/pkg/file/pipeline/bmt/bmt_test.go @@ -52,7 +52,7 @@ func TestBmtWriter(t *testing.T) { mockChainWriter := mock.NewChainWriter() writer := bmt.NewBmtWriter(mockChainWriter) - var data []byte + data := make([]byte, 0, len(tc.data)+8) if !tc.noSpan { data = make([]byte, 8) diff --git a/pkg/node/chain.go b/pkg/node/chain.go index 78fc0a5448d..f63e70bbd14 100644 --- a/pkg/node/chain.go +++ b/pkg/node/chain.go @@ -51,6 +51,7 @@ func InitChain( pollingInterval time.Duration, chainEnabled bool, minimumGasTipCap uint64, + blockCacheTTLPercent uint64, ) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) { backend := backendnoop.New(chainID) @@ -72,7 +73,7 @@ func InitChain( logger.Info("connected to blockchain backend", "version", versionString) - backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap) + backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockCacheTTLPercent) } backendChainID, err := backend.ChainID(ctx) diff --git a/pkg/node/node.go b/pkg/node/node.go index 50ed1a44b82..b9b21593bca 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -135,6 +135,7 @@ type Options struct { BlockchainRpcEndpoint string BlockProfile bool BlockTime time.Duration + BlockCacheTTLPercent uint64 BootnodeMode bool Bootnodes []string CacheCapacity uint64 @@ -409,6 +410,7 @@ func NewBee( o.BlockTime, chainEnabled, o.MinimumGasTipCap, + o.BlockCacheTTLPercent, ) if err != nil { return nil, fmt.Errorf("init chain: %w", err) diff --git a/pkg/postage/mock/service.go b/pkg/postage/mock/service.go index b4fffd07761..9584ffeeb1c 100644 --- a/pkg/postage/mock/service.go +++ b/pkg/postage/mock/service.go @@ -70,7 +70,7 @@ func (m *mockPostage) StampIssuers() []*postage.StampIssuer { m.issuerLock.Lock() defer m.issuerLock.Unlock() - issuers := make([]*postage.StampIssuer, 0) + issuers := make([]*postage.StampIssuer, 0, len(m.issuersMap)) for _, v := range m.issuersMap { issuers = append(issuers, v) } diff --git a/pkg/shed/index_test.go b/pkg/shed/index_test.go index 91eb657b649..fff3c14bde7 100644 --- a/pkg/shed/index_test.go +++ b/pkg/shed/index_test.go @@ -345,7 +345,7 @@ func TestIndex(t *testing.T) { } t.Run("not found", func(t *testing.T) { - items := make([]Item, len(want)) + items := make([]Item, len(want), len(want)+1) for i, w := range want { items[i] = Item{ Address: w.Address, @@ -373,28 +373,29 @@ func TestIndex_Iterate(t *testing.T) { t.Fatal(err) } - items := []Item{ - { + items := make([]Item, 0, 6) + items = append(items, + Item{ Address: []byte("iterate-hash-01"), Data: []byte("data80"), }, - { + Item{ Address: []byte("iterate-hash-03"), Data: []byte("data22"), }, - { + Item{ Address: []byte("iterate-hash-05"), Data: []byte("data41"), }, - { + Item{ Address: []byte("iterate-hash-02"), Data: []byte("data84"), }, - { + Item{ Address: []byte("iterate-hash-06"), Data: []byte("data1"), }, - } + ) batch := new(leveldb.Batch) for _, i := range items { err = index.PutInBatch(batch, i) @@ -555,28 +556,29 @@ func TestIndex_IterateReverse(t *testing.T) { t.Fatal(err) } - items := []Item{ - { + items := make([]Item, 0, 6) + items = append(items, + Item{ Address: []byte("iterate-hash-01"), Data: []byte("data80"), }, - { + Item{ Address: []byte("iterate-hash-03"), Data: []byte("data22"), }, - { + Item{ Address: []byte("iterate-hash-05"), Data: []byte("data41"), }, - { + Item{ Address: []byte("iterate-hash-02"), Data: []byte("data84"), }, - { + Item{ Address: []byte("iterate-hash-06"), Data: []byte("data1"), }, - } + ) batch := new(leveldb.Batch) for _, i := range items { err = index.PutInBatch(batch, i) diff --git a/pkg/storage/migration/steps_chain_test.go b/pkg/storage/migration/steps_chain_test.go index 327a41331d7..5bcc1cb6aeb 100644 --- a/pkg/storage/migration/steps_chain_test.go +++ b/pkg/storage/migration/steps_chain_test.go @@ -19,7 +19,7 @@ func TestNewStepsChain(t *testing.T) { store := inmemstore.New() populateStore(t, store, populateItemsCount) - stepsFn := make([]migration.StepFn, 0) + stepsFn := make([]migration.StepFn, 0, 10) // Create 10 step functions where each would remove single element, having value [0-10) for i := range 10 { diff --git a/pkg/storage/storagetest/storage.go b/pkg/storage/storagetest/storage.go index 9ee41971b43..7869227e43c 100644 --- a/pkg/storage/storagetest/storage.go +++ b/pkg/storage/storagetest/storage.go @@ -95,7 +95,7 @@ func (o *obj1) ID() string { return o.Id } func (obj1) Namespace() string { return "obj1" } func (o *obj1) Marshal() ([]byte, error) { - buf := make([]byte, 40) + buf := make([]byte, 40, 40+len(o.Buf)) copy(buf[:32], o.Id) binary.LittleEndian.PutUint64(buf[32:], o.SomeInt) buf = append(buf, o.Buf[:]...) diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 87f700a98c8..e0dbd6a4305 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -537,7 +537,7 @@ func TestEvict(t *testing.T) { ts := internal.NewInmemStorage() chunksPerBatch := 50 - var chunks []swarm.Chunk + chunks := make([]swarm.Chunk, 0, 3*chunksPerBatch) batches := []*postage.Batch{postagetesting.MustNewBatch(), postagetesting.MustNewBatch(), postagetesting.MustNewBatch()} evictBatch := batches[1] @@ -690,7 +690,7 @@ func TestEvictMaxCount(t *testing.T) { t.Fatal(err) } - var chunks []swarm.Chunk + chunks := make([]swarm.Chunk, 0, 20) batch := postagetesting.MustNewBatch() diff --git a/pkg/storer/mock/mockstorer.go b/pkg/storer/mock/mockstorer.go index efdce20bbf4..2d851355714 100644 --- a/pkg/storer/mock/mockstorer.go +++ b/pkg/storer/mock/mockstorer.go @@ -132,7 +132,7 @@ func (m *mockStorer) ListSessions(offset, limit int) ([]storer.SessionInfo, erro m.mu.Lock() defer m.mu.Unlock() - sessions := []storer.SessionInfo{} + sessions := make([]storer.SessionInfo, 0, len(m.activeSessions)) for _, v := range m.activeSessions { sessions = append(sessions, *v) } diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index 0e5ab3b9214..8be3be183c2 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -220,8 +220,8 @@ func TestEvictBatch(t *testing.T) { } ctx := context.Background() - var chunks []swarm.Chunk var chunksPerPO uint64 = 10 + chunks := make([]swarm.Chunk, 0, chunksPerPO*3) batches := []*postage.Batch{postagetesting.MustNewBatch(), postagetesting.MustNewBatch(), postagetesting.MustNewBatch()} evictBatch := batches[1] @@ -543,6 +543,7 @@ func TestSubscribeBin(t *testing.T) { chunksPerPO uint64 = 50 putter = storer.ReservePutter() ) + chunks = make([]swarm.Chunk, 0, chunksPerPO*2) for j := range 2 { for range chunksPerPO { diff --git a/pkg/storer/sample_test.go b/pkg/storer/sample_test.go index a73ebbcf96e..c1a34ddec73 100644 --- a/pkg/storer/sample_test.go +++ b/pkg/storer/sample_test.go @@ -26,7 +26,7 @@ func TestReserveSampler(t *testing.T) { const maxPO = 10 randChunks := func(baseAddr swarm.Address, timeVar uint64) []swarm.Chunk { - var chs []swarm.Chunk + chs := make([]swarm.Chunk, 0, chunkCountPerPO*maxPO) for po := range maxPO { for range chunkCountPerPO { ch := chunk.GenerateValidRandomChunkAt(t, baseAddr, po).WithBatch(3, 2, false) diff --git a/pkg/transaction/transaction.go b/pkg/transaction/transaction.go index cc88e7a3ab4..da2b2746fd5 100644 --- a/pkg/transaction/transaction.go +++ b/pkg/transaction/transaction.go @@ -154,9 +154,9 @@ func (t *transactionService) waitForAllPendingTx() error { return err } - pendingTxs = t.filterPendingTransactions(t.ctx, pendingTxs) + pending := t.filterPendingTransactions(t.ctx, pendingTxs) - for _, txHash := range pendingTxs { + for txHash := range pending { t.waitForPendingTx(txHash) } @@ -339,19 +339,15 @@ func (t *transactionService) nextNonce(ctx context.Context) (uint64, error) { return 0, err } - pendingTxs = t.filterPendingTransactions(t.ctx, pendingTxs) + pending := t.filterPendingTransactions(t.ctx, pendingTxs) // PendingNonceAt returns the nonce we should use, but we will // compare this to our pending tx list, therefore the -1. maxNonce := onchainNonce - 1 - for _, txHash := range pendingTxs { - trx, _, err := t.backend.TransactionByHash(ctx, txHash) - if err != nil { - t.logger.Error(err, "pending transaction not found", "tx", txHash) - return 0, err + for _, trx := range pending { + if trx != nil { + maxNonce = max(maxNonce, trx.Nonce()) } - - maxNonce = max(maxNonce, trx.Nonce()) } return maxNonce + 1, nil @@ -404,11 +400,12 @@ func (t *transactionService) PendingTransactions() ([]common.Hash, error) { // filterPendingTransactions will filter supplied transaction hashes removing those that are not pending anymore. // Removed transactions will be also removed from store. -func (t *transactionService) filterPendingTransactions(ctx context.Context, txHashes []common.Hash) []common.Hash { - result := make([]common.Hash, 0, len(txHashes)) +// Returns the pending transactions keyed by hash. +func (t *transactionService) filterPendingTransactions(ctx context.Context, txHashes []common.Hash) map[common.Hash]*types.Transaction { + result := make(map[common.Hash]*types.Transaction, len(txHashes)) for _, txHash := range txHashes { - _, isPending, err := t.backend.TransactionByHash(ctx, txHash) + trx, isPending, err := t.backend.TransactionByHash(ctx, txHash) // When error occurres consider transaction as pending (so this transaction won't be filtered out), // unless it was not found if err != nil { @@ -422,7 +419,7 @@ func (t *transactionService) filterPendingTransactions(ctx context.Context, txHa } if isPending { - result = append(result, txHash) + result[txHash] = trx } else { err := t.store.Delete(pendingTransactionKey(txHash)) if err != nil { diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go new file mode 100644 index 00000000000..c553f33182a --- /dev/null +++ b/pkg/transaction/wrapped/cache/cache.go @@ -0,0 +1,93 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package cache + +import ( + "context" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "resenje.org/singleflight" +) + +type Loader[T any] func() (T, time.Time, error) +type ExpiringSingleFlightCache[T any] struct { + mu sync.RWMutex + value T + expiresAt time.Time + + group singleflight.Group[string, any] + key string + metrics metricSet +} + +func NewExpiringSingleFlightCache[T any](metricsPrefix string) *ExpiringSingleFlightCache[T] { + return &ExpiringSingleFlightCache[T]{ + key: metricsPrefix, + metrics: newMetricSet(metricsPrefix), + } +} + +func (c *ExpiringSingleFlightCache[T]) Collectors() []prometheus.Collector { + return []prometheus.Collector{ + c.metrics.Hits, + c.metrics.Misses, + c.metrics.Loads, + c.metrics.SharedLoads, + c.metrics.LoadErrors, + } +} + +func (c *ExpiringSingleFlightCache[T]) Get(now time.Time) (T, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + if now.Before(c.expiresAt) { + return c.value, true + } + + var zero T + return zero, false +} + +func (c *ExpiringSingleFlightCache[T]) Set(value T, expiresAt time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + + c.value = value + c.expiresAt = expiresAt +} + +func (c *ExpiringSingleFlightCache[T]) GetOrLoad(ctx context.Context, now time.Time, loader Loader[T]) (T, error) { + if v, ok := c.Get(now); ok { + c.metrics.Hits.Inc() + return v, nil + } + + c.metrics.Misses.Inc() + + result, shared, err := c.group.Do(ctx, c.key, func(ctx context.Context) (any, error) { + c.metrics.Loads.Inc() + val, expiresAt, err := loader() + if err != nil { + c.metrics.LoadErrors.Inc() + return val, err + } + c.Set(val, expiresAt) + return val, nil + }) + + if shared { + c.metrics.SharedLoads.Inc() + } + + if err != nil { + var zero T + return zero, err + } + + return result.(T), nil +} diff --git a/pkg/transaction/wrapped/cache/cache_test.go b/pkg/transaction/wrapped/cache/cache_test.go new file mode 100644 index 00000000000..684cd4729fc --- /dev/null +++ b/pkg/transaction/wrapped/cache/cache_test.go @@ -0,0 +1,228 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package cache + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "testing/synctest" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const testMetricsPrefix = "test" + +func newTestCache() *ExpiringSingleFlightCache[uint64] { + return &ExpiringSingleFlightCache[uint64]{ + key: testMetricsPrefix, + metrics: newMetricSet(testMetricsPrefix), + } +} + +func TestSetGet(t *testing.T) { + t.Parallel() + + now := time.Now() + expiresAt := now.Add(time.Second) + c := newTestCache() + c.Set(42, expiresAt) + + val, ok := c.Get(now) + assert.True(t, ok) + assert.Equal(t, uint64(42), val) +} + +func TestTTLExpiry(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache() + c.Set(42, now) + + _, ok := c.Get(now.Add(time.Second + time.Millisecond)) + assert.False(t, ok) +} + +func TestGetOrLoadMiss(t *testing.T) { + t.Parallel() + + c := newTestCache() + var loadCount atomic.Int32 + + val, err := c.GetOrLoad(context.Background(), time.Now(), func() (uint64, time.Time, error) { + loadCount.Add(1) + return 99, time.Now().Add(time.Second), nil + }) + + require.NoError(t, err) + assert.Equal(t, uint64(99), val) + assert.Equal(t, int32(1), loadCount.Load()) + + cached, ok := c.Get(time.Now()) + assert.True(t, ok) + assert.Equal(t, uint64(99), cached) +} + +func TestGetOrLoadHit(t *testing.T) { + t.Parallel() + const expectedVal = uint64(42) + + now := time.Now() + expiresAt := now.Add(time.Second) + c := newTestCache() + c.Set(expectedVal, expiresAt) + + var loadCount atomic.Int32 + val, err := c.GetOrLoad(context.Background(), now, func() (uint64, time.Time, error) { + loadCount.Add(1) + return expectedVal, now.Add(time.Second), nil + }) + + require.NoError(t, err) + assert.Equal(t, expectedVal, val) + assert.Equal(t, int32(0), loadCount.Load()) +} + +func TestGetOrLoadError(t *testing.T) { + t.Parallel() + + c := newTestCache() + errLoad := errors.New("load failed") + + val, err := c.GetOrLoad(context.Background(), time.Now(), func() (uint64, time.Time, error) { + return 0, time.Time{}, errLoad + }) + + assert.ErrorIs(t, err, errLoad) + assert.Equal(t, uint64(0), val) + + _, ok := c.Get(time.Now()) + assert.False(t, ok) +} + +func TestGetOrLoadSingleflight(t *testing.T) { + const value = uint64(77) + synctest.Test(t, func(t *testing.T) { + c := newTestCache() + var loadCount atomic.Int32 + gate := make(chan struct{}) + + const n = 10 + var wg sync.WaitGroup + results := make([]uint64, n) + errs := make([]error, n) + + wg.Add(n) + for i := range n { + go func(idx int) { + defer wg.Done() + now := time.Now() + results[idx], errs[idx] = c.GetOrLoad(context.Background(), now, func() (uint64, time.Time, error) { + loadCount.Add(1) + <-gate + return value, now.Add(time.Second), nil + }) + }(i) + } + + time.Sleep(50 * time.Millisecond) + close(gate) + wg.Wait() + + assert.Equal(t, int32(1), loadCount.Load()) + for i := range n { + assert.NoError(t, errs[i]) + assert.Equal(t, value, results[i]) + } + }) +} + +func TestGetOrLoadReloadAfterExpiry(t *testing.T) { + t.Parallel() + + now := time.Now() + c := newTestCache() + c.Set(42, now) + + val, err := c.GetOrLoad(context.Background(), now.Add(time.Second+time.Millisecond), func() (uint64, time.Time, error) { + return 100, now.Add(time.Second + time.Millisecond), nil + }) + + require.NoError(t, err) + assert.Equal(t, uint64(100), val) +} + +func TestGetOrLoadContextCancellation(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + const expectedVal = 55 + c := newTestCache() + gate := make(chan struct{}) + + ctx1, cancel1 := context.WithCancel(context.Background()) + ctx2 := context.Background() + + var wg sync.WaitGroup + var result1, result2 uint64 + var err1, err2 error + + wg.Add(2) + go func() { + defer wg.Done() + result1, err1 = c.GetOrLoad(ctx1, time.Now(), func() (uint64, time.Time, error) { + <-gate + return expectedVal, time.Now().Add(time.Second), nil + }) + }() + + go func() { + defer wg.Done() + result2, err2 = c.GetOrLoad(ctx2, time.Now(), func() (uint64, time.Time, error) { + <-gate + return expectedVal, time.Now().Add(time.Second), nil + }) + }() + + time.Sleep(50 * time.Millisecond) + cancel1() + time.Sleep(50 * time.Millisecond) + close(gate) + wg.Wait() + + assert.ErrorIs(t, err1, context.Canceled) + assert.Equal(t, uint64(0), result1) + + require.NoError(t, err2) + assert.Equal(t, uint64(expectedVal), result2) + }) +} + +func TestMetrics(t *testing.T) { + t.Parallel() + + now := time.Now() + expiresAt := now.Add(time.Second) + c := newTestCache() + + ctx := context.Background() + + // miss + load error + _, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 0, time.Time{}, errors.New("fail") }) + + // miss + load + _, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 42, expiresAt, nil }) + // hit + _, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 0, expiresAt, nil }) + + assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.Hits)) + assert.Equal(t, float64(2), testutil.ToFloat64(c.metrics.Misses)) + assert.Equal(t, float64(2), testutil.ToFloat64(c.metrics.Loads)) + assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.LoadErrors)) +} diff --git a/pkg/transaction/wrapped/cache/metrics.go b/pkg/transaction/wrapped/cache/metrics.go new file mode 100644 index 00000000000..039a3ed7824 --- /dev/null +++ b/pkg/transaction/wrapped/cache/metrics.go @@ -0,0 +1,75 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package cache + +import ( + m "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type metricSet struct { + Hits prometheus.Counter + Misses prometheus.Counter + Loads prometheus.Counter + SharedLoads prometheus.Counter + LoadErrors prometheus.Counter +} + +type Metrics struct { + BlockNumber metricSet + Unknown metricSet +} + +func newMetricSet(prefix string) metricSet { + subsystem := "eth_backend_cache" + return metricSet{ + Hits: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_hits", + Help: prefix + " cache hits", + }), + Misses: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_misses", + Help: prefix + " cache misses", + }), + Loads: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_loads", + Help: prefix + " cache backend loads", + }), + SharedLoads: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_shared_loads", + Help: prefix + " cache shared loads", + }), + LoadErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: prefix + "_load_errors", + Help: prefix + " cache load errors", + }), + } +} + +func (mtr *Metrics) Collectors() []prometheus.Collector { + return []prometheus.Collector{ + mtr.BlockNumber.Hits, + mtr.BlockNumber.Misses, + mtr.BlockNumber.Loads, + mtr.BlockNumber.SharedLoads, + mtr.BlockNumber.LoadErrors, + + mtr.Unknown.Hits, + mtr.Unknown.Misses, + mtr.Unknown.Loads, + mtr.Unknown.SharedLoads, + mtr.Unknown.LoadErrors, + } +} diff --git a/pkg/transaction/wrapped/fee_test.go b/pkg/transaction/wrapped/fee_test.go index 41afb69f28f..33ddc31da88 100644 --- a/pkg/transaction/wrapped/fee_test.go +++ b/pkg/transaction/wrapped/fee_test.go @@ -9,6 +9,7 @@ import ( "errors" "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/core/types" "github.com/ethersphere/bee/v2/pkg/transaction/backendmock" @@ -105,6 +106,8 @@ func TestSuggestedFeeAndTip(t *testing.T) { }), ), minimumGasTipCap, + 5*time.Second, + 90, ) gasFeeCap, gasTipCap, err := backend.SuggestedFeeAndTip(ctx, tc.gasPrice, tc.boostPercent) diff --git a/pkg/transaction/wrapped/metrics.go b/pkg/transaction/wrapped/metrics.go index a5cfaa554a0..cd9364696a8 100644 --- a/pkg/transaction/wrapped/metrics.go +++ b/pkg/transaction/wrapped/metrics.go @@ -126,5 +126,7 @@ func newMetrics() metrics { } func (b *wrappedBackend) Metrics() []prometheus.Collector { - return m.PrometheusCollectorsFromFields(b.metrics) + collectors := m.PrometheusCollectorsFromFields(b.metrics) + collectors = append(collectors, b.blockNumberCache.Collectors()...) + return collectors } diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index 1d4e452aef3..fe640020bff 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -8,29 +8,47 @@ import ( "context" "errors" "math/big" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethersphere/bee/v2/pkg/transaction" "github.com/ethersphere/bee/v2/pkg/transaction/backend" + "github.com/ethersphere/bee/v2/pkg/transaction/wrapped/cache" ) +const DefaultBlockNumberTTLPercent = 85 + var ( _ transaction.Backend = (*wrappedBackend)(nil) ) type wrappedBackend struct { - backend backend.Geth - metrics metrics - minimumGasTipCap int64 -} - -func NewBackend(backend backend.Geth, minimumGasTipCap uint64) transaction.Backend { + backend backend.Geth + metrics metrics + minimumGasTipCap int64 + blockTime time.Duration + blockNumberTTLPercent uint64 + blockNumberCache *cache.ExpiringSingleFlightCache[uint64] +} + +func NewBackend( + backend backend.Geth, + minimumGasTipCap uint64, + blockTime time.Duration, + blockNumberTTLPercent uint64, +) transaction.Backend { + if blockNumberTTLPercent == 0 || blockNumberTTLPercent >= 100 { + blockNumberTTLPercent = DefaultBlockNumberTTLPercent + } return &wrappedBackend{ - backend: backend, - minimumGasTipCap: int64(minimumGasTipCap), - metrics: newMetrics(), + backend: backend, + minimumGasTipCap: int64(minimumGasTipCap), + blockTime: blockTime, + blockNumberTTLPercent: blockNumberTTLPercent, + metrics: newMetrics(), + blockNumberCache: cache.NewExpiringSingleFlightCache[uint64]("block_number"), } } @@ -61,14 +79,45 @@ func (b *wrappedBackend) TransactionByHash(ctx context.Context, hash common.Hash } func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { - b.metrics.TotalRPCCalls.Inc() - b.metrics.BlockNumberCalls.Inc() - blockNumber, err := b.backend.BlockNumber(ctx) - if err != nil { - b.metrics.TotalRPCErrors.Inc() - return 0, err - } - return blockNumber, nil + now := time.Now().UTC() + + return b.blockNumberCache.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { + b.metrics.TotalRPCCalls.Inc() + b.metrics.BlockNumberCalls.Inc() + + header, err := b.backend.HeaderByNumber(ctx, nil) + if err != nil { + b.metrics.TotalRPCErrors.Inc() + return 0, time.Time{}, err + } + if header == nil || header.Number == nil { + b.metrics.TotalRPCErrors.Inc() + return 0, time.Time{}, errors.New("latest block header unavailable") + } + + now = time.Now().UTC() + ttl := b.blockTime * time.Duration(b.blockNumberTTLPercent) / 100 + if ttl < 500*time.Millisecond { + ttl = time.Second + } + + expiresAt := time.Unix(int64(header.Time), 0).Add(ttl) + maxExpiresAt := now.Add(ttl) + if expiresAt.After(maxExpiresAt) { + // in case if local clocks are behind blockchain clocks + expiresAt = maxExpiresAt + } + + if !expiresAt.After(now) { + // in case if local clocks are ahead blockchain clocks, or we got block header right before new block + retryTTL := b.blockTime / 10 + if retryTTL < 100*time.Millisecond { + retryTTL = time.Second + } + expiresAt = now.Add(retryTTL) + } + return header.Number.Uint64(), expiresAt, nil + }) } func (b *wrappedBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go new file mode 100644 index 00000000000..632c0742d4b --- /dev/null +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -0,0 +1,175 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package wrapped + +import ( + "context" + "math/big" + "sync/atomic" + "testing" + "testing/synctest" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethersphere/bee/v2/pkg/transaction/backendmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testBlockTime = 5 * time.Second + testTTLPercent = 90 + testMinimumGasTipCap = 0 +) + +func TestBlockNumberUsesLatestHeaderCache(t *testing.T) { + t.Parallel() + + const expectedBlock = uint64(10) + var headerCalls atomic.Int32 + + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + assert.Nil(t, number) + return &types.Header{ + Number: big.NewInt(int64(expectedBlock)), + Time: uint64(time.Now().Add(time.Second).Unix()), + BaseFee: big.NewInt(1), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + first, err := backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, first) + + second, err := backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, second) + assert.Equal(t, int32(1), headerCalls.Load()) +} + +func TestBlockNumberNearExpiry(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + const ( + staleBlock = uint64(100) + freshBlock = uint64(101) + ) + var headerCalls atomic.Int32 + + now := time.Now() + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + if headerCalls.Load() == 1 { + return &types.Header{ + Number: big.NewInt(int64(staleBlock)), + Time: uint64(now.Add(-testBlockTime).Unix()), + }, nil + } + return &types.Header{ + Number: big.NewInt(int64(freshBlock)), + Time: uint64(now.Unix()), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + ctx := context.Background() + first, err := backend.BlockNumber(ctx) + require.NoError(t, err) + assert.Equal(t, staleBlock, first) + + time.Sleep(testBlockTime/10 + 100*time.Millisecond) + + second, err := backend.BlockNumber(ctx) + require.NoError(t, err) + assert.Equal(t, freshBlock, second) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + +func TestBlockNumberLocalClockBehind(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const expectedBlock = uint64(200) + var headerCalls atomic.Int32 + + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + return &types.Header{ + Number: big.NewInt(int64(expectedBlock)), + Time: uint64(time.Now().Add(30 * time.Second).Unix()), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + ttl := testBlockTime * testTTLPercent / 100 + + val, err := backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, val) + + time.Sleep(ttl + 100*time.Millisecond) + + _, err = backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + +func TestBlockNumberLocalClockAhead(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + + const expectedBlock = uint64(300) + var headerCalls atomic.Int32 + + backend := NewBackend( + backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + return &types.Header{ + Number: big.NewInt(int64(expectedBlock)), + Time: uint64(time.Now().Add(-2 * testBlockTime).Unix()), + }, nil + }), + ), + testMinimumGasTipCap, + testBlockTime, + testTTLPercent, + ) + + val, err := backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedBlock, val) + + retryTTL := testBlockTime / 10 + time.Sleep(retryTTL + 100*time.Millisecond) + + _, err = backend.BlockNumber(context.Background()) + require.NoError(t, err) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +}