From a97dd6d2116e259054fb2d9b01370c91968804bb Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 23 Jun 2026 14:20:06 -0700 Subject: [PATCH 1/4] feat(rpc): wire UnmarshalWithLimit into Stream.Recv with P2P config override Stream.Recv now calls protoutils.UnmarshalWithLimit instead of Unmarshal, bounding per-message heap allocation to MsgSize * allocMultiplier. The multiplier defaults to 2 (a value load-tested against the largest message, LaneProposal at ~2MB wire / ~4MB alloc estimate). It is configurable via P2PConfig.ProtoAllocLimitMultiplier and applied at node startup through rpc.SetAllocMultiplier. Co-Authored-By: Claude Sonnet 4.6 --- sei-tendermint/config/config.go | 5 +++++ sei-tendermint/internal/p2p/rpc/rpc.go | 28 ++++++++++++++++++++++---- sei-tendermint/node/setup.go | 4 ++++ 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/sei-tendermint/config/config.go b/sei-tendermint/config/config.go index ce47df8c84..0913ded8e8 100644 --- a/sei-tendermint/config/config.go +++ b/sei-tendermint/config/config.go @@ -722,6 +722,10 @@ type P2PConfig struct { // List of node IDs, from which a connection will be accepted regardless of the connection limits. UnconditionalPeerIDs string `mapstructure:"unconditional-peer-ids"` + + // ProtoAllocLimitMultiplier scales the per-stream inbound MsgSize to set + // the alloc limit for proto unmarshalling. A value of 0 uses the default (2). + ProtoAllocLimitMultiplier int `mapstructure:"proto-alloc-limit-multiplier"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer @@ -743,6 +747,7 @@ func DefaultP2PConfig() *P2PConfig { DialInterval: 10 * time.Second, TestDialFail: false, QueueType: "simple-priority", + ProtoAllocLimitMultiplier: 2, } } diff --git a/sei-tendermint/internal/p2p/rpc/rpc.go b/sei-tendermint/internal/p2p/rpc/rpc.go index 4a09658043..bf7e070829 100644 --- a/sei-tendermint/internal/p2p/rpc/rpc.go +++ b/sei-tendermint/internal/p2p/rpc/rpc.go @@ -16,6 +16,23 @@ import ( type InBytes uint64 type InMsgs uint64 +// allocMultiplier scales each stream's inbound MsgSize to set the per-message +// alloc limit for UnmarshalWithLimit. Must be set once at startup before any +// streams are opened. +var allocMultiplier = 2 + +// SetAllocMultiplier overrides the default alloc multiplier (2). A value of 0 +// is treated as 1 (no scaling). Panics if m is negative. +func SetAllocMultiplier(m int) { + if m < 0 { + panic(fmt.Sprintf("rpc: SetAllocMultiplier: multiplier must be non-negative, got %d", m)) + } + if m == 0 { + m = 1 + } + allocMultiplier = m +} + func (spec Msg[M]) Verify() error { var msg M if m := InBytes(msg.MaxSize()); m > spec.MsgSize { // nolint: gosec // MaxSize() >= 0 @@ -112,7 +129,7 @@ func (r *RPC[API, Req, Resp]) Call(ctx context.Context, client Client[API]) (Str if err != nil { return Stream[Req, Resp]{}, err } - return Stream[Req, Resp]{inner: s}, nil + return Stream[Req, Resp]{inner: s, allocLimit: int(r.Resp.MsgSize) * allocMultiplier}, nil //nolint:gosec // MsgSize is a validated config value } func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], handler func(context.Context, Stream[Resp, Req]) error) error { @@ -128,7 +145,7 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han if err != nil { return err } - err = handler(ctx, Stream[Resp, Req]{inner: stream}) + err = handler(ctx, Stream[Resp, Req]{inner: stream, allocLimit: int(r.Req.MsgSize) * allocMultiplier}) //nolint:gosec // MsgSize is a validated config value stream.Close() if err != nil { return err @@ -141,7 +158,10 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han }) } -type Stream[SendT, RecvT protoutils.Message] struct{ inner *mux.Stream } +type Stream[SendT, RecvT protoutils.Message] struct { + inner *mux.Stream + allocLimit int +} func (s Stream[SendT, RecvT]) Close() { s.inner.Close() } func (s Stream[SendT, RecvT]) Send(ctx context.Context, msg SendT) error { @@ -152,5 +172,5 @@ func (s Stream[SendT, RecvT]) Recv(ctx context.Context) (RecvT, error) { if err != nil { return utils.Zero[RecvT](), err } - return protoutils.Unmarshal[RecvT](raw) + return protoutils.UnmarshalWithLimit[RecvT](raw, s.allocLimit) } diff --git a/sei-tendermint/node/setup.go b/sei-tendermint/node/setup.go index 2479a698c0..30a6743e4a 100644 --- a/sei-tendermint/node/setup.go +++ b/sei-tendermint/node/setup.go @@ -25,6 +25,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/pex" + "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/rpc" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state/indexer" @@ -274,6 +275,9 @@ func createRouter( dbProvider config.DBProvider, ) (*p2p.Router, closer, error) { closer := func() error { return nil } + if m := cfg.P2P.ProtoAllocLimitMultiplier; m != 0 { + rpc.SetAllocMultiplier(m) + } ep, err := p2p.ResolveEndpoint(nodeKey.ID().AddressString(cfg.P2P.ListenAddress)) if err != nil { return nil, closer, err From 7c51eb3406224dd46f2b443f2d7b035856a91781 Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 23 Jun 2026 15:43:07 -0700 Subject: [PATCH 2/4] simplify: replace allocMultiplier var+setter with a constant Co-Authored-By: Claude Sonnet 4.6 --- sei-tendermint/config/config.go | 7 +------ sei-tendermint/internal/p2p/rpc/rpc.go | 25 +++++++------------------ sei-tendermint/node/setup.go | 4 ---- 3 files changed, 8 insertions(+), 28 deletions(-) diff --git a/sei-tendermint/config/config.go b/sei-tendermint/config/config.go index 0913ded8e8..f8422d292f 100644 --- a/sei-tendermint/config/config.go +++ b/sei-tendermint/config/config.go @@ -722,10 +722,6 @@ type P2PConfig struct { // List of node IDs, from which a connection will be accepted regardless of the connection limits. UnconditionalPeerIDs string `mapstructure:"unconditional-peer-ids"` - - // ProtoAllocLimitMultiplier scales the per-stream inbound MsgSize to set - // the alloc limit for proto unmarshalling. A value of 0 uses the default (2). - ProtoAllocLimitMultiplier int `mapstructure:"proto-alloc-limit-multiplier"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer @@ -746,8 +742,7 @@ func DefaultP2PConfig() *P2PConfig { DialTimeout: 3 * time.Second, DialInterval: 10 * time.Second, TestDialFail: false, - QueueType: "simple-priority", - ProtoAllocLimitMultiplier: 2, + QueueType: "simple-priority", } } diff --git a/sei-tendermint/internal/p2p/rpc/rpc.go b/sei-tendermint/internal/p2p/rpc/rpc.go index bf7e070829..ff1fc2c48e 100644 --- a/sei-tendermint/internal/p2p/rpc/rpc.go +++ b/sei-tendermint/internal/p2p/rpc/rpc.go @@ -16,22 +16,11 @@ import ( type InBytes uint64 type InMsgs uint64 -// allocMultiplier scales each stream's inbound MsgSize to set the per-message -// alloc limit for UnmarshalWithLimit. Must be set once at startup before any -// streams are opened. -var allocMultiplier = 2 - -// SetAllocMultiplier overrides the default alloc multiplier (2). A value of 0 -// is treated as 1 (no scaling). Panics if m is negative. -func SetAllocMultiplier(m int) { - if m < 0 { - panic(fmt.Sprintf("rpc: SetAllocMultiplier: multiplier must be non-negative, got %d", m)) - } - if m == 0 { - m = 1 - } - allocMultiplier = m -} +// recvAllocMultiplier scales each stream's inbound MsgSize to derive the +// per-message alloc limit passed to UnmarshalWithLimit. Load testing against +// the largest message (~2 MB wire for LaneProposal) showed its alloc estimate +// fits comfortably within 2× MsgSize. +const recvAllocMultiplier = 2 func (spec Msg[M]) Verify() error { var msg M @@ -129,7 +118,7 @@ func (r *RPC[API, Req, Resp]) Call(ctx context.Context, client Client[API]) (Str if err != nil { return Stream[Req, Resp]{}, err } - return Stream[Req, Resp]{inner: s, allocLimit: int(r.Resp.MsgSize) * allocMultiplier}, nil //nolint:gosec // MsgSize is a validated config value + return Stream[Req, Resp]{inner: s, allocLimit: int(r.Resp.MsgSize) * recvAllocMultiplier}, nil //nolint:gosec // MsgSize is a validated config value } func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], handler func(context.Context, Stream[Resp, Req]) error) error { @@ -145,7 +134,7 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han if err != nil { return err } - err = handler(ctx, Stream[Resp, Req]{inner: stream, allocLimit: int(r.Req.MsgSize) * allocMultiplier}) //nolint:gosec // MsgSize is a validated config value + err = handler(ctx, Stream[Resp, Req]{inner: stream, allocLimit: int(r.Req.MsgSize) * recvAllocMultiplier}) //nolint:gosec // MsgSize is a validated config value stream.Close() if err != nil { return err diff --git a/sei-tendermint/node/setup.go b/sei-tendermint/node/setup.go index 30a6743e4a..2479a698c0 100644 --- a/sei-tendermint/node/setup.go +++ b/sei-tendermint/node/setup.go @@ -25,7 +25,6 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/pex" - "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/rpc" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/proxy" sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state/indexer" @@ -275,9 +274,6 @@ func createRouter( dbProvider config.DBProvider, ) (*p2p.Router, closer, error) { closer := func() error { return nil } - if m := cfg.P2P.ProtoAllocLimitMultiplier; m != 0 { - rpc.SetAllocMultiplier(m) - } ep, err := p2p.ResolveEndpoint(nodeKey.ID().AddressString(cfg.P2P.ListenAddress)) if err != nil { return nil, closer, err From 156067557a64ef81236fd4d3f1f819d3535a381b Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 23 Jun 2026 15:49:15 -0700 Subject: [PATCH 3/4] fix: gofmt alignment in DefaultP2PConfig Co-Authored-By: Claude Sonnet 4.6 --- sei-tendermint/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sei-tendermint/config/config.go b/sei-tendermint/config/config.go index f8422d292f..ce47df8c84 100644 --- a/sei-tendermint/config/config.go +++ b/sei-tendermint/config/config.go @@ -742,7 +742,7 @@ func DefaultP2PConfig() *P2PConfig { DialTimeout: 3 * time.Second, DialInterval: 10 * time.Second, TestDialFail: false, - QueueType: "simple-priority", + QueueType: "simple-priority", } } From 2fdc4acbd4e23ce6ee94b2572b6dda97dc1b9e84 Mon Sep 17 00:00:00 2001 From: Wen Date: Tue, 23 Jun 2026 16:13:49 -0700 Subject: [PATCH 4/4] refactor(rpc): make recvAllocMultiplier a float, round result to int Co-Authored-By: Claude Sonnet 4.6 --- sei-tendermint/internal/p2p/rpc/rpc.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sei-tendermint/internal/p2p/rpc/rpc.go b/sei-tendermint/internal/p2p/rpc/rpc.go index ff1fc2c48e..7ccb736372 100644 --- a/sei-tendermint/internal/p2p/rpc/rpc.go +++ b/sei-tendermint/internal/p2p/rpc/rpc.go @@ -3,6 +3,7 @@ package rpc import ( "context" "fmt" + "math" "reflect" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" @@ -20,7 +21,7 @@ type InMsgs uint64 // per-message alloc limit passed to UnmarshalWithLimit. Load testing against // the largest message (~2 MB wire for LaneProposal) showed its alloc estimate // fits comfortably within 2× MsgSize. -const recvAllocMultiplier = 2 +const recvAllocMultiplier = 2.0 func (spec Msg[M]) Verify() error { var msg M @@ -118,7 +119,7 @@ func (r *RPC[API, Req, Resp]) Call(ctx context.Context, client Client[API]) (Str if err != nil { return Stream[Req, Resp]{}, err } - return Stream[Req, Resp]{inner: s, allocLimit: int(r.Resp.MsgSize) * recvAllocMultiplier}, nil //nolint:gosec // MsgSize is a validated config value + return Stream[Req, Resp]{inner: s, allocLimit: int(math.Round(float64(r.Resp.MsgSize) * recvAllocMultiplier))}, nil //nolint:gosec // MsgSize is a validated config value } func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], handler func(context.Context, Stream[Resp, Req]) error) error { @@ -134,7 +135,7 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han if err != nil { return err } - err = handler(ctx, Stream[Resp, Req]{inner: stream, allocLimit: int(r.Req.MsgSize) * recvAllocMultiplier}) //nolint:gosec // MsgSize is a validated config value + err = handler(ctx, Stream[Resp, Req]{inner: stream, allocLimit: int(math.Round(float64(r.Req.MsgSize) * recvAllocMultiplier))}) //nolint:gosec // MsgSize is a validated config value stream.Close() if err != nil { return err