Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions evmrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ type Config struct {

// IPRateLimitBurst is the maximum per-IP burst size.
IPRateLimitBurst int `mapstructure:"ip_rate_limit_burst"`

// MaxOpenConnections caps the number of simultaneously accepted connections
// on the EVM HTTP and WebSocket listeners. The limit is applied per listener
// (HTTP and WS each get their own budget). Excess connections block in the
// accept queue until an active connection closes. Zero disables the limit.
MaxOpenConnections int `mapstructure:"max_open_connections"`
}

var DefaultConfig = Config{
Expand Down Expand Up @@ -217,6 +223,7 @@ var DefaultConfig = Config{
TraceBakeSnapshotWindow: 64,
IPRateLimitRPS: 200,
IPRateLimitBurst: 400,
MaxOpenConnections: 2000,
}

const (
Expand Down Expand Up @@ -261,6 +268,7 @@ const (
flagTraceBakeSnapshotWindow = "evm.trace_bake_snapshot_window"
flagIPRateLimitRPS = "evm.ip_rate_limit_rps"
flagIPRateLimitBurst = "evm.ip_rate_limit_burst"
flagMaxOpenConnections = "evm.max_open_connections"
)

func ReadConfig(opts servertypes.AppOptions) (Config, error) {
Expand Down Expand Up @@ -471,6 +479,11 @@ func ReadConfig(opts servertypes.AppOptions) (Config, error) {
return cfg, err
}
}
if v := opts.Get(flagMaxOpenConnections); v != nil {
if cfg.MaxOpenConnections, err = cast.ToIntE(v); err != nil {
return cfg, err
}
}
return cfg, nil
}

Expand Down Expand Up @@ -670,4 +683,8 @@ ip_rate_limit_rps = {{ .EVM.IPRateLimitRPS }}
# ip_rate_limit_burst is the maximum per-IP burst above the sustained rate.
ip_rate_limit_burst = {{ .EVM.IPRateLimitBurst }}
# max_open_connections caps the number of simultaneously accepted connections on
# the EVM HTTP and WebSocket listeners. Set to 0 to disable the limit.
max_open_connections = {{ .EVM.MaxOpenConnections }}
`
5 changes: 5 additions & 0 deletions evmrpc/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type opts struct {
workerQueueSize interface{}
ipRateLimitRPS interface{}
ipRateLimitBurst interface{}
maxOpenConnections interface{}
}

func (o *opts) Get(k string) interface{} {
Expand Down Expand Up @@ -160,6 +161,9 @@ func (o *opts) Get(k string) interface{} {
if k == "evm.ip_rate_limit_burst" {
return o.ipRateLimitBurst
}
if k == "evm.max_open_connections" {
return o.maxOpenConnections
}
panic("unknown key")
}

Expand Down Expand Up @@ -200,6 +204,7 @@ func getDefaultOpts() opts {
1000,
200.0,
400,
2000,
}
}

Expand Down
17 changes: 17 additions & 0 deletions evmrpc/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
"github.com/rs/cors"
"golang.org/x/net/netutil"
)

// HTTPConfig is the JSON-RPC/HTTP configuration.
Expand Down Expand Up @@ -91,6 +92,10 @@ type HTTPServer struct {
host string
port int

// maxOpenConns caps simultaneous accepted connections on the listener.
// Zero (the default) disables the limit.
maxOpenConns int

handlerNames map[string]string
}

Expand Down Expand Up @@ -123,6 +128,15 @@ func (h *HTTPServer) SetListenAddr(host string, port int) error {
return nil
}

// SetMaxOpenConns sets the maximum number of simultaneously accepted
// connections on the listener. A value <= 0
// leaves connections unbounded.
func (h *HTTPServer) SetMaxOpenConns(n int) {
h.mu.Lock()
defer h.mu.Unlock()
h.maxOpenConns = n
}

// ListenAddr returns the listening address of the server.
func (h *HTTPServer) ListenAddr() string {
h.mu.Lock()
Expand Down Expand Up @@ -161,6 +175,9 @@ func (h *HTTPServer) Start() error {
h.disableWS()
return err
}
if h.maxOpenConns > 0 {
listener = netutil.LimitListener(listener, h.maxOpenConns)
}
h.listener = listener
go func() {
if err := h.server.Serve(listener); !errors.Is(err, http.ErrServerClosed) {
Expand Down
52 changes: 52 additions & 0 deletions evmrpc/rpcstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -580,6 +581,57 @@ func TestHTTPWriteTimeout(t *testing.T) {
})
}

// TestMaxOpenConns verifies that SetMaxOpenConns wraps the listener so that no
// more than the configured number of connections are accepted at once. With a
// cap of 1, a second connection is not served until the first one is closed.
func TestMaxOpenConns(t *testing.T) {
srv := evmrpc.NewHTTPServer(rpc.DefaultHTTPTimeouts)
srv.SetMaxOpenConns(1)
assert.NoError(t, srv.EnableRPC(apis(), evmrpc.HTTPConfig{}))
assert.NoError(t, srv.SetListenAddr("localhost", 0))
assert.NoError(t, srv.Start())
defer srv.Stop()

addr := srv.ListenAddr()

// Open the first connection and send only request headers advertising a body
// that never arrives. The server accepts it (consuming the single slot), and
// its serving goroutine blocks reading the body, holding the slot open.
c1, err := net.Dial("tcp", addr)
assert.NoError(t, err)
defer func() {
_ = c1.Close()
}()
_, err = c1.Write([]byte("POST / HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: 4096\r\n\r\n"))
assert.NoError(t, err)

// Give the accepting loop time to accept c1 and consume the slot.
time.Sleep(200 * time.Millisecond)

// While c1 holds the only slot, a second connection is not accepted, so a
// complete request over it receives no response before the read deadline.
body := `{"jsonrpc":"2.0","id":1,"method":"test_greet","params":[]}`
req := fmt.Sprintf("POST / HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: %d\r\n\r\n%s", len(body), body)
c2, err := net.DialTimeout("tcp", addr, time.Second)
assert.NoError(t, err)
defer func() {
_ = c2.Close()
}()
_, err = c2.Write([]byte(req))
assert.NoError(t, err)
assert.NoError(t, c2.SetReadDeadline(time.Now().Add(500*time.Millisecond)))
buf := make([]byte, 64)
_, err = c2.Read(buf)
assert.Error(t, err, "second connection should not be served while the slot is held")

// Closing c1 frees the slot; c2 is then accepted and served.
assert.NoError(t, c1.Close())
assert.NoError(t, c2.SetReadDeadline(time.Now().Add(5*time.Second)))
n, err := c2.Read(buf)
assert.NoError(t, err)
assert.Greater(t, n, 0)
}

func apis() []rpc.API {
return []rpc.API{
{
Expand Down
2 changes: 2 additions & 0 deletions evmrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func NewEVMHTTPServer(
IdleTimeout: config.IdleTimeout,
})
methodTimeout := tmutils.Some(httpServer.timeouts.WriteTimeout)
httpServer.SetMaxOpenConns(config.MaxOpenConnections)
if err := httpServer.SetListenAddr(LocalAddress, config.HTTPPort); err != nil {
return nil, err
}
Expand Down Expand Up @@ -259,6 +260,7 @@ func NewEVMWebSocketServer(
IdleTimeout: config.IdleTimeout,
})
methodTimeout := tmutils.Some(httpServer.timeouts.WriteTimeout)
httpServer.SetMaxOpenConns(config.MaxOpenConnections)
if err := httpServer.SetListenAddr(LocalAddress, config.WSPort); err != nil {
return nil, err
}
Expand Down
Loading