Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@

graceful-close-conn-timeout = 15

# fail-backend-list marks backend pod names or backend addresses as failed. TiProxy will stop routing new
# connections to them and migrate existing connections away.
# fail-backend-list = ["db-2033841436272623616-0f6e346b-tidb-0", "10.0.0.10:4000"]

# failover-timeout is measured in seconds. If a failed backend still has remaining connections after the timeout,
# TiProxy will force close them.
# failover-timeout = 60

# possible values:
# "" => enable static routing.
# "pd-addr:pd-port" => automatic tidb discovery.
Expand Down
24 changes: 24 additions & 0 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type ProxyServerOnline struct {
// BackendClusters represents multiple backend clusters that the proxy can route to. It can be reloaded
// online.
BackendClusters []BackendCluster `yaml:"backend-clusters,omitempty" toml:"backend-clusters,omitempty" json:"backend-clusters,omitempty" reloadable:"true"`
// FailBackendList contains backend pod names or backend addresses (IP:port) that should be drained immediately
// and excluded from new routing.
FailBackendList []string `yaml:"fail-backend-list,omitempty" toml:"fail-backend-list,omitempty" json:"fail-backend-list,omitempty" reloadable:"true"`
// FailoverTimeout is the grace period in seconds before force closing the remaining connections on failed backends.
FailoverTimeout int `yaml:"failover-timeout,omitempty" toml:"failover-timeout,omitempty" json:"failover-timeout,omitempty" reloadable:"true"`
}

