Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type KeepAlive struct {
}

type ProxyServerOnline struct {
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty" reloadable:"true"`
ConnBufferSize int `yaml:"conn-buffer-size,omitempty" toml:"conn-buffer-size,omitempty" json:"conn-buffer-size,omitempty" reloadable:"true"`
FrontendKeepalive KeepAlive `yaml:"frontend-keepalive" toml:"frontend-keepalive" json:"frontend-keepalive"`
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty" reloadable:"true"`
HighMemoryUsageRejectThreshold float64 `yaml:"high-memory-usage-reject-threshold,omitempty" toml:"high-memory-usage-reject-threshold,omitempty" json:"high-memory-usage-reject-threshold,omitempty" reloadable:"true"`
ConnBufferSize int `yaml:"conn-buffer-size,omitempty" toml:"conn-buffer-size,omitempty" json:"conn-buffer-size,omitempty" reloadable:"true"`
FrontendKeepalive KeepAlive `yaml:"frontend-keepalive" toml:"frontend-keepalive" json:"frontend-keepalive"`
// BackendHealthyKeepalive applies when the observer treats the backend as healthy.
// The config values should be conservative to save CPU and tolerate network fluctuation.
BackendHealthyKeepalive KeepAlive `yaml:"backend-healthy-keepalive" toml:"backend-healthy-keepalive" json:"backend-healthy-keepalive"`
Expand Down Expand Up @@ -132,6 +133,7 @@ func NewConfig() *Config {

cfg.Proxy.Addr = "0.0.0.0:6000"
cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive()
cfg.Proxy.HighMemoryUsageRejectThreshold = 0.9
cfg.Proxy.PDAddrs = "127.0.0.1:2379"
cfg.Proxy.GracefulCloseConnTimeout = 15

Expand Down Expand Up @@ -255,6 +257,12 @@ func (cfg *Config) GetBackendClusters() []BackendCluster {
}

func (ps *ProxyServer) Check() error {
if ps.HighMemoryUsageRejectThreshold < 0 || ps.HighMemoryUsageRejectThreshold > 1 {
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.high-memory-usage-reject-threshold")
}
if ps.HighMemoryUsageRejectThreshold > 0 && ps.HighMemoryUsageRejectThreshold < 0.5 {
ps.HighMemoryUsageRejectThreshold = 0.5
}
if _, err := ps.GetSQLAddrs(); err != nil {
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.addr or proxy.port-range: %s", err.Error())
}
Expand Down
31 changes: 26 additions & 5 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ var testProxyConfig = Config{
Addr: "0.0.0.0:4000",
PDAddrs: "127.0.0.1:4089",
ProxyServerOnline: ProxyServerOnline{
MaxConnections: 1,
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
ConnBufferSize: 32 * 1024,
MaxConnections: 1,
HighMemoryUsageRejectThreshold: 0.9,
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
ConnBufferSize: 32 * 1024,
BackendClusters: []BackendCluster{
{
Name: "cluster-a",
Expand Down Expand Up @@ -114,6 +115,26 @@ func TestProxyCheck(t *testing.T) {
post func(*testing.T, *Config)
err error
}{
{
pre: func(t *testing.T, c *Config) {
c.Proxy.HighMemoryUsageRejectThreshold = -0.1
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.HighMemoryUsageRejectThreshold = 1.1
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.HighMemoryUsageRejectThreshold = 0.4
},
post: func(t *testing.T, c *Config) {
require.Equal(t, 0.5, c.Proxy.HighMemoryUsageRejectThreshold)
},
},
{
pre: func(t *testing.T, c *Config) {
c.Workdir = ""
Expand Down
92 changes: 78 additions & 14 deletions pkg/manager/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"

"github.com/pingcap/tidb/pkg/util/memory"
Expand All @@ -18,16 +19,26 @@ import (
)

const (
// Check the memory usage every 30 seconds.
checkInterval = 30 * time.Second
// Check the memory usage every 5 seconds.
checkInterval = 5 * time.Second
// No need to record too frequently.
recordMinInterval = 5 * time.Minute
// Record the profiles when the memory usage is higher than 60%.
alarmThreshold = 0.6
// Remove the oldest profiles when the number of profiles exceeds this limit.
maxSavedProfiles = 20
// Fail open if the latest sampled usage is too old.
snapshotExpireInterval = 2 * checkInterval
)

// UsageSnapshot is a point-in-time sample of the process memory usage.
// It is stored atomically (via MemManager.latestUsage) so readers always
// observe a consistent {Used, Limit, Usage, UpdateTime} tuple.
type UsageSnapshot struct {
	Used       uint64    // bytes currently used, as reported by memory.MemUsed — TODO confirm unit against that API
	Limit      uint64    // memory limit in the same unit as Used
	Usage      float64   // Used / Limit; the fraction compared against the reject threshold
	UpdateTime time.Time // when this sample was taken; used to detect stale snapshots
	Valid      bool      // false for the zero value, i.e. before any successful sample
}

// MemManager is a manager for memory usage.
// Although the continuous profiling collects profiles periodically, when TiProxy runs in the replayer mode,
// the profiles are not collected.
Expand All @@ -41,17 +52,22 @@ type MemManager struct {
checkInterval time.Duration // used for test
recordMinInterval time.Duration // used for test
maxSavedProfiles int // used for test
snapshotExpire time.Duration // used for test
memoryLimit uint64
latestUsage atomic.Value
}

func NewMemManager(lg *zap.Logger, cfgGetter config.ConfigGetter) *MemManager {
return &MemManager{
mgr := &MemManager{
lg: lg,
cfgGetter: cfgGetter,
checkInterval: checkInterval,
recordMinInterval: recordMinInterval,
maxSavedProfiles: maxSavedProfiles,
snapshotExpire: snapshotExpireInterval,
}
mgr.latestUsage.Store(UsageSnapshot{})
return mgr
}

func (m *MemManager) Start(ctx context.Context) {
Expand All @@ -62,6 +78,9 @@ func (m *MemManager) Start(ctx context.Context) {
return
}
m.memoryLimit = limit
if _, err = m.refreshUsage(); err != nil {
return
}
childCtx, cancel := context.WithCancel(ctx)
m.cancel = cancel
m.wg.RunWithRecover(func() {
Expand All @@ -83,32 +102,77 @@ func (m *MemManager) alarmLoop(ctx context.Context) {
}

func (m *MemManager) checkAndAlarm() {
snapshot, err := m.refreshUsage()
if err != nil || !snapshot.Valid {
return
}
if snapshot.Usage < alarmThreshold {
return
}
if time.Since(m.lastRecordTime) < m.recordMinInterval {
return
}
// The filename is hot-reloadable.
logPath := m.cfgGetter.GetConfig().Log.LogFile.Filename
cfg := m.cfgGetter.GetConfig()
if cfg == nil {
return
}
logPath := cfg.Log.LogFile.Filename
if logPath == "" {
return
}
recordDir := filepath.Dir(logPath)

m.lastRecordTime = snapshot.UpdateTime
m.lg.Warn("memory usage alarm", zap.Uint64("limit", snapshot.Limit), zap.Uint64("used", snapshot.Used), zap.Float64("usage", snapshot.Usage))
now := time.Now().Format(time.RFC3339)
m.recordHeap(filepath.Join(recordDir, "heap_"+now))
m.recordGoroutine(filepath.Join(recordDir, "goroutine_"+now))
m.rmExpiredProfiles()
}

func (m *MemManager) refreshUsage() (UsageSnapshot, error) {
if m.memoryLimit == 0 {
return UsageSnapshot{}, nil
}
used, err := memory.MemUsed()
if err != nil || used == 0 {
m.lg.Error("get used memory failed", zap.Uint64("used", used), zap.Error(err))
return
return UsageSnapshot{}, err
}
memoryUsage := float64(used) / float64(m.memoryLimit)
if memoryUsage < alarmThreshold {
return
snapshot := UsageSnapshot{
Used: used,
Limit: m.memoryLimit,
Usage: float64(used) / float64(m.memoryLimit),
UpdateTime: time.Now(),
Valid: true,
}
m.latestUsage.Store(snapshot)
return snapshot, nil
}

m.lastRecordTime = time.Now()
m.lg.Warn("memory usage alarm", zap.Uint64("limit", m.memoryLimit), zap.Uint64("used", used), zap.Float64("usage", memoryUsage))
now := time.Now().Format(time.RFC3339)
m.recordHeap(filepath.Join(recordDir, "heap_"+now))
m.recordGoroutine(filepath.Join(recordDir, "goroutine_"+now))
m.rmExpiredProfiles()
// LatestUsage returns the most recently published memory usage sample.
// If nothing has been stored yet (or the stored value has an unexpected
// type), the zero UsageSnapshot — with Valid=false — is returned.
func (m *MemManager) LatestUsage() UsageSnapshot {
	if v, ok := m.latestUsage.Load().(UsageSnapshot); ok {
		return v
	}
	return UsageSnapshot{}
}

// ShouldRejectNewConn reports whether a new client connection should be
// rejected because memory usage has reached the configured threshold.
// It also returns the snapshot and threshold that the decision was based
// on, for logging/metrics at the call site.
//
// The check fails open: a nil manager, missing config, zero threshold
// (feature disabled), or a missing/stale usage sample all yield false.
func (m *MemManager) ShouldRejectNewConn() (bool, UsageSnapshot, float64) {
	var none UsageSnapshot
	if m == nil || m.cfgGetter == nil {
		return false, none, 0
	}
	cfg := m.cfgGetter.GetConfig()
	if cfg == nil {
		return false, none, 0
	}
	threshold := cfg.Proxy.HighMemoryUsageRejectThreshold
	if threshold == 0 {
		// Zero means the memory-based rejection is disabled.
		return false, none, 0
	}
	snapshot := m.LatestUsage()
	// A sample that never arrived or is older than snapshotExpire cannot
	// be trusted; do not reject based on it.
	expired := !snapshot.Valid || time.Since(snapshot.UpdateTime) > m.snapshotExpire
	if expired {
		return false, snapshot, threshold
	}
	return snapshot.Usage >= threshold, snapshot, threshold
}

func (m *MemManager) recordHeap(fileName string) {
Expand Down
48 changes: 48 additions & 0 deletions pkg/manager/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func (c *mockCfgGetter) GetConfig() *config.Config {
}

func TestRecordProfile(t *testing.T) {
oldMemUsed, oldMemTotal := memory.MemUsed, memory.MemTotal
defer func() {
memory.MemUsed = oldMemUsed
memory.MemTotal = oldMemTotal
}()

dir := t.TempDir()
cfg := &config.Config{}
cfg.Log.LogFile.Filename = path.Join(dir, "proxy.log")
Expand Down Expand Up @@ -75,3 +81,45 @@ func TestRecordProfile(t *testing.T) {
require.NoError(t, err)
require.Len(t, entries, m.maxSavedProfiles)
}

// TestShouldRejectNewConn exercises the memory-based connection rejection:
// rejection fires when the sampled usage reaches the configured threshold,
// a zero threshold disables the check, and a stale snapshot fails open.
func TestShouldRejectNewConn(t *testing.T) {
	// Restore the stubbed memory probes when the test finishes.
	oldMemUsed, oldMemTotal := memory.MemUsed, memory.MemTotal
	defer func() {
		memory.MemUsed = oldMemUsed
		memory.MemTotal = oldMemTotal
	}()

	cfg := config.NewConfig()
	cfg.Proxy.HighMemoryUsageRejectThreshold = 0.9
	cfgGetter := mockCfgGetter{cfg: cfg}
	// Stub 9 GiB used out of 10 GiB total, i.e. usage 0.9 — exactly at the
	// threshold, so rejection should trigger (the comparison is >=).
	memory.MemUsed = func() (uint64, error) {
		return 9 * (1 << 30), nil
	}
	memory.MemTotal = func() (uint64, error) {
		return 10 * (1 << 30), nil
	}
	m := NewMemManager(zap.NewNop(), &cfgGetter)
	m.checkInterval = 50 * time.Millisecond
	m.snapshotExpire = 200 * time.Millisecond
	m.Start(context.Background())
	defer m.Close()

	// Wait for the background loop to publish a valid snapshot whose usage
	// (0.9) meets the 0.9 threshold.
	require.Eventually(t, func() bool {
		reject, snapshot, threshold := m.ShouldRejectNewConn()
		return reject && snapshot.Valid && threshold == 0.9
	}, time.Second, 10*time.Millisecond)
	// Stop the loop before mutating cfg below so the background sampler
	// cannot race with the config writes.
	// NOTE(review): Close is also deferred above, so it runs twice —
	// assumes MemManager.Close is idempotent; confirm.
	m.Close()

	// Threshold 0 disables rejection entirely.
	cfg.Proxy.HighMemoryUsageRejectThreshold = 0
	reject, _, threshold := m.ShouldRejectNewConn()
	require.False(t, reject)
	require.Zero(t, threshold)

	// An expired snapshot must fail open: no rejection even though the
	// threshold is set and the last sampled usage was above it.
	staleSnapshot := m.LatestUsage()
	staleSnapshot.UpdateTime = time.Now().Add(-m.snapshotExpire - time.Second)
	m.latestUsage.Store(staleSnapshot)
	cfg.Proxy.HighMemoryUsageRejectThreshold = 0.9
	reject, _, threshold = m.ShouldRejectNewConn()
	require.False(t, reject)
	require.Equal(t, 0.9, threshold)
}
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func init() {
colls = []prometheus.Collector{
ConnGauge,
CreateConnCounter,
RejectConnCounter,
DisConnCounter,
MaxProcsGauge,
OwnerGauge,
Expand Down
8 changes: 8 additions & 0 deletions pkg/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ var (
Help: "Number of create connections.",
})

RejectConnCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: ModuleProxy,
Subsystem: LabelServer,
Name: "reject_connection_total",
Help: "Number of rejected connections.",
}, []string{LblType})

DisConnCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: ModuleProxy,
Expand Down
Loading