diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 5e0677065fe..4c7d4fd712e 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "math/big" "os" "path/filepath" "strings" @@ -17,6 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/node" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/transaction" p2pforge "github.com/ipshipyard/p2p-forge/client" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -101,6 +103,14 @@ const ( configKeyBlockchainRpcTLSTimeout = "blockchain-rpc.tls-timeout" configKeyBlockchainRpcIdleTimeout = "blockchain-rpc.idle-timeout" configKeyBlockchainRpcKeepalive = "blockchain-rpc.keepalive" + + // transaction retry + optionNameTransactionRetryMaxRetries = "transaction-retry-max-retries" + optionNameTransactionRetryDelay = "transaction-retry-delay" + optionNameTransactionRetryGasIncreasePercent = "transaction-retry-gas-increase-percent" + optionNameTransactionRetryMaxTxPriceWei = "transaction-retry-max-tx-price-wei" + optionNameFeeHistoryBlockCount = "fee-history-block-count" + optionNameFeeHistoryRewardPercentiles = "fee-history-reward-percentiles" ) var blockchainRpcConfigPairs = []struct{ flat, dotted string }{ @@ -310,6 +320,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().String(optionNameStakingAddress, "", "staking contract address") cmd.Flags().Uint64(optionNameBlockTime, 5, "chain block time") cmd.Flags().Uint64(optionNameBlockSyncInterval, 10, "block number cache sync interval in blocks") + cmd.Flags().Uint64(optionNameFeeHistoryBlockCount, 100, "eth_feeHistory block count for fee hints") + cmd.Flags().String(optionNameFeeHistoryRewardPercentiles, "10,50,90", "comma-separated reward percentiles for eth_feeHistory") cmd.Flags().Duration(optionWarmUpTime, time.Minute*5, "maximum node warmup duration; proceeds when stable or after this time") cmd.Flags().Bool(optionNameMainNet, true, "triggers connect to main net bootnodes.") cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching") @@ -329,6 +341,10 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().Bool(optionSkipPostageSnapshot, false, "skip postage snapshot") cmd.Flags().Uint64(optionNameMinimumGasTipCap, 0, "minimum gas tip cap in wei for transactions, 0 means use suggested gas tip cap") cmd.Flags().Uint64(optionNameGasLimitFallback, 500_000, "gas limit fallback when estimation fails for contract transactions") + cmd.Flags().Int(optionNameTransactionRetryMaxRetries, 5, "maximum broadcast attempts for SendWithRetry (e.g. redistribution txs)") + cmd.Flags().Duration(optionNameTransactionRetryDelay, time.Minute, "how long to wait for a receipt before escalating fees in transactions with retry") + cmd.Flags().Int(optionNameTransactionRetryGasIncreasePercent, 20, "percent increase applied to priority fee after each transactions with retry escalation step") + cmd.Flags().Uint64(optionNameTransactionRetryMaxTxPriceWei, 0, "maximum maxFeePerGas in wei per gas for transactions with retry") cmd.Flags().Bool(optionNameP2PWSSEnable, false, "Enable Secure WebSocket P2P connections") cmd.Flags().String(optionP2PWSSAddr, ":1635", "p2p wss address") cmd.Flags().String(optionNATWSSAddr, "", "WSS NAT exposed address") @@ -376,6 +392,18 @@ func (c *command) bindBlockchainRpcConfig(cmd *cobra.Command) { } } +func txRetryConfigFromCommand(c *command) transaction.TransactionsRetryConfig { + cfg := transaction.TransactionsRetryConfig{ + MaxRetries: c.config.GetInt(optionNameTransactionRetryMaxRetries), + RetryDelay: c.config.GetDuration(optionNameTransactionRetryDelay), + GasIncreasePercent: c.config.GetInt(optionNameTransactionRetryGasIncreasePercent), + } + if v := c.config.GetUint64(optionNameTransactionRetryMaxTxPriceWei); v != 0 { + cfg.MaxTxPrice = new(big.Int).SetUint64(v) + } + return cfg +} + func newLogger(cmd *cobra.Command, verbosity string) (log.Logger, error) { var ( sink = cmd.OutOrStdout() diff --git a/cmd/bee/cmd/deploy.go b/cmd/bee/cmd/deploy.go index 874dcc38a8b..f9fb2fb82e8 100644 --- a/cmd/bee/cmd/deploy.go +++ b/cmd/bee/cmd/deploy.go @@ -42,6 +42,11 @@ func (c *command) initDeployCmd() error { ctx := cmd.Context() + feeHistoryRewardPerc, err := node.ParseFeeHistoryRewardPercentiles(c.config.GetString(optionNameFeeHistoryRewardPercentiles)) + if err != nil { + return err + } + swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err := node.InitChain( ctx, logger, @@ -60,6 +65,9 @@ func (c *command) initDeployCmd() error { Keepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive), }, c.config.GetUint64(optionNameBlockSyncInterval), + c.config.GetUint64(optionNameFeeHistoryBlockCount), + feeHistoryRewardPerc, + txRetryConfigFromCommand(c), ) if err != nil { return err diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index bf6fe5404c7..e4b1e289065 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -202,6 +202,11 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo } } + feeHistoryRewardPerc, err := node.ParseFeeHistoryRewardPercentiles(c.config.GetString(optionNameFeeHistoryRewardPercentiles)) + if err != nil { + return nil, err + } + signerConfig, err := c.configureSigner(cmd, logger) if err != nil { return nil, fmt.Errorf("configure signer: %w", err) @@ -286,71 +291,77 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo } b, err := node.NewBee(ctx, c.config.GetString(optionNameP2PAddr), signerConfig.publicKey, signerConfig.signer, networkID, logger, signerConfig.libp2pPrivateKey, signerConfig.pssPrivateKey, signerConfig.session, &node.Options{ - Addr: c.config.GetString(optionNameP2PAddr), - AllowPrivateCIDRs: c.config.GetBool(optionNameAllowPrivateCIDRs), - APIAddr: c.config.GetString(optionNameAPIAddr), - EnableWSS: c.config.GetBool(optionNameP2PWSSEnable), - WSSAddr: c.config.GetString(optionP2PWSSAddr), - AutoTLSStorageDir: filepath.Join(c.config.GetString(optionNameDataDir), "autotls"), - BlockchainRpcEndpoint: c.config.GetString(configKeyBlockchainRpcEndpoint), - BlockchainRpcDialTimeout: c.config.GetDuration(configKeyBlockchainRpcDialTimeout), - BlockchainRpcTLSTimeout: c.config.GetDuration(configKeyBlockchainRpcTLSTimeout), - BlockchainRpcIdleTimeout: c.config.GetDuration(configKeyBlockchainRpcIdleTimeout), - BlockchainRpcKeepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive), - BlockProfile: c.config.GetBool(optionNamePProfBlock), - BlockTime: networkConfig.blockTime, - BlockSyncInterval: c.config.GetUint64(optionNameBlockSyncInterval), - BootnodeMode: bootNode, - Bootnodes: networkConfig.bootNodes, - CacheCapacity: c.config.GetUint64(optionNameCacheCapacity), - AutoTLSCAEndpoint: c.config.GetString(optionAutoTLSCAEndpoint), - ChainID: networkConfig.chainID, - ChequebookEnable: c.config.GetBool(optionNameChequebookEnable), - CORSAllowedOrigins: c.config.GetStringSlice(optionCORSAllowedOrigins), - DataDir: c.config.GetString(optionNameDataDir), - DBBlockCacheCapacity: c.config.GetUint64(optionNameDBBlockCacheCapacity), - DBDisableSeeksCompaction: c.config.GetBool(optionNameDBDisableSeeksCompaction), - DBOpenFilesLimit: c.config.GetUint64(optionNameDBOpenFilesLimit), - DBWriteBufferSize: c.config.GetUint64(optionNameDBWriteBufferSize), - EnableStorageIncentives: c.config.GetBool(optionNameStorageIncentivesEnable), - EnableWS: c.config.GetBool(optionNameP2PWSEnable), - AutoTLSDomain: c.config.GetString(optionAutoTLSDomain), - AutoTLSRegistrationEndpoint: c.config.GetString(optionAutoTLSRegistrationEndpoint), - FullNodeMode: fullNode, - Logger: logger, - MinimumGasTipCap: c.config.GetUint64(optionNameMinimumGasTipCap), - GasLimitFallback: c.config.GetUint64(optionNameGasLimitFallback), - MinimumStorageRadius: c.config.GetUint(optionMinimumStorageRadius), - MutexProfile: c.config.GetBool(optionNamePProfMutex), - NATAddr: c.config.GetString(optionNameNATAddr), - NATWSSAddr: c.config.GetString(optionNATWSSAddr), - NeighborhoodSuggester: neighborhoodSuggester, - PaymentEarly: c.config.GetInt64(optionNamePaymentEarly), - PaymentThreshold: c.config.GetString(optionNamePaymentThreshold), - PaymentTolerance: c.config.GetInt64(optionNamePaymentTolerance), - PostageContractAddress: c.config.GetString(optionNamePostageContractAddress), - PostageContractStartBlock: c.config.GetUint64(optionNamePostageContractStartBlock), - PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress), - RedistributionContractAddress: c.config.GetString(optionNameRedistributionAddress), - ReserveCapacityDoubling: c.config.GetInt(optionReserveCapacityDoubling), - ResolverConnectionCfgs: resolverCfgs, - Resync: c.config.GetBool(optionNameResync), - RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching), - SkipPostageSnapshot: c.config.GetBool(optionSkipPostageSnapshot), - StakingContractAddress: c.config.GetString(optionNameStakingAddress), - StatestoreCacheCapacity: c.config.GetUint64(optionNameStateStoreCacheCapacity), - StaticNodes: staticNodes, - SwapEnable: c.config.GetBool(optionNameSwapEnable), - SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress), - SwapInitialDeposit: c.config.GetString(optionNameSwapInitialDeposit), - TargetNeighborhood: c.config.GetString(optionNameTargetNeighborhood), - TracingEnabled: c.config.GetBool(optionNameTracingEnabled), - TracingEndpoint: tracingEndpoint, - TracingServiceName: c.config.GetString(optionNameTracingServiceName), - TrxDebugMode: c.config.GetBool(optionNameTransactionDebugMode), - WarmupTime: c.config.GetDuration(optionWarmUpTime), - WelcomeMessage: c.config.GetString(optionWelcomeMessage), - WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress), + Addr: c.config.GetString(optionNameP2PAddr), + AllowPrivateCIDRs: c.config.GetBool(optionNameAllowPrivateCIDRs), + APIAddr: c.config.GetString(optionNameAPIAddr), + EnableWSS: c.config.GetBool(optionNameP2PWSSEnable), + WSSAddr: c.config.GetString(optionP2PWSSAddr), + AutoTLSStorageDir: filepath.Join(c.config.GetString(optionNameDataDir), "autotls"), + BlockchainRpcEndpoint: c.config.GetString(configKeyBlockchainRpcEndpoint), + BlockchainRpcDialTimeout: c.config.GetDuration(configKeyBlockchainRpcDialTimeout), + BlockchainRpcTLSTimeout: c.config.GetDuration(configKeyBlockchainRpcTLSTimeout), + BlockchainRpcIdleTimeout: c.config.GetDuration(configKeyBlockchainRpcIdleTimeout), + BlockchainRpcKeepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive), + BlockProfile: c.config.GetBool(optionNamePProfBlock), + BlockTime: networkConfig.blockTime, + BlockSyncInterval: c.config.GetUint64(optionNameBlockSyncInterval), + FeeHistoryBlockCount: c.config.GetUint64(optionNameFeeHistoryBlockCount), + FeeHistoryRewardPercentiles: feeHistoryRewardPerc, + TransactionRetryMaxRetries: c.config.GetInt(optionNameTransactionRetryMaxRetries), + TransactionRetryDelay: c.config.GetDuration(optionNameTransactionRetryDelay), + TransactionRetryGasIncreasePercent: c.config.GetInt(optionNameTransactionRetryGasIncreasePercent), + TransactionRetryMaxTxPriceWei: c.config.GetUint64(optionNameTransactionRetryMaxTxPriceWei), + BootnodeMode: bootNode, + Bootnodes: networkConfig.bootNodes, + CacheCapacity: c.config.GetUint64(optionNameCacheCapacity), + AutoTLSCAEndpoint: c.config.GetString(optionAutoTLSCAEndpoint), + ChainID: networkConfig.chainID, + ChequebookEnable: c.config.GetBool(optionNameChequebookEnable), + CORSAllowedOrigins: c.config.GetStringSlice(optionCORSAllowedOrigins), + DataDir: c.config.GetString(optionNameDataDir), + DBBlockCacheCapacity: c.config.GetUint64(optionNameDBBlockCacheCapacity), + DBDisableSeeksCompaction: c.config.GetBool(optionNameDBDisableSeeksCompaction), + DBOpenFilesLimit: c.config.GetUint64(optionNameDBOpenFilesLimit), + DBWriteBufferSize: c.config.GetUint64(optionNameDBWriteBufferSize), + EnableStorageIncentives: c.config.GetBool(optionNameStorageIncentivesEnable), + EnableWS: c.config.GetBool(optionNameP2PWSEnable), + AutoTLSDomain: c.config.GetString(optionAutoTLSDomain), + AutoTLSRegistrationEndpoint: c.config.GetString(optionAutoTLSRegistrationEndpoint), + FullNodeMode: fullNode, + Logger: logger, + MinimumGasTipCap: c.config.GetUint64(optionNameMinimumGasTipCap), + GasLimitFallback: c.config.GetUint64(optionNameGasLimitFallback), + MinimumStorageRadius: c.config.GetUint(optionMinimumStorageRadius), + MutexProfile: c.config.GetBool(optionNamePProfMutex), + NATAddr: c.config.GetString(optionNameNATAddr), + NATWSSAddr: c.config.GetString(optionNATWSSAddr), + NeighborhoodSuggester: neighborhoodSuggester, + PaymentEarly: c.config.GetInt64(optionNamePaymentEarly), + PaymentThreshold: c.config.GetString(optionNamePaymentThreshold), + PaymentTolerance: c.config.GetInt64(optionNamePaymentTolerance), + PostageContractAddress: c.config.GetString(optionNamePostageContractAddress), + PostageContractStartBlock: c.config.GetUint64(optionNamePostageContractStartBlock), + PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress), + RedistributionContractAddress: c.config.GetString(optionNameRedistributionAddress), + ReserveCapacityDoubling: c.config.GetInt(optionReserveCapacityDoubling), + ResolverConnectionCfgs: resolverCfgs, + Resync: c.config.GetBool(optionNameResync), + RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching), + SkipPostageSnapshot: c.config.GetBool(optionSkipPostageSnapshot), + StakingContractAddress: c.config.GetString(optionNameStakingAddress), + StatestoreCacheCapacity: c.config.GetUint64(optionNameStateStoreCacheCapacity), + StaticNodes: staticNodes, + SwapEnable: c.config.GetBool(optionNameSwapEnable), + SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress), + SwapInitialDeposit: c.config.GetString(optionNameSwapInitialDeposit), + TargetNeighborhood: c.config.GetString(optionNameTargetNeighborhood), + TracingEnabled: c.config.GetBool(optionNameTracingEnabled), + TracingEndpoint: tracingEndpoint, + TracingServiceName: c.config.GetString(optionNameTracingServiceName), + TrxDebugMode: c.config.GetBool(optionNameTransactionDebugMode), + WarmupTime: c.config.GetDuration(optionWarmUpTime), + WelcomeMessage: c.config.GetString(optionWelcomeMessage), + WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress), }) return b, err diff --git a/pkg/api/api.go b/pkg/api/api.go index acd838a3ff6..2816bb12a53 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -94,10 +94,11 @@ const ( SwarmActPublisherHeader = "Swarm-Act-Publisher" SwarmActHistoryAddressHeader = "Swarm-Act-History-Address" - ImmutableHeader = "Immutable" - GasPriceHeader = "Gas-Price" - GasLimitHeader = "Gas-Limit" - ETagHeader = "ETag" + ImmutableHeader = "Immutable" + GasPriceHeader = "Gas-Price" + GasLimitHeader = "Gas-Limit" + DisableRetryHeader = "Disable-Retry" + ETagHeader = "ETag" AuthorizationHeader = "Authorization" AcceptEncodingHeader = "Accept-Encoding" @@ -557,8 +558,9 @@ func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) h logger := s.logger.WithName(handlerName).Build() headers := struct { - GasPrice *big.Int `map:"Gas-Price"` - GasLimit uint64 `map:"Gas-Limit"` + GasPrice *big.Int `map:"Gas-Price"` + GasLimit uint64 `map:"Gas-Limit"` + DisableRetry bool `map:"Disable-Retry"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -567,6 +569,7 @@ func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) h ctx := r.Context() ctx = sctx.SetGasPrice(ctx, headers.GasPrice) ctx = sctx.SetGasLimit(ctx, headers.GasLimit) + ctx = sctx.SetDisableRetry(ctx, headers.DisableRetry) h.ServeHTTP(w, r.WithContext(ctx)) }) @@ -581,7 +584,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler { SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, - SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader, + SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, DisableRetryHeader, ImmutableHeader, SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader, } allowedHeadersStr := strings.Join(allowedHeaders, ", ") diff --git a/pkg/node/chain.go b/pkg/node/chain.go index ef1a63c9881..c44b3917675 100644 --- a/pkg/node/chain.go +++ b/pkg/node/chain.go @@ -12,6 +12,7 @@ import ( "math/big" "net" "net/http" + "strconv" "strings" "time" @@ -41,6 +42,35 @@ const ( additionalConfirmations = 2 ) +// ParseFeeHistoryRewardPercentiles parses a comma-separated list of floats for eth_feeHistory +// rewardPercentiles. Exactly three values in the range [0, 100] are required. +func ParseFeeHistoryRewardPercentiles(s string) ([]float64, error) { + s = strings.TrimSpace(s) + if s == "" { + return nil, errors.New("fee history reward percentiles: empty string") + } + parts := strings.Split(s, ",") + out := make([]float64, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + return nil, errors.New("fee history reward percentiles: empty token") + } + v, err := strconv.ParseFloat(p, 64) + if err != nil { + return nil, fmt.Errorf("fee history reward percentiles: parse %q: %w", p, err) + } + if v < 0 || v > 100 { + return nil, fmt.Errorf("fee history reward percentiles: %g out of range [0,100]", v) + } + out = append(out, v) + } + if len(out) != 3 { + return nil, fmt.Errorf("fee history reward percentiles: exactly 3 values, got %d", len(out)) + } + return out, nil +} + // BlockchainRPCConfig holds the configuration parameters for the blockchain RPC client transport. type BlockchainRPCConfig struct { Endpoint string @@ -64,6 +94,9 @@ func InitChain( fallbackGasLimit uint64, rpcCfg BlockchainRPCConfig, blockSyncInterval uint64, + feeHistoryBlockCount uint64, + feeHistoryRewardPercentiles []float64, + retryCfg transaction.TransactionsRetryConfig, ) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) { backend := backendnoop.New(chainID) @@ -98,7 +131,7 @@ func InitChain( logger.Info("connected to blockchain backend", "version", versionString) - backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockSyncInterval) + backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockSyncInterval, feeHistoryBlockCount, feeHistoryRewardPercentiles) } backendChainID, err := backend.ChainID(ctx) @@ -117,7 +150,7 @@ func InitChain( transactionMonitor := transaction.NewMonitor(logger, backend, overlayEthAddress, pollingInterval, cancellationDepth) - transactionService, err := transaction.NewService(logger, overlayEthAddress, backend, signer, stateStore, backendChainID, transactionMonitor, fallbackGasLimit) + transactionService, err := transaction.NewService(logger, overlayEthAddress, backend, signer, stateStore, backendChainID, transactionMonitor, fallbackGasLimit, retryCfg) if err != nil { return nil, common.Address{}, 0, nil, nil, fmt.Errorf("transaction service: %w", err) } diff --git a/pkg/node/node.go b/pkg/node/node.go index 53d96ec5419..7983bd883e6 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -128,71 +128,89 @@ type Bee struct { } type Options struct { - Addr string - AllowPrivateCIDRs bool - APIAddr string - EnableWSS bool - WSSAddr string - AutoTLSStorageDir string - BlockchainRpcEndpoint string - BlockchainRpcDialTimeout time.Duration - BlockchainRpcTLSTimeout time.Duration - BlockchainRpcIdleTimeout time.Duration - BlockchainRpcKeepalive time.Duration - BlockProfile bool - BlockTime time.Duration - BlockSyncInterval uint64 - BootnodeMode bool - Bootnodes []string - CacheCapacity uint64 - AutoTLSCAEndpoint string - ChainID int64 - ChequebookEnable bool - CORSAllowedOrigins []string - DataDir string - DBBlockCacheCapacity uint64 - DBDisableSeeksCompaction bool - DBOpenFilesLimit uint64 - DBWriteBufferSize uint64 - EnableStorageIncentives bool - EnableWS bool - AutoTLSDomain string - AutoTLSRegistrationEndpoint string - FullNodeMode bool - GasLimitFallback uint64 - Logger log.Logger - MinimumGasTipCap uint64 - MinimumStorageRadius uint - MutexProfile bool - NATAddr string - NATWSSAddr string - NeighborhoodSuggester string - PaymentEarly int64 - PaymentThreshold string - PaymentTolerance int64 - PostageContractAddress string - PostageContractStartBlock uint64 - PriceOracleAddress string - RedistributionContractAddress string - ReserveCapacityDoubling int - ResolverConnectionCfgs []multiresolver.ConnectionConfig - Resync bool - RetrievalCaching bool - SkipPostageSnapshot bool - StakingContractAddress string - StatestoreCacheCapacity uint64 - StaticNodes []swarm.Address - SwapEnable bool - SwapFactoryAddress string - SwapInitialDeposit string - TargetNeighborhood string - TracingEnabled bool - TracingEndpoint string - TracingServiceName string - TrxDebugMode bool - WarmupTime time.Duration - WelcomeMessage string - WhitelistedWithdrawalAddress []string + Addr string + AllowPrivateCIDRs bool + APIAddr string + EnableWSS bool + WSSAddr string + AutoTLSStorageDir string + BlockchainRpcEndpoint string + BlockchainRpcDialTimeout time.Duration + BlockchainRpcTLSTimeout time.Duration + BlockchainRpcIdleTimeout time.Duration + BlockchainRpcKeepalive time.Duration + BlockProfile bool + BlockTime time.Duration + BlockSyncInterval uint64 + FeeHistoryBlockCount uint64 + FeeHistoryRewardPercentiles []float64 + TransactionRetryMaxRetries int + TransactionRetryDelay time.Duration + TransactionRetryGasIncreasePercent int + TransactionRetryMaxTxPriceWei uint64 + BootnodeMode bool + Bootnodes []string + CacheCapacity uint64 + AutoTLSCAEndpoint string + ChainID int64 + ChequebookEnable bool + CORSAllowedOrigins []string + DataDir string + DBBlockCacheCapacity uint64 + DBDisableSeeksCompaction bool + DBOpenFilesLimit uint64 + DBWriteBufferSize uint64 + EnableStorageIncentives bool + EnableWS bool + AutoTLSDomain string + AutoTLSRegistrationEndpoint string + FullNodeMode bool + GasLimitFallback uint64 + Logger log.Logger + MinimumGasTipCap uint64 + MinimumStorageRadius uint + MutexProfile bool + NATAddr string + NATWSSAddr string + NeighborhoodSuggester string + PaymentEarly int64 + PaymentThreshold string + PaymentTolerance int64 + PostageContractAddress string + PostageContractStartBlock uint64 + PriceOracleAddress string + RedistributionContractAddress string + ReserveCapacityDoubling int + ResolverConnectionCfgs []multiresolver.ConnectionConfig + Resync bool + RetrievalCaching bool + SkipPostageSnapshot bool + StakingContractAddress string + StatestoreCacheCapacity uint64 + StaticNodes []swarm.Address + SwapEnable bool + SwapFactoryAddress string + SwapInitialDeposit string + TargetNeighborhood string + TracingEnabled bool + TracingEndpoint string + TracingServiceName string + TrxDebugMode bool + WarmupTime time.Duration + WelcomeMessage string + WhitelistedWithdrawalAddress []string +} + +func txRetryConfigFromOptions(o *Options) transaction.TransactionsRetryConfig { + c := transaction.TransactionsRetryConfig{ + MaxRetries: o.TransactionRetryMaxRetries, + RetryDelay: o.TransactionRetryDelay, + GasIncreasePercent: o.TransactionRetryGasIncreasePercent, + } + if o.TransactionRetryMaxTxPriceWei != 0 { + c.MaxTxPrice = new(big.Int).SetUint64(o.TransactionRetryMaxTxPriceWei) + } + return c } const ( @@ -428,6 +446,9 @@ func NewBee( Keepalive: o.BlockchainRpcKeepalive, }, o.BlockSyncInterval, + o.FeeHistoryBlockCount, + o.FeeHistoryRewardPercentiles, + txRetryConfigFromOptions(o), ) if err != nil { return nil, fmt.Errorf("init chain: %w", err) @@ -1350,6 +1371,9 @@ func NewBee( if swapBackendMetrics, ok := chainBackend.(metrics.Collector); ok { apiService.MustRegisterMetrics(swapBackendMetrics.Metrics()...) } + if txMetrics, ok := transactionService.(metrics.Collector); ok { + apiService.MustRegisterMetrics(txMetrics.Metrics()...) + } if l, ok := logger.(metrics.Collector); ok { apiService.MustRegisterMetrics(l.Metrics()...) diff --git a/pkg/postage/postagecontract/contract.go b/pkg/postage/postagecontract/contract.go index cae599db166..62242c9630e 100644 --- a/pkg/postage/postagecontract/contract.go +++ b/pkg/postage/postagecontract/contract.go @@ -179,6 +179,14 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi ) }() + if !sctx.GetDisableRetry(ctx) { + _, receipt, err = c.transactionService.SendWithRetry(ctx, request) + if err != nil { + return nil, err + } + return receipt, nil + } + txHash, err := c.transactionService.Send(ctx, request, transaction.DefaultTipBoostPercent) if err != nil { return nil, err @@ -215,6 +223,14 @@ func (c *postageContract) sendTransaction(ctx context.Context, callData []byte, ) }() + if !sctx.GetDisableRetry(ctx) { + _, receipt, err = c.transactionService.SendWithRetry(ctx, request) + if err != nil { + return nil, err + } + return receipt, nil + } + txHash, err := c.transactionService.Send(ctx, request, transaction.DefaultTipBoostPercent) if err != nil { return nil, err diff --git a/pkg/postage/postagecontract/contract_test.go b/pkg/postage/postagecontract/contract_test.go index 92020efbe9a..6592a7f23a7 100644 --- a/pkg/postage/postagecontract/contract_test.go +++ b/pkg/postage/postagecontract/contract_test.go @@ -74,36 +74,25 @@ func TestCreateBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil case postageStampAddress: if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil } if !bytes.Equal(expectedCallData[:100], request.Data[:100]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashCreate, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - switch txHash { - case txHashApprove: - return &types.Receipt{ - Status: 1, - }, nil - case txHashCreate: - return &types.Receipt{ + return txHashCreate, &types.Receipt{ Logs: []*types.Log{ newCreateEvent(postageStampAddress, batchID), }, Status: 1, }, nil } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) { if *request.To == bzzTokenAddress { @@ -319,33 +308,22 @@ func TestTopUpBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil case postageStampAddress: if !bytes.Equal(expectedCallData[:64], request.Data[:64]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashTopup, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - switch txHash { - case txHashApprove: - return &types.Receipt{ - Status: 1, - }, nil - case txHashTopup: - return &types.Receipt{ + return txHashTopup, &types.Receipt{ Logs: []*types.Log{ newTopUpEvent(postageStampAddress, batch), }, Status: 1, }, nil } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) { if *request.To == bzzTokenAddress { @@ -490,33 +468,22 @@ func TestDiluteBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == postageStampAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil } if !bytes.Equal(expectedCallData[:64], request.Data[:64]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDilute, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDilute { - return &types.Receipt{ + return txHashDilute, &types.Receipt{ Logs: []*types.Log{ newDiluteEvent(postageStampAddress, batch), }, Status: 1, }, nil } - if txHash == txHashApprove { - return &types.Receipt{ - Status: 1, - }, nil - } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) { if *request.To == postageStampAddress { @@ -663,12 +630,8 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, i int) (txHash common.Hash, err error) { - return common.Hash{}, nil - }), transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - return &types.Receipt{ - Status: 1, - }, nil + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + return common.Hash{}, &types.Receipt{Status: 1}, nil }), ), postageMock, @@ -805,13 +768,13 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, i int) (txHash common.Hash, err error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == postageContractAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { - return txHash, fmt.Errorf("some error") + return common.Hash{}, nil, fmt.Errorf("some error") } } - return txHash, errors.New("unexpected call") + return common.Hash{}, nil, errors.New("unexpected call") }), ), postageMock, @@ -928,12 +891,8 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, i int) (txHash common.Hash, err error) { - return common.Hash{}, nil - }), transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - return &types.Receipt{ - Status: 0, - }, nil + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + return common.Hash{}, &types.Receipt{Status: 0}, transaction.ErrTransactionReverted }), ), postageMock, diff --git a/pkg/sctx/sctx.go b/pkg/sctx/sctx.go index e2f1d3aefd4..004312bd0c9 100644 --- a/pkg/sctx/sctx.go +++ b/pkg/sctx/sctx.go @@ -20,6 +20,7 @@ type ( requestHostKey struct{} gasPriceKey struct{} gasLimitKey struct{} + disableRetryKey struct{} ) // SetHost sets the http request host in the context @@ -67,3 +68,12 @@ func GetGasPrice(ctx context.Context) *big.Int { } return nil } + +func SetDisableRetry(ctx context.Context, disable bool) context.Context { + return context.WithValue(ctx, disableRetryKey{}, disable) +} + +func GetDisableRetry(ctx context.Context) bool { + v, ok := ctx.Value(disableRetryKey{}).(bool) + return ok && v +} diff --git a/pkg/storageincentives/redistribution/redistribution.go b/pkg/storageincentives/redistribution/redistribution.go index 77013a8a990..98cf737d3fe 100644 --- a/pkg/storageincentives/redistribution/redistribution.go +++ b/pkg/storageincentives/redistribution/redistribution.go @@ -18,7 +18,8 @@ import ( ) const ( - loggerName = "redistributionContract" + loggerName = "redistributionContract" + // BoostTipPercent is used where the node sends transactions without retry. BoostTipPercent = 50 ) @@ -115,7 +116,7 @@ func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs) (comm Value: big.NewInt(0), Description: "claim win transaction", } - txHash, err := c.sendAndWait(ctx, request, BoostTipPercent) + txHash, err := c.sendAndWait(ctx, request) if err != nil { return txHash, fmt.Errorf("claim: %w", err) } @@ -138,7 +139,7 @@ func (c *contract) Commit(ctx context.Context, obfusHash []byte, round uint64) ( Value: big.NewInt(0), Description: "commit transaction", } - txHash, err := c.sendAndWait(ctx, request, BoostTipPercent) + txHash, err := c.sendAndWait(ctx, request) if err != nil { return txHash, fmt.Errorf("commit: obfusHash %v: %w", common.BytesToHash(obfusHash), err) } @@ -161,7 +162,7 @@ func (c *contract) Reveal(ctx context.Context, storageDepth uint8, reserveCommit Value: big.NewInt(0), Description: "reveal transaction", } - txHash, err := c.sendAndWait(ctx, request, BoostTipPercent) + txHash, err := c.sendAndWait(ctx, request) if err != nil { return txHash, fmt.Errorf("reveal: storageDepth %d reserveCommitmentHash %v RandomNonce %v: %w", storageDepth, common.BytesToHash(reserveCommitmentHash), common.BytesToHash(RandomNonce), err) } @@ -189,7 +190,7 @@ func (c *contract) ReserveSalt(ctx context.Context) ([]byte, error) { return salt[:], nil } -func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest, boostPercent int) (txHash common.Hash, err error) { +func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error) { defer func() { err = c.txService.UnwrapABIError( ctx, @@ -199,17 +200,12 @@ func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxReque ) }() - txHash, err = c.txService.Send(ctx, request, boostPercent) + txHash, receipt, err := c.txService.SendWithRetry(ctx, request) if err != nil { return txHash, err } - receipt, err := c.txService.WaitForReceipt(ctx, txHash) - if err != nil { - return txHash, err - } - - if receipt.Status == 0 { - return txHash, transaction.ErrTransactionReverted + if receipt == nil { + return txHash, fmt.Errorf("missing receipt after send with retry") } return txHash, nil } diff --git a/pkg/storageincentives/redistribution/redistribution_test.go b/pkg/storageincentives/redistribution/redistribution_test.go index b48e9014b02..85ec99b7b19 100644 --- a/pkg/storageincentives/redistribution/redistribution_test.go +++ b/pkg/storageincentives/redistribution/redistribution_test.go @@ -203,22 +203,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 1, - }, nil - } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -245,22 +237,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 0, - }, nil + return txHashDeposited, &types.Receipt{Status: 0}, transaction.ErrTransactionReverted } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -288,22 +272,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 1, - }, nil - } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -333,22 +309,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, _ int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 1, - }, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -421,7 +389,10 @@ func TestRedistribution(t *testing.T) { t.Run("invalid call data", func(t *testing.T) { t.Parallel() - expectedCallData, err := redistributionContractABI.Pack("commit", common.BytesToHash(common.Hex2Bytes("some hash")), uint64(0)) + // Use valid distinct hashes: Hex2Bytes("some hash") and Hex2Bytes("hash") both decode to empty bytes. + expectedHash := common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + actualHash := common.Hex2Bytes("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + expectedCallData, err := redistributionContractABI.Pack("commit", expectedHash, uint64(0)) if err != nil { t.Fatal(err) } @@ -430,14 +401,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { - if !bytes.Equal(expectedCallData[:], request.Data[:]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + if !bytes.Equal(expectedCallData, request.Data) { + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return common.Hash{}, errors.New("sent to wrong contract") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -445,7 +416,7 @@ func TestRedistribution(t *testing.T) { 0, ) - _, err = contract.Commit(ctx, common.Hex2Bytes("hash"), 0) + _, err = contract.Commit(ctx, actualHash, 0) if err == nil { t.Fatal("expected error") } diff --git a/pkg/transaction/backend.go b/pkg/transaction/backend.go index 075dbfe19b8..48a1c64abee 100644 --- a/pkg/transaction/backend.go +++ b/pkg/transaction/backend.go @@ -17,10 +17,19 @@ import ( "github.com/ethersphere/bee/v2/pkg/transaction/backend" ) +// FeeHistorySuggestedFeeAndTips are max-fee-per-gas style estimates from eth_feeHistory over the last N blocks (by default 100) +// Low, Market, and Aggressive are the median per-block priority fee at the 10th, 50th, and 90th reward percentiles respectively (each priority tier is floored by the configured minimum tip). +type FeeHistorySuggestedFeeAndTips struct { + LowTip *big.Int + MarketTip *big.Int + AggressiveTip *big.Int +} + // Backend is the minimum of blockchain backend functions we need. type Backend interface { backend.Geth SuggestedFeeAndTip(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) + SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*FeeHistorySuggestedFeeAndTips, error) } // IsSynced will check if we are synced with the given blockchain backend. This diff --git a/pkg/transaction/backend/backend.go b/pkg/transaction/backend/backend.go index 67323d14e45..2d838a78a26 100644 --- a/pkg/transaction/backend/backend.go +++ b/pkg/transaction/backend/backend.go @@ -21,6 +21,7 @@ type Geth interface { ChainID(ctx context.Context) (*big.Int, error) Close() EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) + FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) diff --git a/pkg/transaction/backendmock/backend.go b/pkg/transaction/backendmock/backend.go index 364493b0341..e0a88a3200d 100644 --- a/pkg/transaction/backendmock/backend.go +++ b/pkg/transaction/backendmock/backend.go @@ -18,18 +18,20 @@ import ( var ErrNotImplemented = errors.New("not implemented") type backendMock struct { - callContract func(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) - sendTransaction func(ctx context.Context, tx *types.Transaction) error - suggestedFeeAndTip func(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) - suggestGasTipCap func(ctx context.Context) (*big.Int, error) - estimateGas func(ctx context.Context, msg ethereum.CallMsg) (gas uint64, err error) - transactionReceipt func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) - pendingNonceAt func(ctx context.Context, account common.Address) (uint64, error) - transactionByHash func(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) - blockNumber func(ctx context.Context) (uint64, error) - headerByNumber func(ctx context.Context, number *big.Int) (*types.Header, error) - balanceAt func(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) - nonceAt func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) + callContract func(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) + sendTransaction func(ctx context.Context, tx *types.Transaction) error + suggestedFeeAndTip func(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) + suggestedFeeAndTipsFromHistory func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) + suggestGasTipCap func(ctx context.Context) (*big.Int, error) + estimateGas func(ctx context.Context, msg ethereum.CallMsg) (gas uint64, err error) + transactionReceipt func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + pendingNonceAt func(ctx context.Context, account common.Address) (uint64, error) + transactionByHash func(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) + blockNumber func(ctx context.Context) (uint64, error) + headerByNumber func(ctx context.Context, number *big.Int) (*types.Header, error) + balanceAt func(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) + nonceAt func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) + feeHistory func(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) } func (m *backendMock) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { @@ -53,6 +55,13 @@ func (m *backendMock) SuggestedFeeAndTip(ctx context.Context, gasPrice *big.Int, return nil, nil, ErrNotImplemented } +func (m *backendMock) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + if m.suggestedFeeAndTipsFromHistory != nil { + return m.suggestedFeeAndTipsFromHistory(ctx, lastBlock) + } + return nil, ErrNotImplemented +} + func (m *backendMock) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { if m.estimateGas != nil { return m.estimateGas(ctx, msg) @@ -120,6 +129,13 @@ func (m *backendMock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { return nil, ErrNotImplemented } +func (m *backendMock) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { + if m.feeHistory != nil { + return m.feeHistory(ctx, blockCount, lastBlock, rewardPercentiles) + } + return nil, ErrNotImplemented +} + func (m *backendMock) ChainID(ctx context.Context) (*big.Int, error) { return nil, ErrNotImplemented } @@ -214,3 +230,15 @@ func WithNonceAtFunc(f func(ctx context.Context, account common.Address, blockNu s.nonceAt = f }) } + +func WithFeeHistoryFunc(f func(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error)) Option { + return optionFunc(func(s *backendMock) { + s.feeHistory = f + }) +} + +func WithSuggestedFeeAndTipsFromHistoryFunc(f func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error)) Option { + return optionFunc(func(s *backendMock) { + s.suggestedFeeAndTipsFromHistory = f + }) +} diff --git a/pkg/transaction/backendnoop/backend.go b/pkg/transaction/backendnoop/backend.go index 4b149369981..830dc9f9de3 100644 --- a/pkg/transaction/backendnoop/backend.go +++ b/pkg/transaction/backendnoop/backend.go @@ -51,6 +51,10 @@ func (b *Backend) SuggestedFeeAndTip(ctx context.Context, gasPrice *big.Int, boo return nil, nil, postagecontract.ErrChainDisabled } +func (b *Backend) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + return nil, postagecontract.ErrChainDisabled +} + func (b *Backend) SuggestGasTipCap(context.Context) (*big.Int, error) { return nil, postagecontract.ErrChainDisabled } @@ -87,6 +91,10 @@ func (b *Backend) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log return nil, postagecontract.ErrChainDisabled } +func (b *Backend) FeeHistory(context.Context, uint64, *big.Int, []float64) (*ethereum.FeeHistory, error) { + return nil, postagecontract.ErrChainDisabled +} + func (b *Backend) ChainID(context.Context) (*big.Int, error) { return big.NewInt(b.chainID), nil } diff --git a/pkg/transaction/backendsimulation/backend.go b/pkg/transaction/backendsimulation/backend.go index 9c647b6404a..d8d643e3f57 100644 --- a/pkg/transaction/backendsimulation/backend.go +++ b/pkg/transaction/backendsimulation/backend.go @@ -97,6 +97,10 @@ func (m *simulatedBackend) SuggestedFeeAndTip(ctx context.Context, gasPrice *big return nil, nil, ErrNotImplemented } +func (m *simulatedBackend) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + return nil, ErrNotImplemented +} + func (m *simulatedBackend) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { return 0, ErrNotImplemented } @@ -148,6 +152,10 @@ func (m *simulatedBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, erro return nil, ErrNotImplemented } +func (m *simulatedBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { + return nil, ErrNotImplemented +} + func (m *simulatedBackend) ChainID(ctx context.Context) (*big.Int, error) { return nil, ErrNotImplemented } diff --git a/pkg/transaction/export_test.go b/pkg/transaction/export_test.go index 3d723b4fb19..8b6a9070fdb 100644 --- a/pkg/transaction/export_test.go +++ b/pkg/transaction/export_test.go @@ -4,4 +4,33 @@ package transaction -var StoredTransactionKey = storedTransactionKey +import ( + "context" + "math/big" + + "github.com/ethersphere/bee/v2/pkg/log" +) + +var ( + StoredTransactionKey = storedTransactionKey + RetryStateKey = retryStateKey + PendingTransactionKey = pendingTransactionKey + EscalateGasTip = escalateGasTip +) + +// SuggestGasFeeGasTipCapWithHistory exposes suggestGasFeeGasTipCapWithHistory for tests. +func SuggestGasFeeGasTipCapWithHistory( + backend Backend, + gasIncreasePercent int, + maxTxPrice *big.Int, + ctx context.Context, + prevGasTipCap *big.Int, +) (gasFeeCap, gasTipCap *big.Int, err error) { + svc := &transactionService{ + logger: log.Noop, + backend: backend, + txRetryGasIncreasePercent: gasIncreasePercent, + maxTxPrice: maxTxPrice, + } + return svc.suggestGasFeeGasTipCapWithHistory(ctx, prevGasTipCap) +} diff --git a/pkg/transaction/metrics.go b/pkg/transaction/metrics.go new file mode 100644 index 00000000000..a9bf4e9a1bf --- /dev/null +++ b/pkg/transaction/metrics.go @@ -0,0 +1,128 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction + +import ( + "context" + "errors" + "math/big" + "strconv" + "strings" + + m "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +var _ m.Collector = (*transactionService)(nil) + +// transactionsWithRetryMetrics collects SendWithRetry monitoring data for Prometheus dashboards. +type transactionsWithRetryMetrics struct { + // AttemptsPerTransaction is the number of broadcast rounds per SendWithRetry invocation + // (1 = confirmed on the first broadcast, 2 = one retry, etc.). + AttemptsPerTransaction prometheus.Histogram + // OutcomesTotal counts finished SendWithRetry runs by result label. + OutcomesTotal *prometheus.CounterVec + // BroadcastGasTipCap records maxPriorityFeePerGas (wei) per broadcast attempt index. + BroadcastGasTipCap *prometheus.HistogramVec + // BroadcastGasFeeCap records maxFeePerGas (wei) per broadcast attempt index. + BroadcastGasFeeCap *prometheus.HistogramVec +} + +func newRetryMetrics() transactionsWithRetryMetrics { + subsystem := "transaction_retry" + + // Gas fees on Gnosis/mainnet-style chains: from ~1 gwei to tens of gwei per unit. + gasBuckets := prometheus.ExponentialBuckets(1_000_000_000, 2, 14) + + return transactionsWithRetryMetrics{ + AttemptsPerTransaction: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "attempts_per_transaction", + Help: "Broadcast attempts per SendWithRetry invocation (1 = no retry needed).", + Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10}, + }), + OutcomesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "outcomes_total", + Help: "Finished SendWithRetry invocations by outcome.", + }, []string{"result"}), + BroadcastGasTipCap: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "broadcast_gas_tip_cap_wei", + Help: "maxPriorityFeePerGas (wei) of each retry broadcast, labelled by attempt index (0 = first).", + Buckets: gasBuckets, + }, []string{"attempt"}), + BroadcastGasFeeCap: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "broadcast_gas_fee_cap_wei", + Help: "maxFeePerGas (wei) of each retry broadcast, labelled by attempt index (0 = first).", + Buckets: gasBuckets, + }, []string{"attempt"}), + } +} + +func (t *transactionService) Metrics() []prometheus.Collector { + return m.PrometheusCollectorsFromFields(t.metrics) +} + +func (t *transactionService) recordRetryBroadcast(attempt int, tip, feeCap *big.Int) { + if tip == nil || feeCap == nil { + return + } + attemptLabel := strconv.Itoa(attempt) + t.metrics.BroadcastGasTipCap.WithLabelValues(attemptLabel).Observe(weiToFloat(tip)) + t.metrics.BroadcastGasFeeCap.WithLabelValues(attemptLabel).Observe(weiToFloat(feeCap)) +} + +func (t *transactionService) recordRetryComplete(broadcastAttempts int, err error) { + t.metrics.OutcomesTotal.WithLabelValues(retryOutcomeLabel(err)).Inc() + if broadcastAttempts > 0 { + t.metrics.AttemptsPerTransaction.Observe(float64(broadcastAttempts)) + } +} + +func weiToFloat(v *big.Int) float64 { + if v == nil { + return 0 + } + f, _ := new(big.Float).SetInt(v).Float64() + return f +} + +// retryOutcomeLabel maps a SendWithRetry terminal error to a stable Prometheus label. +func retryOutcomeLabel(err error) string { + if err == nil { + return "success" + } + if errors.Is(err, ErrTransactionReverted) { + return "reverted" + } + if errors.Is(err, ErrSignTransaction) { + return "sign_failed" + } + if errors.Is(err, ErrTransactionCancelled) { + return "cancelled" + } + if errors.Is(err, ErrTxMaxPriceExceeded) { + return "max_price_exceeded" + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return "context_canceled" + } + if strings.Contains(err.Error(), "transaction failed after") { + return "attempts_exhausted" + } + if strings.Contains(err.Error(), "send txs with retry requires automatic gas pricing") { + return "manual_gas_price" + } + if isErrCritical(err) { + return "critical" + } + return "other" +} diff --git a/pkg/transaction/mock/transaction.go b/pkg/transaction/mock/transaction.go index 072f47cf8f2..66ad081fbe4 100644 --- a/pkg/transaction/mock/transaction.go +++ b/pkg/transaction/mock/transaction.go @@ -19,6 +19,7 @@ import ( type transactionServiceMock struct { send func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) + sendWithRetry func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) waitForReceipt func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) watchSentTransaction func(txHash common.Hash) (chan types.Receipt, chan error, error) call func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) @@ -29,6 +30,13 @@ type transactionServiceMock struct { transactionFee func(ctx context.Context, txHash common.Hash) (*big.Int, error) } +func (m *transactionServiceMock) SendWithRetry(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + if m.sendWithRetry != nil { + return m.sendWithRetry(ctx, request) + } + return common.Hash{}, nil, errors.New("not implemented") +} + func (m *transactionServiceMock) Send(ctx context.Context, request *transaction.TxRequest, boostPercent int) (txHash common.Hash, err error) { if m.send != nil { return m.send(ctx, request, boostPercent) @@ -110,6 +118,12 @@ type optionFunc func(*transactionServiceMock) func (f optionFunc) apply(r *transactionServiceMock) { f(r) } +func WithSendWithRetryFunc(f func(context.Context, *transaction.TxRequest) (common.Hash, *types.Receipt, error)) Option { + return optionFunc(func(s *transactionServiceMock) { + s.sendWithRetry = f + }) +} + func WithSendFunc(f func(context.Context, *transaction.TxRequest, int) (txHash common.Hash, err error)) Option { return optionFunc(func(s *transactionServiceMock) { s.send = f diff --git a/pkg/transaction/send_tx_with_retry.go b/pkg/transaction/send_tx_with_retry.go new file mode 100644 index 00000000000..a7dcbfa0377 --- /dev/null +++ b/pkg/transaction/send_tx_with_retry.go @@ -0,0 +1,525 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +const retryStatePrefix = "transaction_retry_" + +// TransactionRetryState is persisted so transactions with retry can resume after a node restart. +type TransactionRetryState struct { + Nonce uint64 `json:"nonce"` + NonceAssigned bool `json:"nonce_assigned"` + NextAttempt int `json:"next_attempt"` + LastTxHash common.Hash `json:"last_tx_hash"` + AllTxHashes []common.Hash `json:"all_tx_hashes"` + GasLimit uint64 `json:"gas_limit"` + To *common.Address `json:"to,omitempty"` + Data []byte `json:"data,omitempty"` + Value *big.Int `json:"value,omitempty"` + Description string `json:"description,omitempty"` + + // PreviousTip is the maxPriorityFeePerGas used in the last successful broadcast. + // Each retry escalates from this value by (100+GasIncreasePercent)/100. + PreviousTip *big.Int `json:"previous_tip,omitempty"` +} + +func retryStateKey(nonce uint64) string { + return fmt.Sprintf("%s%020d", retryStatePrefix, nonce) +} + +// SendWithRetry sends an EIP-1559 transaction using one eth_feeHistory snapshot for the initial tip, +// then increases gas tip by gas_increase_percent after each unsuccessful wait, up to max_retries. +func (t *transactionService) SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) { + if request.GasPrice != nil { + err = errors.New("send txs with retry requires automatic gas pricing") + t.recordRetryComplete(0, err) + return common.Hash{}, nil, err + } + return t.retry(ctx, "", request) +} + +// escalateGasTip returns tip * (100+increasePct)/100 — a single escalation step. +func escalateGasTip(tip *big.Int, increasePct int) *big.Int { + if tip == nil { + return nil + } + return new(big.Int).Div(new(big.Int).Mul(new(big.Int).Set(tip), big.NewInt(int64(100+increasePct))), big.NewInt(100)) +} + +// suggestGasFeeGasTipCapWithHistory returns maxFeePerGas (gasFeeCap) and maxPriorityFeePerGas (gasTipCap) +// for transactions with retry. It reads the latest block base fee and picks a priority fee, then sets +// gasFeeCap = 2*baseFee + tip (same formula as wrapped.SuggestedFeeAndTip). +// +// Priority fee selection: +// - First attempt (prevGasTipCap nil or zero): one eth_feeHistory snapshot via +// SuggestedFeeAndTipsFromHistory; MarketTip is used as gasTipCap. +// - Later attempts: gasTipCap = prevGasTipCap * (100 + txRetryGasIncreasePercent) / 100. +// +// When maxTxPrice is set and 2*baseFee + escalated tip exceeds it, the function broadcasts with the +// un-escalated previous tip (2*baseFee + prevGasTipCap) instead. If that fee cap still exceeds +// maxTxPrice, it returns ErrTxMaxPriceExceeded. +func (t *transactionService) suggestGasFeeGasTipCapWithHistory(ctx context.Context, prevGasTipCap *big.Int) (gasFeeCap, gasTipCap *big.Int, err error) { + header, err := t.backend.HeaderByNumber(ctx, nil) + if err != nil { + return nil, prevGasTipCap, err + } + if header == nil || header.BaseFee == nil { + return nil, prevGasTipCap, fmt.Errorf("latest block header or base fee unavailable") + } + + var escalatedGasTip *big.Int + if prevGasTipCap == nil || prevGasTipCap.Sign() == 0 { + fh, err := t.backend.SuggestedFeeAndTipsFromHistory(ctx, nil) + if err != nil { + return nil, nil, fmt.Errorf("fee history: %w", err) + } + if fh == nil { + return nil, nil, errors.New("fee history: missing base fee") + } + escalatedGasTip = fh.MarketTip + prevGasTipCap = fh.MarketTip + } else { + escalatedGasTip = escalateGasTip(prevGasTipCap, t.txRetryGasIncreasePercent) + } + + gasFeeCap = new(big.Int).Mul(header.BaseFee, big.NewInt(2)) + gasFeeCapWithEscalatedTip := new(big.Int).Add(new(big.Int).Set(gasFeeCap), escalatedGasTip) + gasFeeCapWithPreviousTip := new(big.Int).Add(new(big.Int).Set(gasFeeCap), prevGasTipCap) + + t.logger.V(1).Register().Debug("suggest gas fees for retry", + "base_fee", header.BaseFee, + "previous_tip", prevGasTipCap, + "escalated_tip", escalatedGasTip, + "gas_fee_cap_with_escalated_tip", gasFeeCapWithEscalatedTip, + "gas_fee_cap_with_previous_tip", gasFeeCapWithPreviousTip, + "max_tx_price", t.maxTxPrice) + + if t.maxTxPrice != nil && gasFeeCapWithEscalatedTip.Cmp(t.maxTxPrice) > 0 { + t.logger.Warning("gas cap fee with escalated gas tip is too high, fallback to previous gas tip", + "escalated_gas_tip_cap", escalatedGasTip.String(), + "escalated_gas_fee_cap", gasFeeCapWithEscalatedTip.String(), + "previous_gas_tip_cap", prevGasTipCap.String()) + + if gasFeeCapWithPreviousTip.Cmp(t.maxTxPrice) > 0 { + return nil, nil, fmt.Errorf("%w: max_fee_per_gas %s exceeds limit %s", ErrTxMaxPriceExceeded, gasFeeCap, t.maxTxPrice) + } + return gasFeeCapWithPreviousTip, prevGasTipCap, nil + } + return gasFeeCapWithEscalatedTip, escalatedGasTip, nil +} + +func (t *transactionService) prepareTransactionWithRetry(ctx context.Context, request *TxRequest, nonce uint64, prevGasTipCap *big.Int) (*types.Transaction, error) { + gasLimit, err := t.estimateGasLimit(ctx, request) + if err != nil { + return nil, err + } + + gasFeeCap, newGasTipCap, err := t.suggestGasFeeGasTipCapWithHistory(ctx, prevGasTipCap) + if err != nil { + return nil, err + } + + tx := types.NewTx(&types.DynamicFeeTx{ + Nonce: nonce, + ChainID: t.chainID, + To: request.To, + Value: request.Value, + Gas: gasLimit, + GasFeeCap: gasFeeCap, + GasTipCap: newGasTipCap, + Data: request.Data, + }) + return tx, nil +} + +// broadcastTx prepares, signs, and sends a transaction. +// When fixedNonce is nil a new nonce is allocated (first attempt); +// otherwise the supplied nonce is reused (replacement transaction). +func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest, fixedNonce *uint64, gasTipCap *big.Int, attempt int) (*types.Transaction, error) { + var nonce uint64 + + if fixedNonce != nil { + nonce = *fixedNonce + } else { + t.lock.Lock() + defer t.lock.Unlock() + + n, err := t.nextNonce(ctx) + if err != nil { + return nil, err + } + nonce = n + } + tx, err := t.prepareTransactionWithRetry(ctx, request, nonce, gasTipCap) + if err != nil { + return nil, err + } + + signedTx, err := t.signer.SignTx(tx, t.chainID) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrSignTransaction, err) + } + + t.logger.Info("send with retry: broadcast", + "attempt", attempt, + "tx", signedTx.Hash(), + "nonce", nonce, + "to", addressForLog(request.To), + "gas_limit", tx.Gas(), + "gas_fee_cap", tx.GasFeeCap(), + "gas_tip_cap", tx.GasTipCap(), + "value", tx.Value(), + "data_len", len(tx.Data()), + "description", request.Description, + ) + + t.recordRetryBroadcast(attempt, tx.GasTipCap(), tx.GasFeeCap()) + + err = t.backend.SendTransaction(ctx, signedTx) + return signedTx, err +} + +func (t *transactionService) deleteRetryStateAndPending(retryKey string, state TransactionRetryState) { + if retryKey == "" { + return + } + _ = t.store.Delete(retryKey) + for _, h := range state.AllTxHashes { + _ = t.store.Delete(pendingTransactionKey(h)) + } + if state.LastTxHash != (common.Hash{}) { + _ = t.store.Delete(pendingTransactionKey(state.LastTxHash)) + } +} +func (t *transactionService) retry(ctx context.Context, txRetryKey string, request *TxRequest) (common.Hash, *types.Receipt, error) { + var ( + txState TransactionRetryState + nonce *uint64 + ) + + if txRetryKey != "" { + if err := t.store.Get(txRetryKey, &txState); err != nil { + return common.Hash{}, nil, err + } + } + + if request == nil { + request = &TxRequest{ + To: txState.To, + Data: txState.Data, + GasLimit: txState.GasLimit, + Value: txState.Value, + Description: txState.Description, + } + } + + loggerV1 := t.logger.V(1).Register() + loggerV1.Debug("send with retry: started", + "description", request.Description, + "to", retryToForLog(request, &txState), + "max_retries", t.txMaxRetries, + "retry_delay", t.txRetryDelay, + "gas_increase_percent", t.txRetryGasIncreasePercent, + "resume_from_attempt", txState.NextAttempt, + "nonce_assigned", txState.NonceAssigned, + "previous_tip", txState.PreviousTip) + + for attempt := txState.NextAttempt; attempt < t.txMaxRetries; attempt++ { + if txState.NonceAssigned { + nonce = &txState.Nonce + } + + signedTx, err := t.broadcastTx(ctx, request, nonce, txState.PreviousTip, attempt) + if err != nil { + if isErrCritical(err) { + t.logger.Error(err, + "transaction with retry: broadcast failed with critical error, stop retry", + "attempt", attempt, "nonce", nonce, "to", retryToForLog(request, &txState)) + t.deleteRetryStateAndPending(txRetryKey, txState) + t.recordRetryComplete(txState.NextAttempt, err) + return common.Hash{}, nil, err + } + t.logger.Warning("transaction retry broadcast failed, will retry", "attempt", attempt, "error", err, "to", retryToForLog(request, &txState)) + } + + if err := t.updateStates(signedTx, &txState); err != nil { + t.logger.Error(err, + "transaction with retry: failed update states, stop retry", + "attempt", attempt, "nonce", nonce, "to", retryToForLog(request, &txState)) + t.deleteRetryStateAndPending(txRetryKey, txState) + t.recordRetryComplete(txState.NextAttempt, err) + return common.Hash{}, nil, err + } + + if txState.NonceAssigned { + txRetryKey = retryStateKey(txState.Nonce) + } + + loggerV1.Debug("send with retry: state updated", + "attempt", attempt, + "tx_hash", txState.LastTxHash, + "nonce", txState.Nonce, + "nonce_assigned", txState.NonceAssigned, + "previous_tip", txState.PreviousTip, + "description", request.Description) + + if txState.LastTxHash == (common.Hash{}) { + loggerV1.Debug("send with retry: no tx hash after broadcast failure, waiting before next attempt", + "attempt", attempt, + "retry_delay", t.txRetryDelay, + "description", request.Description) + select { + case <-ctx.Done(): + err := ctx.Err() + t.recordRetryComplete(txState.NextAttempt, err) + return common.Hash{}, nil, err + case <-time.After(t.txRetryDelay): + continue + } + } + + waitCtx, cancel := context.WithTimeout(ctx, t.txRetryDelay) + rec, waitErr := t.WaitForReceipt(waitCtx, txState.LastTxHash) + cancel() + + if waitErr == nil { + loggerV1.Debug("send with retry: receipt received", + "tx_hash", txState.LastTxHash, + "status", rec.Status, + "gas_used", rec.GasUsed, + "block_number", rec.BlockNumber, + "nonce", txState.Nonce, + "description", request.Description) + t.deleteRetryStateAndPending(txRetryKey, txState) + if rec.Status == 0 { + t.recordRetryComplete(txState.NextAttempt, ErrTransactionReverted) + return txState.LastTxHash, rec, ErrTransactionReverted + } + t.recordRetryComplete(txState.NextAttempt, nil) + return txState.LastTxHash, rec, nil + } else if isErrCritical(waitErr) { + t.logger.Error(waitErr, + "send with retry: wait for receipt failed with critical error, stop retry", + "attempt", attempt, + "tx_hash", txState.LastTxHash, + "nonce", txState.Nonce, + "description", request.Description) + t.deleteRetryStateAndPending(txRetryKey, txState) + t.recordRetryComplete(txState.NextAttempt, waitErr) + return common.Hash{}, nil, waitErr + } else { + loggerV1.Debug("send with retry: receipt not received, will escalate gas", + "attempt", attempt, + "tx_hash", txState.LastTxHash, + "nonce", txState.Nonce, + "wait_error", waitErr, + "description", request.Description) + } + } + + exhaustionErr := fmt.Errorf("transaction failed after %d attempts (nonce=%d, description=%s)", t.txMaxRetries, txState.Nonce, txState.Description) + t.logger.Error(exhaustionErr, + "send with retry: all attempts exhausted", + "max_retries", t.txMaxRetries, + "nonce", txState.Nonce, + "last_tx_hash", txState.LastTxHash, + "description", txState.Description) + t.deleteRetryStateAndPending(txRetryKey, txState) + t.recordRetryComplete(txState.NextAttempt, exhaustionErr) + return txState.LastTxHash, nil, exhaustionErr +} + +func (t *transactionService) updateStates(signedTx *types.Transaction, txState *TransactionRetryState) error { + if txState.LastTxHash != (common.Hash{}) { + txState.AllTxHashes = append(txState.AllTxHashes, txState.LastTxHash) + _ = t.store.Delete(pendingTransactionKey(txState.LastTxHash)) + } + + txState.NextAttempt++ + + if signedTx == nil { + txState.LastTxHash = common.Hash{} + } else { + txHash := signedTx.Hash() + now := time.Now().Unix() + + if err := t.store.Put(storedTransactionKey(txHash), StoredTransaction{ + To: signedTx.To(), + Data: signedTx.Data(), + GasPrice: signedTx.GasPrice(), + GasLimit: signedTx.Gas(), + GasTipCap: signedTx.GasTipCap(), + GasFeeCap: signedTx.GasFeeCap(), + Value: signedTx.Value(), + Nonce: signedTx.Nonce(), + Created: now, + }); err != nil { + return err + } + + if err := t.store.Put(pendingTransactionKey(txHash), struct{}{}); err != nil { + return err + } + + txState.LastTxHash = txHash + txState.PreviousTip = signedTx.GasTipCap() + + if !txState.NonceAssigned { + txState.Nonce = signedTx.Nonce() + txState.NonceAssigned = true + txState.GasLimit = signedTx.Gas() + txState.To = signedTx.To() + txState.Data = signedTx.Data() + txState.Value = signedTx.Value() + } + } + if txState.NonceAssigned { + return t.store.Put(retryStateKey(txState.Nonce), txState) + } + return nil +} + +func isErrCritical(err error) bool { + if errors.Is(err, ErrTransactionReverted) || + errors.Is(err, ErrTransactionCancelled) || + errors.Is(err, ErrSignTransaction) || + errors.Is(err, context.Canceled) { + return true + } + + s := err.Error() + nonRetryable := []string{ + "specified gas price", + "nonce too low", + "AlreadyCommitted", + "AlreadyRevealed", + "AlreadyClaimed", + "NotCommitPhase", + "NotRevealPhase", + "NotClaimPhase", + "CommitRoundOver", + "CommitRoundNotStarted", + "PhaseLastBlock", + "OutOfDepth", + "OutOfDepthReveal", + "OutOfDepthClaim", + "NotStaked", + "MustStake2Rounds", + "NoReveals", + "NoCommitsReceived", + "execution reverted", + "insufficient funds", + } + for _, sub := range nonRetryable { + if strings.Contains(s, sub) { + return true + } + } + return false +} + +func (t *transactionService) retryPendingHashes() (map[common.Hash]struct{}, error) { + out := make(map[common.Hash]struct{}) + err := t.store.Iterate(retryStatePrefix, func(key, val []byte) (stop bool, err error) { + var s TransactionRetryState + if uErr := json.Unmarshal(val, &s); uErr != nil { + return false, uErr + } + for _, h := range s.AllTxHashes { + out[h] = struct{}{} + } + if s.LastTxHash != (common.Hash{}) { + out[s.LastTxHash] = struct{}{} + } + return false, nil + }) + return out, err +} + +func (t *transactionService) resumeRetryTransactions() error { + var keys []string + var states []TransactionRetryState + err := t.store.Iterate(retryStatePrefix, func(key, val []byte) (stop bool, err error) { + var s TransactionRetryState + if uErr := json.Unmarshal(val, &s); uErr != nil { + return false, uErr + } + keys = append(keys, string(key)) + states = append(states, s) + return false, nil + }) + if err != nil { + return err + } + + confirmed, err := t.backend.NonceAt(t.ctx, t.sender, nil) + if err != nil { + t.logger.Warning("resume retry: failed to get confirmed nonce, resuming all", "error", err) + } + + loggerV1 := t.logger.V(1).Register() + loggerV1.Debug("resume retry: scanning persisted retry states", + "count", len(keys), + "confirmed_nonce", confirmed) + + for i := range keys { + key := keys[i] + state := states[i] + + if confirmed > state.Nonce { + loggerV1.Debug("resume retry: skipping already confirmed transaction", + "nonce", state.Nonce, + "confirmed_nonce", confirmed, + "description", state.Description) + t.deleteRetryStateAndPending(key, state) + continue + } + + loggerV1.Debug("resume retry: resuming persisted retry", + "nonce", state.Nonce, + "next_attempt", state.NextAttempt, + "last_tx_hash", state.LastTxHash, + "previous_tip", state.PreviousTip, + "description", state.Description) + + sk := key + st := state + t.wg.Go(func() { + if _, _, err := t.retry(t.ctx, sk, nil); err != nil { + t.logger.Error(err, "resumed transaction retry aborted", "nonce", st.Nonce, "description", st.Description) + } + }) + } + return nil +} + +func addressForLog(addr *common.Address) string { + if addr == nil { + return "" + } + return addr.Hex() +} + +func retryToForLog(req *TxRequest, state *TransactionRetryState) string { + if state != nil && state.To != nil { + return state.To.Hex() + } + if req != nil && req.To != nil { + return req.To.Hex() + } + return "" +} diff --git a/pkg/transaction/send_tx_with_retry_test.go b/pkg/transaction/send_tx_with_retry_test.go new file mode 100644 index 00000000000..ecd7a2b3161 --- /dev/null +++ b/pkg/transaction/send_tx_with_retry_test.go @@ -0,0 +1,872 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction_test + +import ( + "context" + "errors" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + signermock "github.com/ethersphere/bee/v2/pkg/crypto/mock" + "github.com/ethersphere/bee/v2/pkg/log" + storemock "github.com/ethersphere/bee/v2/pkg/statestore/mock" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/transaction" + "github.com/ethersphere/bee/v2/pkg/transaction/backendmock" + "github.com/ethersphere/bee/v2/pkg/transaction/monitormock" + "github.com/ethersphere/bee/v2/pkg/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSuggestGasFeeGasTipCapWithHistory(t *testing.T) { + t.Parallel() + + const ( + baseFee = int64(1000) + tipBase = int64(100) + marketTip = int64(200) // tipBase * 2 + gasIncreasePct = 20 + prevTip = int64(1000) + escalatedTip = int64(1200) // prevTip * 1.2 + baseFeeCap = int64(2000) // baseFee * 2 + ) + + headerOption := func() backendmock.Option { + return backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + return &types.Header{BaseFee: big.NewInt(baseFee)}, nil + }) + } + + feeHistoryOption := func(called *atomic.Int32) backendmock.Option { + return backendmock.WithSuggestedFeeAndTipsFromHistoryFunc(func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + if called != nil { + called.Add(1) + } + return &transaction.FeeHistorySuggestedFeeAndTips{ + LowTip: big.NewInt(tipBase), + MarketTip: big.NewInt(marketTip), + AggressiveTip: big.NewInt(tipBase * 3), + }, nil + }) + } + + t.Run("prevGasTipCap nil uses market tip from fee history", func(t *testing.T) { + t.Parallel() + + var feeHistoryCalls atomic.Int32 + backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( + backend, gasIncreasePct, nil, context.Background(), nil, + ) + + require.NoError(t, err) + assert.Equal(t, int32(1), feeHistoryCalls.Load()) + assert.Equal(t, marketTip, gasTipCap.Int64()) + assert.Equal(t, baseFeeCap+marketTip, gasFeeCap.Int64()) + }) + + t.Run("escalates previous tip by configured percent", func(t *testing.T) { + t.Parallel() + + var feeHistoryCalls atomic.Int32 + backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( + backend, gasIncreasePct, nil, context.Background(), big.NewInt(prevTip), + ) + + require.NoError(t, err) + assert.Equal(t, int32(0), feeHistoryCalls.Load(), "fee history must not be called when previous tip is set") + assert.Equal(t, escalatedTip, gasTipCap.Int64()) + assert.Equal(t, baseFeeCap+escalatedTip, gasFeeCap.Int64()) + }) + + t.Run("max tx price exceeded falls back to previous tip", func(t *testing.T) { + t.Parallel() + + // escalated: 2000+1200=3200, previous: 2000+1000=3000 + maxTxPrice := big.NewInt(baseFeeCap + prevTip + 100) + + backend := backendmock.New(headerOption()) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( + backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), + ) + + require.NoError(t, err) + assert.Equal(t, prevTip, gasTipCap.Int64(), "must fall back to previous tip without escalation") + assert.Equal(t, baseFeeCap+prevTip, gasFeeCap.Int64()) + }) + + t.Run("max tx price exceeded and previous tip also exceeds limit returns error", func(t *testing.T) { + t.Parallel() + + maxTxPrice := big.NewInt(baseFeeCap + prevTip - 1) + + backend := backendmock.New(headerOption()) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( + backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), + ) + + assert.ErrorIs(t, err, transaction.ErrTxMaxPriceExceeded) + assert.Nil(t, gasFeeCap) + assert.Nil(t, gasTipCap) + }) +} + +// capturedBroadcast records the parameters of a transaction as seen by SendTransaction. +type capturedBroadcast struct { + Nonce uint64 + GasTipCap *big.Int + GasFeeCap *big.Int + GasLimit uint64 + To *common.Address + Data []byte + Value *big.Int +} + +func captureTx(tx *types.Transaction) capturedBroadcast { + return capturedBroadcast{ + Nonce: tx.Nonce(), + GasTipCap: new(big.Int).Set(tx.GasTipCap()), + GasFeeCap: new(big.Int).Set(tx.GasFeeCap()), + GasLimit: tx.Gas(), + To: tx.To(), + Data: tx.Data(), + Value: new(big.Int).Set(tx.Value()), + } +} + +// assertTxDataUnchanged verifies that nonce, to, data, value, and gas limit +// are identical across all broadcast attempts (only fees should change). +func assertTxDataUnchanged(t *testing.T, broadcasts []capturedBroadcast) { + t.Helper() + for i := 1; i < len(broadcasts); i++ { + assert.Equal(t, broadcasts[0].Nonce, broadcasts[i].Nonce, + "attempt %d: nonce must not change across retries", i) + assert.Equal(t, broadcasts[0].To, broadcasts[i].To, + "attempt %d: To must not change across retries", i) + assert.Equal(t, broadcasts[0].Data, broadcasts[i].Data, + "attempt %d: Data must not change across retries", i) + assert.True(t, broadcasts[0].Value.Cmp(broadcasts[i].Value) == 0, + "attempt %d: Value must not change across retries (got %s, want %s)", i, broadcasts[i].Value, broadcasts[0].Value) + assert.Equal(t, broadcasts[0].GasLimit, broadcasts[i].GasLimit, + "attempt %d: GasLimit must not change across retries", i) + } +} + +// retryTestSetup holds shared constants and helpers for SendWithRetry tests. +type retryTestSetup struct { + sender common.Address + recipient common.Address + chainID *big.Int + nonce uint64 + txData []byte + value *big.Int + tipBase *big.Int // base value for fee tiers: LowTip=tipBase, MarketTip=tipBase*2, AggressiveTip=tipBase*3 + baseFee *big.Int + gasLimit uint64 +} + +func newRetryTestSetup() retryTestSetup { + return retryTestSetup{ + sender: common.HexToAddress("0xddff"), + recipient: common.HexToAddress("0xabcd"), + chainID: big.NewInt(5), + nonce: uint64(2), + txData: common.Hex2Bytes("abcdee"), + value: big.NewInt(1), + tipBase: big.NewInt(100), + baseFee: big.NewInt(1000), + gasLimit: uint64(50000), + } +} + +func (s retryTestSetup) expectedMarketTip() *big.Int { + return new(big.Int).Mul(s.tipBase, big.NewInt(2)) +} + +func (s retryTestSetup) expectedGasFeeCap(tip *big.Int) *big.Int { + return new(big.Int).Add(new(big.Int).Mul(s.baseFee, big.NewInt(2)), tip) +} + +func (s retryTestSetup) retryConfig() transaction.TransactionsRetryConfig { + return transaction.TransactionsRetryConfig{ + MaxRetries: 3, + RetryDelay: 50 * time.Millisecond, + GasIncreasePercent: 20, + MaxTxPrice: big.NewInt(100_000_000), + } +} + +func (s retryTestSetup) request() *transaction.TxRequest { + return &transaction.TxRequest{ + To: &s.recipient, + Data: s.txData, + Value: s.value, + GasLimit: s.gasLimit, + } +} + +func (s retryTestSetup) passThroughSigner() signermock.Option { + return signermock.WithSignTxFunc(func(tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) { + return tx, nil + }) +} + +func (s retryTestSetup) signerAddr() signermock.Option { + return signermock.WithEthereumAddressFunc(func() (common.Address, error) { + return s.sender, nil + }) +} + +func (s retryTestSetup) feeHistoryOption(counter *atomic.Int32) backendmock.Option { + return backendmock.WithSuggestedFeeAndTipsFromHistoryFunc(func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + if counter != nil { + counter.Add(1) + } + return &transaction.FeeHistorySuggestedFeeAndTips{ + LowTip: new(big.Int).Set(s.tipBase), + MarketTip: new(big.Int).Mul(s.tipBase, big.NewInt(2)), + AggressiveTip: new(big.Int).Mul(s.tipBase, big.NewInt(3)), + }, nil + }) +} + +func (s retryTestSetup) headerOption() backendmock.Option { + return backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + return &types.Header{BaseFee: new(big.Int).Set(s.baseFee)}, nil + }) +} + +func (s retryTestSetup) nonceOption() backendmock.Option { + var counter atomic.Uint64 + counter.Store(s.nonce) + return backendmock.WithPendingNonceAtFunc(func(ctx context.Context, account common.Address) (uint64, error) { + return counter.Add(1) - 1, nil + }) +} + +func (s retryTestSetup) estimateGasOption() backendmock.Option { + return backendmock.WithEstimateGasFunc(func(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { + return s.gasLimit, nil + }) +} + +// receiptWatchTimeout returns a monitor option that never returns a receipt (for testing timeout). +func receiptWatchTimeout() monitormock.Option { + return monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + return make(chan types.Receipt), make(chan error), nil + }) +} + +// receiptWatchErr returns a monitor option that returns an error on the error channel. +func receiptWatchErr(err error) monitormock.Option { + return monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan error, 1) + ch <- err + return nil, ch, nil + }) +} + +// Broadcast returns critical error → immediate exit, verify tx was built correctly. +func TestSendWithRetry_BroadcastCriticalError(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return errors.New("execution reverted") + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "execution reverted") + assert.Equal(t, common.Hash{}, txHash) + assert.Nil(t, receipt) + + assert.Equal(t, int32(1), feeHistoryCalls.Load(), "fee history must be called once") + + require.Len(t, broadcasts, 1, "exactly one broadcast before critical error") + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "initial tip must be MarketTip") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "gasFeeCap must be baseFee*2 + MarketTip") + assert.Equal(t, s.recipient, *broadcasts[0].To) + assert.Equal(t, s.txData, broadcasts[0].Data) + assert.Equal(t, s.value.Int64(), broadcasts[0].Value.Int64()) + assert.Equal(t, s.gasLimit, broadcasts[0].GasLimit) + + var rs transaction.TransactionRetryState + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &rs), storage.ErrNotFound, + "retry state should be cleaned up after critical error") +} + +// WaitForReceipt returns critical error → immediate exit, verify tx params. +func TestSendWithRetry_WaitForReceiptCriticalError(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchErr(transaction.ErrTransactionCancelled)), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + _, _, err = svc.SendWithRetry(context.Background(), s.request()) + assert.Error(t, err) + + assert.Equal(t, int32(1), feeHistoryCalls.Load(), + "fee history called once: tip was set after first broadcast, no more calls needed") + + require.Len(t, broadcasts, 1, "exactly one broadcast before critical wait error") + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "initial tip must be MarketTip") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "gasFeeCap must be baseFee*2 + MarketTip") + + var rs transaction.TransactionRetryState + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &rs), storage.ErrNotFound, + "retry state should be cleaned up after critical WaitForReceipt error") +} + +// updateStates returns any error → immediate exit. +func TestSendWithRetry_UpdateStateError(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + + putErr := errors.New("disk write failed") + callCount := 0 + failingStore := &failOnNthPutStore{ + StateStorer: storemock.NewStateStore(), + failOnPut: 1, + putErr: putErr, + callCount: &callCount, + } + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + failingStore, + s.chainID, + monitormock.New(), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + _, _, err = svc.SendWithRetry(context.Background(), s.request()) + assert.ErrorIs(t, err, putErr) +} + +// First broadcast fails (non-critical, signedTx nil because prepare fails), second succeeds. +// On the first attempt HeaderByNumber fails → prepareTransactionWithRetry fails → broadcastTxWithRetry +// returns (nil, err) with a non-critical error. +// UpdateStates receives nil signedTx → state is not updated, only number of attempt increased in-memory +// After retry delay, second broadcast attempt succeeds → receipt → exit. +func TestSendWithRetry_NonCriticalThenSuccess(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var headerCalls atomic.Int32 + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.estimateGasOption(), + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + n := headerCalls.Add(1) + if n == 1 { + return nil, errors.New("temporary RPC error") + } + return &types.Header{BaseFee: new(big.Int).Set(s.baseFee)}, nil + }), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + assert.NotEqual(t, common.Hash{}, txHash) + require.NotNil(t, receipt) + assert.Equal(t, uint64(1), receipt.Status) + + assert.GreaterOrEqual(t, int(headerCalls.Load()), 2, "should have retried after non-critical error") + assert.Equal(t, int32(1), feeHistoryCalls.Load(), + "fee history called once: first attempt failed at HeaderByNumber before reaching fee history") + + require.Len(t, broadcasts, 1, "only one successful broadcast (first attempt failed before SendTransaction)") + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "tip must be MarketTip (no previous tip was set since first attempt failed)") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "gasFeeCap must be baseFee*2 + MarketTip") + assert.Equal(t, s.recipient, *broadcasts[0].To) + assert.Equal(t, s.txData, broadcasts[0].Data) + + var rs transaction.TransactionRetryState + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(broadcasts[0].Nonce), &rs), storage.ErrNotFound, + "retry state should be cleaned up on success") + assert.ErrorIs(t, store.Get(transaction.PendingTransactionKey(txHash), &struct{}{}), storage.ErrNotFound, + "pending tx entry should be cleaned up on success") +} + +// First broadcast OK, receipt not found (timeout), second broadcast with escalated gas → receipt found. +// Verifies exact fee values, nonce immutability, and tx data immutability. +func TestSendWithRetry_EscalateGasThenSuccess(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var broadcastCount atomic.Int32 + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcastCount.Add(1) + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + if broadcastCount.Load() <= 1 { + return make(chan types.Receipt), make(chan error), nil + } + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + assert.NotEqual(t, common.Hash{}, txHash) + require.NotNil(t, receipt) + assert.Equal(t, uint64(1), receipt.Status) + + require.Len(t, broadcasts, 2, "should have exactly 2 broadcast attempts") + + assertTxDataUnchanged(t, broadcasts) + + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "first attempt must use MarketTip from fee history") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "first attempt gasFeeCap = baseFee*2 + MarketTip") + + escalatedTip := transaction.EscalateGasTip(marketTip, 20) + assert.Equal(t, escalatedTip.Int64(), broadcasts[1].GasTipCap.Int64(), + "second attempt must use escalated tip (MarketTip * 1.2)") + assert.Equal(t, s.expectedGasFeeCap(escalatedTip).Int64(), broadcasts[1].GasFeeCap.Int64(), + "second attempt gasFeeCap = baseFee*2 + escalated tip") + + assert.Equal(t, int32(1), feeHistoryCalls.Load(), + "fee history called once: PreviousTip known after first broadcast, retries use escalated tip") + + var rs transaction.TransactionRetryState + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(broadcasts[0].Nonce), &rs), storage.ErrNotFound, + "retry state should be cleaned up on success") +} + +// All attempts exhausted, receipt never found → error. +// Verifies compound escalation chain, nonce immutability, and gasFeeCap on every attempt. +func TestSendWithRetry_AllAttemptsExhausted(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchTimeout()), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "transaction failed after 3 attempts") + assert.NotEqual(t, common.Hash{}, txHash, "should return last tx hash even on exhaustion") + assert.Nil(t, receipt) + + require.Len(t, broadcasts, 3, "should have made exactly maxRetries attempts") + + assertTxDataUnchanged(t, broadcasts) + + tip0 := s.expectedMarketTip() // tipBase*2 = 200 + tip1 := transaction.EscalateGasTip(tip0, 20) // 240 + tip2 := transaction.EscalateGasTip(tip1, 20) // 288 + expectedTips := []*big.Int{tip0, tip1, tip2} + + for i, expectedTip := range expectedTips { + assert.Equal(t, expectedTip.Int64(), broadcasts[i].GasTipCap.Int64(), + "attempt %d: tip must match compound escalation chain", i) + assert.Equal(t, s.expectedGasFeeCap(expectedTip).Int64(), broadcasts[i].GasFeeCap.Int64(), + "attempt %d: gasFeeCap must be baseFee*2 + tip", i) + } + + assert.Equal(t, int32(1), feeHistoryCalls.Load(), + "fee history called once: PreviousTip known after first broadcast, retries use escalated tip") + + var rs transaction.TransactionRetryState + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(broadcasts[0].Nonce), &rs), storage.ErrNotFound, + "retry state should be cleaned up after exhaustion") +} + +// Resume after node restart — transaction is re-sent starting from persisted attempt. +// Verifies nonce, escalated tip, gasFeeCap, and that fee history is NOT called. +func TestSendWithRetry_ResumeAfterRestart(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + previousTip := new(big.Int).Set(s.tipBase) + lastTxHash := common.HexToHash("0xdeadbeef") + + priorState := transaction.TransactionRetryState{ + Nonce: s.nonce, + NonceAssigned: true, + NextAttempt: 1, + LastTxHash: lastTxHash, + AllTxHashes: nil, + GasLimit: s.gasLimit, + To: &s.recipient, + Data: s.txData, + Value: s.value, + Description: "test-resume", + PreviousTip: previousTip, + } + + retryKey := transaction.RetryStateKey(s.nonce) + require.NoError(t, store.Put(retryKey, priorState)) + + require.NoError(t, store.Put(transaction.StoredTransactionKey(lastTxHash), transaction.StoredTransaction{ + To: &s.recipient, + Data: s.txData, + GasLimit: s.gasLimit, + Value: s.value, + Nonce: s.nonce, + GasTipCap: previousTip, + GasFeeCap: big.NewInt(5000), + GasPrice: big.NewInt(0), + Created: time.Now().Unix(), + })) + require.NoError(t, store.Put(transaction.PendingTransactionKey(lastTxHash), struct{}{})) + + var ( + broadcastsMu sync.Mutex + broadcasts []capturedBroadcast + ) + var feeHistoryCalls atomic.Int32 + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithPendingNonceAtFunc(func(ctx context.Context, account common.Address) (uint64, error) { + return s.nonce, nil + }), + backendmock.WithNonceAtFunc(func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return s.nonce, nil + }), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + captured := captureTx(tx) + broadcastsMu.Lock() + broadcasts = append(broadcasts, captured) + broadcastsMu.Unlock() + return nil + }), + backendmock.WithTransactionByHashFunc(func(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { + return nil, false, ethereum.NotFound + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + require.Eventually(t, func() bool { + broadcastsMu.Lock() + defer broadcastsMu.Unlock() + return len(broadcasts) > 0 + }, 5*time.Second, 10*time.Millisecond, "resume should have triggered a broadcast") + + require.NoError(t, svc.Close()) + + broadcastsMu.Lock() + require.Len(t, broadcasts, 1) + resumed := broadcasts[0] + gasTipCap := new(big.Int).Set(resumed.GasTipCap) + gasFeeCap := new(big.Int).Set(resumed.GasFeeCap) + resumedNonce := resumed.Nonce + broadcastsMu.Unlock() + + assert.Equal(t, s.nonce, resumedNonce, "resumed transaction must use the same nonce") + + expectedTip := transaction.EscalateGasTip(previousTip, 20) + assert.Equal(t, expectedTip.Int64(), gasTipCap.Int64(), + "resumed transaction should use escalated tip (one step from persisted PreviousTip)") + + assert.Equal(t, s.expectedGasFeeCap(expectedTip).Int64(), gasFeeCap.Int64(), + "resumed gasFeeCap must be baseFee*2 + escalated tip") + + assert.Equal(t, int32(0), feeHistoryCalls.Load(), + "fee history should NOT be called on resume — tip is restored from persisted state") + + var rs transaction.TransactionRetryState + assert.Eventually(t, func() bool { + return errors.Is(store.Get(retryKey, &rs), storage.ErrNotFound) + }, 5*time.Second, 10*time.Millisecond, "retry state should be cleaned up after success") +} + +// MaxTxPrice cap prevents escalation beyond the configured limit. +func TestSendWithRetry_MaxTxPriceCap(t *testing.T) { + t.Parallel() + + t.Run("escalation capped to previous tip", func(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + marketTip := s.expectedMarketTip() // 200 + // Set maxTxPrice so that first attempt (baseFee*2 + MarketTip = 2200) fits, + // but escalated (baseFee*2 + 240 = 2240) exceeds the cap. + maxTxPrice := new(big.Int).Add(s.expectedGasFeeCap(marketTip), big.NewInt(10)) // 2210 + + cfg := s.retryConfig() + cfg.MaxTxPrice = maxTxPrice + + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchTimeout()), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + _, _, err = svc.SendWithRetry(context.Background(), s.request()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "transaction failed after 3 attempts") + + require.Len(t, broadcasts, 3, "all 3 attempts should have been sent") + + assertTxDataUnchanged(t, broadcasts) + + for i, bc := range broadcasts { + assert.Equal(t, marketTip.Int64(), bc.GasTipCap.Int64(), + "attempt %d: tip must stay at MarketTip (escalation capped by maxTxPrice)", i) + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), bc.GasFeeCap.Int64(), + "attempt %d: gasFeeCap must be baseFee*2 + MarketTip (capped)", i) + } + }) + + t.Run("exceeds limit on first attempt", func(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + marketTip := s.expectedMarketTip() + // Set maxTxPrice below baseFee*2 + MarketTip so even the first attempt fails. + maxTxPrice := new(big.Int).Sub(s.expectedGasFeeCap(marketTip), big.NewInt(1)) // 2199 + + cfg := s.retryConfig() + cfg.MaxTxPrice = maxTxPrice + + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchTimeout()), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + _, _, err = svc.SendWithRetry(context.Background(), s.request()) + assert.Error(t, err) + assert.Len(t, broadcasts, 0, + "no transaction should be sent when maxTxPrice is below the minimum fee") + }) +} + +// failOnNthPutStore wraps a StateStorer and fails the Nth Put call with putErr. +type failOnNthPutStore struct { + storage.StateStorer + failOnPut int + putErr error + callCount *int +} + +func (s *failOnNthPutStore) Put(key string, i any) error { + *s.callCount++ + if *s.callCount >= s.failOnPut { + return s.putErr + } + return s.StateStorer.Put(key, i) +} diff --git a/pkg/transaction/transaction.go b/pkg/transaction/transaction.go index 68fc74d6f53..cdc927f83c7 100644 --- a/pkg/transaction/transaction.go +++ b/pkg/transaction/transaction.go @@ -40,6 +40,9 @@ var ( ErrTransactionReverted = errors.New("transaction reverted") ErrUnknownTransaction = errors.New("unknown transaction") ErrAlreadyImported = errors.New("already imported") + ErrTxMaxPriceExceeded = errors.New("transaction retry: exceeds maximum tx price (max fee per gas)") + // ErrSignTransaction is returned when signing a transaction fails. + ErrSignTransaction = errors.New("sign transaction") ) const ( @@ -49,8 +52,37 @@ const ( MinGasLimit = 21_000 // Minimum gas for any transaction GasBufferPercent = 33 // Add 33% buffer to estimated gas FallbackGasLimit = 500_000 // Fallback when estimation fails and no minimum is set + + // DefaultSendWithRetryAttempts is the default maximum number of broadcast rounds for SendWithRetry. + DefaultSendWithRetryAttempts = 5 + // DefaultSendWithRetryDelay is the default wait for a receipt before escalating fees in SendWithRetry. + DefaultSendWithRetryDelay = 1 * time.Minute + // DefaultTransactionRetryGasIncreasePercent is the default percent increase applied to priority fee after each retry step. + DefaultTransactionRetryGasIncreasePercent = 20 ) +// TransactionsRetryConfig configures SendWithRetry behaviour. Zero values are replaced by defaults in NewService. +type TransactionsRetryConfig struct { + MaxRetries int + RetryDelay time.Duration + GasIncreasePercent int + MaxTxPrice *big.Int +} + +// normalizeServiceRetryConfig fills zero fields with package defaults. +func normalizeServiceRetryConfig(c TransactionsRetryConfig) TransactionsRetryConfig { + if c.MaxRetries <= 0 { + c.MaxRetries = DefaultSendWithRetryAttempts + } + if c.RetryDelay <= 0 { + c.RetryDelay = DefaultSendWithRetryDelay + } + if c.GasIncreasePercent <= 0 { + c.GasIncreasePercent = DefaultTransactionRetryGasIncreasePercent + } + return c +} + // TxRequest describes a request for a transaction that can be executed. type TxRequest struct { To *common.Address // recipient of the transaction @@ -83,6 +115,8 @@ type Service interface { io.Closer // Send creates a transaction based on the request (with gasprice increased by provided percentage) and sends it. Send(ctx context.Context, request *TxRequest, tipCapBoostPercent int) (txHash common.Hash, err error) + // SendWithRetry sends a transaction using fee-history tiers and automatic fee escalation; see send_tx_with_retry.go. + SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) // Call simulate a transaction based on the request. Call(ctx context.Context, request *TxRequest) (result []byte, err error) // WaitForReceipt waits until either the transaction with the given hash has been mined or the context is cancelled. @@ -122,10 +156,17 @@ type transactionService struct { chainID *big.Int monitor Monitor fallbackGasLimit uint64 + + txMaxRetries int + txRetryDelay time.Duration + txRetryGasIncreasePercent int + maxTxPrice *big.Int + + metrics transactionsWithRetryMetrics } // NewService creates a new transaction service. -func NewService(logger log.Logger, overlayEthAddress common.Address, backend Backend, signer crypto.Signer, store storage.StateStorer, chainID *big.Int, monitor Monitor, fallbackGasLimit uint64) (Service, error) { +func NewService(logger log.Logger, overlayEthAddress common.Address, backend Backend, signer crypto.Signer, store storage.StateStorer, chainID *big.Int, monitor Monitor, fallbackGasLimit uint64, retryCfg TransactionsRetryConfig) (Service, error) { senderAddress, err := signer.EthereumAddress() if err != nil { return nil, err @@ -135,35 +176,65 @@ func NewService(logger log.Logger, overlayEthAddress common.Address, backend Bac fallbackGasLimit = FallbackGasLimit } + rc := normalizeServiceRetryConfig(retryCfg) + ctx, cancel := context.WithCancel(context.Background()) + logger.Info("transaction retry configuration", + "max_retries", rc.MaxRetries, + "retry_delay", rc.RetryDelay, + "gas_increase_percent", rc.GasIncreasePercent, + "max_tx_price_wei", rc.MaxTxPrice, + ) t := &transactionService{ - ctx: ctx, - cancel: cancel, - logger: logger.WithName(loggerName).WithValues("sender_address", overlayEthAddress).Register(), - backend: backend, - signer: signer, - sender: senderAddress, - store: store, - chainID: chainID, - monitor: monitor, - fallbackGasLimit: fallbackGasLimit, + ctx: ctx, + cancel: cancel, + logger: logger.WithName(loggerName).WithValues("sender_address", overlayEthAddress).Register(), + backend: backend, + signer: signer, + sender: senderAddress, + store: store, + chainID: chainID, + monitor: monitor, + fallbackGasLimit: fallbackGasLimit, + txMaxRetries: rc.MaxRetries, + txRetryDelay: rc.RetryDelay, + txRetryGasIncreasePercent: rc.GasIncreasePercent, + maxTxPrice: rc.MaxTxPrice, + metrics: newRetryMetrics(), } if err = t.waitForAllPendingTx(); err != nil { return nil, err } + if err = t.resumeRetryTransactions(); err != nil { + return nil, err + } + return t, nil } func (t *transactionService) waitForAllPendingTx() error { + retryHashes, err := t.retryPendingHashes() + if err != nil { + return err + } + pendingTxs, err := t.PendingTransactions() if err != nil { return err } - pending := t.filterPendingTransactions(t.ctx, pendingTxs) + nonRetry := make([]common.Hash, 0, len(pendingTxs)) + for _, txHash := range pendingTxs { + if _, skip := retryHashes[txHash]; skip { + continue + } + nonRetry = append(nonRetry, txHash) + } + + pending := t.filterPendingTransactions(t.ctx, nonRetry) for txHash := range pending { t.waitForPendingTx(txHash) @@ -278,9 +349,7 @@ func (t *transactionService) StoredTransaction(txHash common.Hash) (*StoredTrans return &tx, nil } -// prepareTransaction creates a signable transaction based on a request. -func (t *transactionService) prepareTransaction(ctx context.Context, request *TxRequest, nonce uint64, boostPercent int) (tx *types.Transaction, err error) { - var gasLimit uint64 +func (t *transactionService) estimateGasLimit(ctx context.Context, request *TxRequest) (gasLimit uint64, err error) { if request.GasLimit == 0 { // Estimate gas using pending state for consistency with PendingNonceAt gasLimit, err = t.backend.EstimateGas(ctx, ethereum.CallMsg{ @@ -330,7 +399,16 @@ func (t *transactionService) prepareTransaction(ctx context.Context, request *Tx } if gasLimit == 0 { - return nil, errors.New("gas limit cannot be zero") + return 0, errors.New("gas limit cannot be zero") + } + return gasLimit, nil +} + +// prepareTransaction creates a signable transaction based on a request. +func (t *transactionService) prepareTransaction(ctx context.Context, request *TxRequest, nonce uint64, boostPercent int) (tx *types.Transaction, err error) { + gasLimit, err := t.estimateGasLimit(ctx, request) + if err != nil { + return nil, err } /* diff --git a/pkg/transaction/transaction_test.go b/pkg/transaction/transaction_test.go index 0a43b055d98..a1668dc6a44 100644 --- a/pkg/transaction/transaction_test.go +++ b/pkg/transaction/transaction_test.go @@ -176,6 +176,7 @@ func TestTransactionSend(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -257,6 +258,7 @@ func TestTransactionSend(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -344,6 +346,7 @@ func TestTransactionSend(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -423,6 +426,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -489,6 +493,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -556,6 +561,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -625,6 +631,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -694,6 +701,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -766,6 +774,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -840,6 +849,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -914,6 +924,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -983,6 +994,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1052,6 +1064,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1124,6 +1137,7 @@ func TestTransactionWaitForReceipt(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1198,6 +1212,7 @@ func TestTransactionResend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1286,6 +1301,7 @@ func TestTransactionCancel(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1337,6 +1353,7 @@ func TestTransactionCancel(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1423,6 +1440,7 @@ func TestTransactionService_UnwrapABIError(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) diff --git a/pkg/transaction/wrapped/fee.go b/pkg/transaction/wrapped/fee.go index dd8f5b80e80..2e382544d7a 100644 --- a/pkg/transaction/wrapped/fee.go +++ b/pkg/transaction/wrapped/fee.go @@ -9,6 +9,8 @@ import ( "errors" "fmt" "math/big" + + "github.com/ethersphere/bee/v2/pkg/transaction" ) const ( @@ -81,3 +83,20 @@ func (b *wrappedBackend) SuggestedFeeAndTip(ctx context.Context, gasPrice *big.I return gasFeeCap, gasTipCap, nil } + +func (b *wrappedBackend) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + fh, err := b.FeeHistory(ctx, b.feeHistoryBlockCount, lastBlock, b.feeHistoryRewardPercentiles) + if err != nil { + return nil, err + } + low, market, aggressive, err := suggestedFeesFromFeeHistoryResult(fh) + if err != nil { + b.metrics.FeeHistoryParseErrors.Inc() + return nil, err + } + return &transaction.FeeHistorySuggestedFeeAndTips{ + LowTip: low, + MarketTip: market, + AggressiveTip: aggressive, + }, nil +} diff --git a/pkg/transaction/wrapped/fee_history.go b/pkg/transaction/wrapped/fee_history.go new file mode 100644 index 00000000000..435ef340988 --- /dev/null +++ b/pkg/transaction/wrapped/fee_history.go @@ -0,0 +1,51 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package wrapped + +import ( + "errors" + "math/big" + "sort" + + "github.com/ethereum/go-ethereum" +) + +func suggestedFeesFromFeeHistoryResult(fh *ethereum.FeeHistory) (low, market, aggressive *big.Int, err error) { + if fh == nil { + return nil, nil, nil, errors.New("fee history: empty response") + } + if len(fh.BaseFee) == 0 { + return nil, nil, nil, errors.New("fee history: no base fees") + } + low = medianPriorityTipAtPercentileIndex(fh.Reward, 0) + market = medianPriorityTipAtPercentileIndex(fh.Reward, 1) + aggressive = medianPriorityTipAtPercentileIndex(fh.Reward, 2) + return low, market, aggressive, nil +} + +func medianPriorityTipAtPercentileIndex(reward [][]*big.Int, idx int) *big.Int { + var vals []*big.Int + for _, row := range reward { + if idx >= len(row) { + continue + } + if row[idx] == nil { + continue + } + vals = append(vals, new(big.Int).Set(row[idx])) + } + if len(vals) == 0 { + return big.NewInt(0) + } + sort.Slice(vals, func(i, j int) bool { + return vals[i].Cmp(vals[j]) < 0 + }) + mid := len(vals) / 2 + if len(vals)%2 == 0 { + sum := new(big.Int).Add(vals[mid-1], vals[mid]) + return sum.Div(sum, big.NewInt(2)) + } + return vals[mid] +} diff --git a/pkg/transaction/wrapped/fee_history_test.go b/pkg/transaction/wrapped/fee_history_test.go new file mode 100644 index 00000000000..87c988acfc0 --- /dev/null +++ b/pkg/transaction/wrapped/fee_history_test.go @@ -0,0 +1,39 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package wrapped + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum" +) + +func TestSuggestedFeeAndTipsFromFeeHistoryResult(t *testing.T) { + t.Parallel() + + base := big.NewInt(1000) + fh := ðereum.FeeHistory{ + BaseFee: []*big.Int{big.NewInt(1), base}, + Reward: [][]*big.Int{ + {big.NewInt(10), big.NewInt(50), big.NewInt(90)}, + {big.NewInt(20), big.NewInt(60), big.NewInt(100)}, + }, + } + + low, market, agg, err := suggestedFeesFromFeeHistoryResult(fh) + if err != nil { + t.Fatal(err) + } + if got, want := low.String(), "15"; got != want { + t.Fatalf("low: got %s want %s", got, want) + } + if got, want := market.String(), "55"; got != want { + t.Fatalf("market: got %s want %s", got, want) + } + if got, want := agg.String(), "95"; got != want { + t.Fatalf("aggressive: got %s want %s", got, want) + } +} diff --git a/pkg/transaction/wrapped/fee_test.go b/pkg/transaction/wrapped/fee_test.go index 33ddc31da88..57585d4cc7e 100644 --- a/pkg/transaction/wrapped/fee_test.go +++ b/pkg/transaction/wrapped/fee_test.go @@ -108,6 +108,8 @@ func TestSuggestedFeeAndTip(t *testing.T) { minimumGasTipCap, 5*time.Second, 90, + 0, + nil, ) gasFeeCap, gasTipCap, err := backend.SuggestedFeeAndTip(ctx, tc.gasPrice, tc.boostPercent) diff --git a/pkg/transaction/wrapped/metrics.go b/pkg/transaction/wrapped/metrics.go index 0570cc813d3..6e435f5f1f8 100644 --- a/pkg/transaction/wrapped/metrics.go +++ b/pkg/transaction/wrapped/metrics.go @@ -26,6 +26,8 @@ type metrics struct { SendTransactionCalls prometheus.Counter FilterLogsCalls prometheus.Counter ChainIDCalls prometheus.Counter + FeeHistoryCalls prometheus.Counter + FeeHistoryParseErrors prometheus.Counter } func newMetrics() metrics { @@ -122,6 +124,18 @@ func newMetrics() metrics { Name: "calls_chain_id", Help: "Count of eth_chainId rpc calls", }), + FeeHistoryCalls: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "calls_fee_history", + Help: "Count of eth_feeHistory rpc calls", + }), + FeeHistoryParseErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "fee_history_parse_errors", + Help: "Count of failures to derive suggested fees from fee history response", + }), } } diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index 723c405587e..99df3ca0ec3 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -8,6 +8,7 @@ import ( "context" "errors" "math/big" + "slices" "time" "github.com/ethereum/go-ethereum" @@ -20,18 +21,24 @@ import ( var _ transaction.Backend = (*wrappedBackend)(nil) +const feeHistoryDefaultBlockCount = 100 + +var feeHistoryDefaultRewardPercentiles = []float64{10, 50, 90} + type blockNumberAnchor struct { number uint64 timestamp time.Time } type wrappedBackend struct { - backend backend.Geth - metrics metrics - minimumGasTipCap int64 - blockTime time.Duration - blockSyncInterval uint64 - blockNumberCache *cache.SingleFlightCache[blockNumberAnchor] + backend backend.Geth + metrics metrics + minimumGasTipCap int64 + blockTime time.Duration + blockSyncInterval uint64 + blockNumberCache *cache.SingleFlightCache[blockNumberAnchor] + feeHistoryBlockCount uint64 + feeHistoryRewardPercentiles []float64 } func NewBackend( @@ -39,18 +46,33 @@ func NewBackend( minimumGasTipCap uint64, blockTime time.Duration, blockSyncInterval uint64, + feeHistoryBlockCount uint64, + feeHistoryRewardPercentiles []float64, ) transaction.Backend { if blockSyncInterval == 0 { blockSyncInterval = 1 } + if feeHistoryBlockCount == 0 { + feeHistoryBlockCount = feeHistoryDefaultBlockCount + } + + var rewardPerc []float64 + if len(feeHistoryRewardPercentiles) >= 3 { + rewardPerc = slices.Clone(feeHistoryRewardPercentiles) + } else { + rewardPerc = slices.Clone(feeHistoryDefaultRewardPercentiles) + } + return &wrappedBackend{ - backend: backend, - minimumGasTipCap: int64(minimumGasTipCap), - blockTime: blockTime, - metrics: newMetrics(), - blockSyncInterval: blockSyncInterval, - blockNumberCache: cache.NewSingleFlightCache[blockNumberAnchor]("block_number"), + backend: backend, + metrics: newMetrics(), + minimumGasTipCap: int64(minimumGasTipCap), + blockTime: blockTime, + blockSyncInterval: blockSyncInterval, + blockNumberCache: cache.NewSingleFlightCache[blockNumberAnchor]("block_number"), + feeHistoryBlockCount: feeHistoryBlockCount, + feeHistoryRewardPercentiles: rewardPerc, } } @@ -242,6 +264,17 @@ func (b *wrappedBackend) ChainID(ctx context.Context) (*big.Int, error) { return chainID, nil } +func (b *wrappedBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { + b.metrics.TotalRPCCalls.Inc() + b.metrics.FeeHistoryCalls.Inc() + fh, err := b.backend.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) + if err != nil { + b.metrics.TotalRPCErrors.Inc() + return nil, err + } + return fh, nil +} + func (b *wrappedBackend) Close() { b.backend.Close() } diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index b0af463f6a6..0ff7c334cb7 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -188,6 +188,8 @@ func newTestWrappedBackend(t *testing.T, opts ...backendmock.Option) *wrappedBac testMinimumGasTipCap, testBlockTime, testBlockSyncInterval, + 0, + nil, ).(*wrappedBackend) assert.True(t, ok)