type ProxyServer struct {
Expand Down Expand Up @@ -134,6 +139,7 @@ func NewConfig() *Config {
cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive()
cfg.Proxy.PDAddrs = "127.0.0.1:2379"
cfg.Proxy.GracefulCloseConnTimeout = 15
cfg.Proxy.FailoverTimeout = 60

cfg.API.Addr = "0.0.0.0:3080"

Expand All @@ -160,6 +166,7 @@ func (cfg *Config) Clone() *Config {
newCfg.Labels = maps.Clone(cfg.Labels)
newCfg.Proxy.PublicEndpoints = slices.Clone(cfg.Proxy.PublicEndpoints)
newCfg.Proxy.BackendClusters = slices.Clone(cfg.Proxy.BackendClusters)
newCfg.Proxy.FailBackendList = slices.Clone(cfg.Proxy.FailBackendList)
for i := range newCfg.Proxy.BackendClusters {
newCfg.Proxy.BackendClusters[i].NSServers = slices.Clone(newCfg.Proxy.BackendClusters[i].NSServers)
}
Expand Down Expand Up @@ -279,6 +286,23 @@ func (ps *ProxyServer) Check() error {
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.backend-clusters.ns-servers: %s", err.Error())
}
}
if ps.FailoverTimeout < 0 {
return errors.Wrapf(ErrInvalidConfigValue, "proxy.failover-timeout must be greater than or equal to 0")
}
failBackends := ps.FailBackendList[:0]
failBackendSet := make(map[string]struct{}, len(ps.FailBackendList))
for i, backendName := range ps.FailBackendList {
backendName = strings.TrimSpace(backendName)
if backendName == "" {
return errors.Wrapf(ErrInvalidConfigValue, "proxy.fail-backend-list[%d] is empty", i)
}
if _, ok := failBackendSet[backendName]; ok {
return errors.Wrapf(ErrInvalidConfigValue, "duplicate proxy.fail-backend-list entry %s", backendName)
}
failBackendSet[backendName] = struct{}{}
failBackends = append(failBackends, backendName)
}
ps.FailBackendList = failBackends
return nil
}

Expand Down
22 changes: 22 additions & 0 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var testProxyConfig = Config{
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
FailBackendList: []string{"db-tidb-0", "db-tidb-1"},
FailoverTimeout: 60,
ConnBufferSize: 32 * 1024,
BackendClusters: []BackendCluster{
{
Expand Down Expand Up @@ -188,6 +190,24 @@ func TestProxyCheck(t *testing.T) {
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.FailBackendList = []string{"db-tidb-0", " "}
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.FailBackendList = []string{"db-tidb-0", "db-tidb-0"}
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.FailoverTimeout = -1
},
err: ErrInvalidConfigValue,
},
}
for _, tc := range testcases {
cfg := testProxyConfig
Expand Down Expand Up @@ -311,10 +331,12 @@ func TestCloneConfig(t *testing.T) {
require.Equal(t, cfg, *clone)
cfg.Labels["c"] = "d"
cfg.Proxy.PublicEndpoints[0] = "2.2.2.0/24"
cfg.Proxy.FailBackendList[0] = "db-tidb-9"
cfg.Proxy.BackendClusters[0].Name = "cluster-updated"
cfg.Proxy.BackendClusters[0].NSServers[0] = "10.0.0.9"
require.NotContains(t, clone.Labels, "c")
require.Equal(t, []string{"1.1.1.0/24"}, clone.Proxy.PublicEndpoints)
require.Equal(t, []string{"db-tidb-0", "db-tidb-1"}, clone.Proxy.FailBackendList)
require.Equal(t, "cluster-a", clone.Proxy.BackendClusters[0].Name)
require.Equal(t, []string{"10.0.0.2", "10.0.0.3"}, clone.Proxy.BackendClusters[0].NSServers)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/balance/router/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ func (g *Group) Balance(ctx context.Context) {
i := 0
for ele := fromBackend.connList.Front(); ele != nil && ctx.Err() == nil && i < count; ele = ele.Next() {
conn := ele.Value
if conn.forceClosing {
continue
}
switch conn.phase {
case phaseRedirectNotify:
// A connection cannot be redirected again when it has not finished redirecting.
Expand All @@ -279,6 +282,7 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc
RedirectableConn: conn,
createTime: time.Now(),
phase: phaseNotRedirected,
forceClosing: false,
}
g.addConn(backend, connWrapper)
conn.SetEventReceiver(g)
Expand All @@ -287,6 +291,37 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc
}
}

func (g *Group) CloseTimedOutFailoverConnections(now time.Time, timeout time.Duration) {
g.Lock()
defer g.Unlock()
for _, backend := range g.backends {
active, since := backend.Failover()
if !active {
continue
}
if timeout > 0 && since.Add(timeout).After(now) {
continue
}
for ele := backend.connList.Front(); ele != nil; ele = ele.Next() {
conn := ele.Value
if conn.phase == phaseClosed || conn.forceClosing {
continue
Comment on lines +307 to +308
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Skip force-closing connections that are mid-redirect

CloseTimedOutFailoverConnections currently force-closes any non-closed connection, including ones in phaseRedirectNotify. With failover-timeout=0 (or a very short timeout), rebalance() can queue a redirect in Balance() and then immediately call ForceClose() on the same session in the same tick, which races with the pending redirect signal and can leave router bookkeeping inconsistent (incorrect connScore/list state when close and redirect callbacks arrive in opposite order). Guarding phaseRedirectNotify here (or cancelling/marking the pending redirect before close) avoids this failover race.

Useful? React with 👍 / 👎.

}
fields := []zap.Field{
zap.Uint64("connID", conn.ConnectionID()),
zap.String("backend_addr", backend.addr),
zap.String("backend_pod", backend.PodName()),
zap.Duration("failover_timeout", timeout),
zap.Duration("failover_elapsed", now.Sub(since)),
}
if conn.ForceClose() {
conn.forceClosing = true
g.lg.Warn("force close connection on failover backend", fields...)
}
}
}
}

func (g *Group) removeConn(backend *backendWrapper, ce *glist.Element[*connWrapper]) {
backend.connList.Remove(ce)
setBackendConnMetrics(backend.addr, backend.connList.Len())
Expand Down
10 changes: 10 additions & 0 deletions pkg/balance/router/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (conn *mockRedirectableConn) Redirect(inst BackendInst) bool {
return true
}

// ForceClose marks the mock connection as closing. It reports whether this
// call transitioned the connection from open to closing; repeated calls
// return false.
func (conn *mockRedirectableConn) ForceClose() bool {
	conn.Lock()
	alreadyClosing := conn.closing
	if !alreadyClosing {
		conn.closing = true
	}
	conn.Unlock()
	return !alreadyClosing
}

func (conn *mockRedirectableConn) GetRedirectingAddr() string {
conn.Lock()
defer conn.Unlock()
Expand Down
65 changes: 64 additions & 1 deletion pkg/balance/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package router

import (
"net"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -68,6 +69,8 @@ type RedirectableConn interface {
Value(key any) any
// Redirect returns false if the current conn is not redirectable.
Redirect(backend BackendInst) bool
// ForceClose closes the connection immediately and returns false if it's already closing.
ForceClose() bool
ConnectionID() uint64
ConnInfo() []zap.Field
}
Expand All @@ -85,8 +88,11 @@ type backendWrapper struct {
mu struct {
sync.RWMutex
observer.BackendHealth
failoverActive bool
failoverSince time.Time
}
addr string
addr string
podName string
// connScore is used for calculating backend scores and check if the backend can be removed from the list.
// connScore = connList.Len() + incoming connections - outgoing connections.
connScore int
Expand All @@ -100,6 +106,7 @@ type backendWrapper struct {
func newBackendWrapper(addr string, health observer.BackendHealth) *backendWrapper {
wrapper := &backendWrapper{
addr: addr,
podName: backendPodNameFromAddr(addr),
connList: glist.New[*connWrapper](),
}
wrapper.setHealth(health)
Expand Down Expand Up @@ -128,12 +135,50 @@ func (b *backendWrapper) Addr() string {
}

// Healthy reports whether the backend can take new connections: it must be
// observed healthy by the health checker and must not be draining for failover.
func (b *backendWrapper) Healthy() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.Healthy && !b.mu.failoverActive
}

// ObservedHealthy reports the raw health-check result, ignoring any
// failover draining state.
func (b *backendWrapper) ObservedHealthy() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.Healthy
}

// PodName returns the pod name derived from the backend address when the
// wrapper was created. It is immutable, so no locking is needed.
func (b *backendWrapper) PodName() string {
	return b.podName
}

// setFailover toggles the failover draining state of the backend.
// It returns whether the state actually changed, together with the time the
// current failover started (zero when failover is inactive). Re-activating an
// already-active failover keeps the original start time.
func (b *backendWrapper) setFailover(active bool, since time.Time) (changed bool, failoverSince time.Time) {
	b.mu.Lock()
	defer b.mu.Unlock()
	switch {
	case active && !b.mu.failoverActive:
		// Newly entering failover: record the start time.
		b.mu.failoverActive = true
		b.mu.failoverSince = since
		return true, since
	case active:
		// Already in failover: keep the original start time.
		return false, b.mu.failoverSince
	case b.mu.failoverActive:
		// Leaving failover: clear the state.
		b.mu.failoverActive = false
		b.mu.failoverSince = time.Time{}
		return true, time.Time{}
	default:
		// Already inactive: nothing to do.
		return false, time.Time{}
	}
}

// Failover reports whether failover draining is active and, if so, when it began.
func (b *backendWrapper) Failover() (active bool, since time.Time) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.failoverActive, b.mu.failoverSince
}

func (b *backendWrapper) ServerVersion() string {
b.mu.RLock()
version := b.mu.ServerVersion
Expand Down Expand Up @@ -213,4 +258,22 @@ type connWrapper struct {
lastRedirect time.Time
createTime time.Time
phase connPhase
forceClosing bool
}

// backendPodNameFromAddr derives a pod name from a backend address.
// A "host:port" address has its port stripped first; an address without a
// port is used as-is. IP hosts are returned verbatim, while DNS names are
// truncated to their first label (e.g. "pod-0.peer-svc.ns" -> "pod-0").
// An empty host yields "".
func backendPodNameFromAddr(addr string) string {
	host := addr
	if h, _, err := net.SplitHostPort(addr); err == nil {
		host = h
	}
	if host == "" {
		return ""
	}
	if net.ParseIP(host) != nil {
		// IP addresses have no pod name; return them unchanged.
		return host
	}
	name, _, _ := strings.Cut(host, ".")
	return name
}
Loading