Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions sei-tendermint/internal/p2p/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"fmt"
"math"
"reflect"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/p2p/conn"
Expand All @@ -16,6 +17,12 @@ import (
type InBytes uint64
type InMsgs uint64

// recvAllocMultiplier scales each stream's inbound MsgSize to derive the
// per-message alloc limit passed to UnmarshalWithLimit. Load testing against
// the largest message (~2 MB wire for LaneProposal) showed its alloc estimate
// fits comfortably within 2× MsgSize.
const recvAllocMultiplier = 2.0

func (spec Msg[M]) Verify() error {
var msg M
if m := InBytes(msg.MaxSize()); m > spec.MsgSize { // nolint: gosec // MaxSize() >= 0
Expand Down Expand Up @@ -112,7 +119,7 @@ func (r *RPC[API, Req, Resp]) Call(ctx context.Context, client Client[API]) (Str
if err != nil {
return Stream[Req, Resp]{}, err
}
return Stream[Req, Resp]{inner: s}, nil
return Stream[Req, Resp]{inner: s, allocLimit: int(math.Round(float64(r.Resp.MsgSize) * recvAllocMultiplier))}, nil //nolint:gosec // MsgSize is a validated config value
}

func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], handler func(context.Context, Stream[Resp, Req]) error) error {
Expand All @@ -128,7 +135,7 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han
if err != nil {
return err
}
err = handler(ctx, Stream[Resp, Req]{inner: stream})
err = handler(ctx, Stream[Resp, Req]{inner: stream, allocLimit: int(math.Round(float64(r.Req.MsgSize) * recvAllocMultiplier))}) //nolint:gosec // MsgSize is a validated config value
stream.Close()
if err != nil {
return err
Expand All @@ -141,7 +148,10 @@ func (r *RPC[API, Req, Resp]) Serve(ctx context.Context, server Server[API], han
})
}

type Stream[SendT, RecvT protoutils.Message] struct{ inner *mux.Stream }
type Stream[SendT, RecvT protoutils.Message] struct {
inner *mux.Stream
allocLimit int
}

func (s Stream[SendT, RecvT]) Close() { s.inner.Close() }
func (s Stream[SendT, RecvT]) Send(ctx context.Context, msg SendT) error {
Expand All @@ -152,5 +162,5 @@ func (s Stream[SendT, RecvT]) Recv(ctx context.Context) (RecvT, error) {
if err != nil {
return utils.Zero[RecvT](), err
}
return protoutils.Unmarshal[RecvT](raw)
return protoutils.UnmarshalWithLimit[RecvT](raw, s.allocLimit)
}
Loading