diff --git a/sei-tendermint/internal/p2p/rpc/rpc.go b/sei-tendermint/internal/p2p/rpc/rpc.go index 4a09658043..7ccb736372 100644 --- a/sei-tendermint/internal/p2p/rpc/rpc.go +++ b/sei-tendermint/internal/p2p/rpc/rpc.go @@ -3,6 +3,7 @@ package rpc import ( "context" "fmt" + "math" "reflect" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn" @@ -16,6 +17,12 @@ import ( type InBytes uint64 type InMsgs uint64 +// recvAllocMultiplier scales each stream's inbound MsgSize to derive the +// per-message alloc limit passed to UnmarshalWithLimit. Load testing against +// the largest message (~2 MB wire for LaneProposal) showed its alloc estimate +// fits comfortably within 2× MsgSize. +const recvAllocMultiplier = 2.0 + func (spec Msg[M]) Verify() error { var msg M if m := InBytes(msg.MaxSize()); m > spec.MsgSize { // nolint: gosec // MaxSize() >= 0 @@ -112,7 +119,7 @@ func (r *RPC[API, Req, Resp]) Call(ctx context.Context, client Client[API]) (Str if err != nil { return Stream[Req, Resp]{}, err } - return Stream[Req, Resp]{inner: s}, nil + return Stream[Req, Resp]{inner: s, allocLimit: int(math.Round(float64(r.Resp.MsgSize) * recvAllocMultiplier))}, nil //nolint:gosec // MsgSize is a validated config value } func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], handler func(context.Context, Stream[Resp, Req]) error) error { @@ -128,7 +135,7 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han if err != nil { return err } - err = handler(ctx, Stream[Resp, Req]{inner: stream}) + err = handler(ctx, Stream[Resp, Req]{inner: stream, allocLimit: int(math.Round(float64(r.Req.MsgSize) * recvAllocMultiplier))}) //nolint:gosec // MsgSize is a validated config value stream.Close() if err != nil { return err @@ -141,7 +148,10 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han }) } -type Stream[SendT, RecvT protoutils.Message] struct{ inner *mux.Stream } +type Stream[SendT, RecvT protoutils.Message] struct { + inner *mux.Stream + allocLimit int +} func (s Stream[SendT, RecvT]) Close() { s.inner.Close() } func (s Stream[SendT, RecvT]) Send(ctx context.Context, msg SendT) error { @@ -152,5 +162,5 @@ func (s Stream[SendT, RecvT]) Recv(ctx context.Context) (RecvT, error) { if err != nil { return utils.Zero[RecvT](), err } - return protoutils.Unmarshal[RecvT](raw) + return protoutils.UnmarshalWithLimit[RecvT](raw, s.allocLimit) }