From 201bf8cc396186132cf9e266d02919681d8ed8de Mon Sep 17 00:00:00 2001 From: djshow832 Date: Thu, 2 Apr 2026 21:03:36 +0800 Subject: [PATCH 01/12] avoid multiple vip --- go.mod | 1 - go.sum | 2 - lib/config/proxy.go | 21 +++++- lib/config/proxy_test.go | 33 +++++++++ pkg/manager/elect/election.go | 72 +++++++------------ pkg/manager/elect/election_test.go | 14 +--- pkg/manager/elect/mock_test.go | 67 ++++++++--------- pkg/manager/vip/manager.go | 112 +++++++++++++++++++++++------ pkg/manager/vip/manager_test.go | 86 +++++++++++++++++++++- pkg/manager/vip/mock_test.go | 49 ++++++++++--- pkg/manager/vip/network.go | 32 ++++++--- pkg/manager/vip/network_test.go | 2 +- 12 files changed, 344 insertions(+), 147 deletions(-) diff --git a/go.mod b/go.mod index d83a4fb7b..e6ae7ca06 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 - github.com/j-keck/arping v1.0.3 github.com/klauspost/compress v1.18.0 github.com/pelletier/go-toml/v2 v2.2.2 github.com/pingcap/kvproto v0.0.0-20250728031536-f08901d17bf4 diff --git a/go.sum b/go.sum index 669bd3be2..80480d9e5 100644 --- a/go.sum +++ b/go.sum @@ -518,8 +518,6 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY= github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= -github.com/j-keck/arping v1.0.3 h1:aeVk5WnsK6xPaRsFt5wV6W2x5l/n5XBNp0MMr/FEv2k= -github.com/j-keck/arping v1.0.3/go.mod h1:aJbELhR92bSk7tp79AWM/ftfc90EfEi2bQJrbBFOsPw= github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jedib0t/go-pretty/v6 v6.2.2 h1:o3McN0rQ4X+IU+HduppSp9TwRdGLRW2rhJXy9CJaCRw= github.com/jedib0t/go-pretty/v6 v6.2.2/go.mod h1:+nE9fyyHGil+PuISTCrp7avEdo6bqoMwqZnuiK2r2a0= diff --git a/lib/config/proxy.go b/lib/config/proxy.go index cb0cc3209..e549dae92 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -108,8 +108,11 @@ type LogFile struct { } type HA struct { - VirtualIP string `yaml:"virtual-ip,omitempty" toml:"virtual-ip,omitempty" json:"virtual-ip,omitempty" reloadable:"false"` - Interface string `yaml:"interface,omitempty" toml:"interface,omitempty" json:"interface,omitempty" reloadable:"false"` + VirtualIP string `yaml:"virtual-ip,omitempty" toml:"virtual-ip,omitempty" json:"virtual-ip,omitempty" reloadable:"false"` + Interface string `yaml:"interface,omitempty" toml:"interface,omitempty" json:"interface,omitempty" reloadable:"false"` + GARPBurstCount int `yaml:"garp-burst-count,omitempty" toml:"garp-burst-count,omitempty" json:"garp-burst-count,omitempty" reloadable:"false"` + GARPBurstInterval time.Duration `yaml:"garp-burst-interval,omitempty" toml:"garp-burst-interval,omitempty" json:"garp-burst-interval,omitempty" reloadable:"false"` + GARPRefreshInterval time.Duration `yaml:"garp-refresh-interval,omitempty" toml:"garp-refresh-interval,omitempty" json:"garp-refresh-interval,omitempty" reloadable:"false"` } func DefaultKeepAlive() (frontend, backendHealthy, backendUnhealthy KeepAlive) { @@ -150,6 +153,8 @@ func NewConfig() *Config { cfg.Balance = DefaultBalance() + cfg.HA.GARPBurstCount = 5 + cfg.EnableTrafficReplay = true return &cfg @@ -192,6 +197,18 @@ func (cfg *Config) Check() error { if err := cfg.Balance.Check(); err != nil { return err } + if cfg.HA.GARPBurstCount < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-count must be greater than or equal to 0") + } + if cfg.HA.GARPBurstInterval < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-interval must be greater than or equal to 0") + } + if cfg.HA.GARPRefreshInterval < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-refresh-interval must be greater than or equal to 0") + } + if cfg.HA.GARPBurstCount == 0 && cfg.HA.GARPRefreshInterval > 0 { + return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-count must be greater than 0 when ha.garp-refresh-interval is enabled") + } return nil } diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index da9f3ecaa..9391c9af3 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/BurntSushi/toml" mconfig "github.com/pingcap/metering_sdk/config" @@ -85,6 +86,13 @@ var testProxyConfig = Config{ }, RequireBackendTLS: true, }, + HA: HA{ + VirtualIP: "10.10.10.10/32", + Interface: "eth0", + GARPBurstCount: 5, + GARPBurstInterval: time.Second, + GARPRefreshInterval: time.Minute, + }, Metering: mconfig.MeteringConfig{ Type: storage.ProviderTypeAzure, Bucket: "metering-container", @@ -188,6 +196,31 @@ func TestProxyCheck(t *testing.T) { }, err: ErrInvalidConfigValue, }, + { + pre: func(t *testing.T, c *Config) { + c.HA.GARPBurstCount = -1 + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.HA.GARPBurstInterval = -time.Second + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.HA.GARPRefreshInterval = -time.Second + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.HA.GARPBurstCount = 0 + c.HA.GARPRefreshInterval = time.Second + }, + err: ErrInvalidConfigValue, + }, } for _, tc := range testcases { cfg := testProxyConfig diff --git a/pkg/manager/elect/election.go b/pkg/manager/elect/election.go index aed8c0247..d97c2b669 100644 --- a/pkg/manager/elect/election.go +++ b/pkg/manager/elect/election.go @@ -38,27 +38,23 @@ type Election interface { ID() string // GetOwnerID gets the owner ID. GetOwnerID(ctx context.Context) (string, error) - // Close resigns and but doesn't retire. + // Close stops campaigning and retires if this member currently owns the key. Close() } type ElectionConfig struct { - Timeout time.Duration - RetryIntvl time.Duration - QueryIntvl time.Duration - WaitBeforeRetire time.Duration - RetryCnt uint64 - SessionTTL int + Timeout time.Duration + RetryIntvl time.Duration + RetryCnt uint64 + SessionTTL int } func DefaultElectionConfig(sessionTTL int) ElectionConfig { return ElectionConfig{ - Timeout: 2 * time.Second, - RetryIntvl: 500 * time.Millisecond, - QueryIntvl: 1 * time.Second, - WaitBeforeRetire: 3 * time.Second, - RetryCnt: 3, - SessionTTL: sessionTTL, + Timeout: 2 * time.Second, + RetryIntvl: 500 * time.Millisecond, + RetryCnt: 3, + SessionTTL: sessionTTL, } } @@ -123,6 +119,9 @@ func (m *election) campaignLoop(ctx context.Context) { m.lg.Debug("begin campaign") select { case <-session.Done(): + if m.isOwner { + m.onRetired() + } m.lg.Info("etcd session is done, creates a new one") leaseID := session.Lease() session, err = concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.SessionTTL), concurrency.WithContext(ctx)) @@ -142,6 +141,9 @@ func (m *election) campaignLoop(ctx context.Context) { // The etcd server deletes this session's lease ID, but etcd session doesn't find it. // In this case if we do the campaign operation, the etcd server will return ErrLeaseNotFound. if errors.Is(err, rpctypes.ErrLeaseNotFound) { + if m.isOwner { + m.onRetired() + } if session != nil { err = session.Close() m.lg.Warn("etcd session encounters ErrLeaseNotFound, close it", zap.Error(err)) @@ -149,26 +151,21 @@ func (m *election) campaignLoop(ctx context.Context) { continue } - var wg waitgroup.WaitGroup - childCtx, cancel := context.WithCancel(ctx) - if m.isOwner { - // Check if another member becomes the new owner during campaign. - wg.RunWithRecover(func() { - m.waitRetire(childCtx) - }, nil, m.lg) - } - elec := concurrency.NewElection(session, m.key) err = elec.Campaign(ctx, m.id) - cancel() - wg.Wait() if err != nil { + if m.isOwner { + m.onRetired() + } m.lg.Info("failed to campaign", zap.Error(errors.WithStack(err))) continue } kv, err := m.getOwnerInfo(ctx) if err != nil { + if m.isOwner { + m.onRetired() + } m.lg.Warn("failed to get owner info", zap.Error(err)) continue } @@ -188,6 +185,9 @@ func (m *election) campaignLoop(ctx context.Context) { m.lg.Info("still the owner") } m.watchOwner(ctx, session, hack.String(kv.Key)) + if m.isOwner { + m.onRetired() + } } } @@ -206,30 +206,6 @@ func (m *election) onRetired() { metrics.OwnerGauge.MetricVec.DeletePartialMatch(map[string]string{metrics.LblType: m.trimedKey}) } -// waitRetire retires after another member becomes the owner so that there will always be an owner. -// It's allowed if multiple members act as the owner for some time but it's not allowed if no member acts as the owner. -// E.g. at least one member needs to bind the VIP even if the etcd server leader is down. -func (m *election) waitRetire(ctx context.Context) { - ticker := time.NewTicker(m.cfg.QueryIntvl) - defer ticker.Stop() - for ctx.Err() == nil { - select { - case <-ticker.C: - id, err := m.GetOwnerID(ctx) - if err != nil { - continue - } - // Another member becomes the owner, retire. - if id != m.id { - m.onRetired() - return - } - case <-ctx.Done(): - return - } - } -} - // revokeLease revokes the session lease so that other members can compaign immediately. func (m *election) revokeLease(leaseID clientv3.LeaseID) { // If revoke takes longer than the ttl, lease is expired anyway. diff --git a/pkg/manager/elect/election_test.go b/pkg/manager/elect/election_test.go index 456804596..723df6a81 100644 --- a/pkg/manager/elect/election_test.go +++ b/pkg/manager/elect/election_test.go @@ -77,11 +77,7 @@ func TestEtcdServerDown(t *testing.T) { addr := ts.shutdownServer() _, err := elec1.GetOwnerID(context.Background()) require.Error(t, err) - // the owner should not retire before the server is up again - ts.expectNoEvent("1") ts.startServer(addr) - // the owner should not retire because there's no other member - ts.expectNoEvent("1") ownerID := ts.getOwnerID() require.Equal(t, "1", ownerID) @@ -91,18 +87,10 @@ func TestEtcdServerDown(t *testing.T) { require.Error(t, err) elec2 := ts.newElection("2") elec2.Start(context.Background()) - // the owner should not retire before the server is up again - ts.expectNoEvent("1") // start the server again and the elections recover ts.startServer(addr) - ownerID = ts.getOwnerID() - if ownerID == "1" { - ts.expectNoEvent("1") - } else { - ts.expectEvent("1", eventTypeRetired) - ts.expectEvent(ownerID, eventTypeElected) - } + require.NotEmpty(t, ts.getOwnerID()) } func TestOwnerHang(t *testing.T) { diff --git a/pkg/manager/elect/mock_test.go b/pkg/manager/elect/mock_test.go index ad84c7178..2ce5e27c4 100644 --- a/pkg/manager/elect/mock_test.go +++ b/pkg/manager/elect/mock_test.go @@ -5,6 +5,7 @@ package elect import ( "context" + "sync" "testing" "time" @@ -25,14 +26,19 @@ const ( var _ Member = (*mockMember)(nil) type mockMember struct { - ch chan int + ch chan int + hangElectedMu sync.Mutex + hangElectedCh chan struct{} } func newMockMember() *mockMember { - return &mockMember{ch: make(chan int, 2)} + return &mockMember{ch: make(chan int, 32)} } func (mo *mockMember) OnElected() { + if ch := mo.getHangElectedCh(); ch != nil { + <-ch + } mo.ch <- eventTypeElected } @@ -43,7 +49,7 @@ func (mo *mockMember) OnRetired() { func (mo *mockMember) expectEvent(t *testing.T, expected ...int) { for _, exp := range expected { select { - case <-time.After(3 * time.Second): + case <-time.After(5 * time.Second): t.Fatal("timeout") case event := <-mo.ch: require.Equal(t, exp, event) @@ -51,34 +57,26 @@ func (mo *mockMember) expectEvent(t *testing.T, expected ...int) { } } -func (mo *mockMember) expectNoEvent(t *testing.T) { - select { - case <-time.After(100 * time.Millisecond): +func (mo *mockMember) hang(hang bool) { + mo.hangElectedMu.Lock() + defer mo.hangElectedMu.Unlock() + + if hang { + if mo.hangElectedCh == nil { + mo.hangElectedCh = make(chan struct{}) + } return - case event := <-mo.ch: - require.Fail(t, "unexpected event", event) + } + if mo.hangElectedCh != nil { + close(mo.hangElectedCh) + mo.hangElectedCh = nil } } -func (mo *mockMember) hang(hang bool) { - contn := true - for contn { - if hang { - // fill the channel - select { - case mo.ch <- eventTypeElected: - default: - contn = false - } - } else { - // clear the channel - select { - case <-mo.ch: - default: - contn = false - } - } - } +func (mo *mockMember) getHangElectedCh() chan struct{} { + mo.hangElectedMu.Lock() + defer mo.hangElectedMu.Unlock() + return mo.hangElectedCh } type etcdTestSuite struct { @@ -156,11 +154,6 @@ func (ts *etcdTestSuite) expectEvent(id string, event ...int) { elec.member.(*mockMember).expectEvent(ts.t, event...) } -func (ts *etcdTestSuite) expectNoEvent(id string) { - elec := ts.getElection(id) - elec.member.(*mockMember).expectNoEvent(ts.t) -} - func (ts *etcdTestSuite) hang(id string, hang bool) { elec := ts.getElection(id) elec.member.(*mockMember).hang(hang) @@ -196,11 +189,9 @@ func (ts *etcdTestSuite) shutdownServer() string { func electionConfigForTest(ttl int) ElectionConfig { return ElectionConfig{ - SessionTTL: ttl, - Timeout: 100 * time.Millisecond, - RetryIntvl: 10 * time.Millisecond, - QueryIntvl: 10 * time.Millisecond, - WaitBeforeRetire: 3 * time.Second, - RetryCnt: 2, + SessionTTL: ttl, + Timeout: 100 * time.Millisecond, + RetryIntvl: 10 * time.Millisecond, + RetryCnt: 2, } } diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index 8fe64fa6f..29997c59c 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -7,7 +7,8 @@ import ( "context" "fmt" "net" - "sync/atomic" + "sync" + "time" "github.com/pingcap/tiproxy/lib/config" "github.com/pingcap/tiproxy/pkg/manager/elect" @@ -23,6 +24,9 @@ const ( // The etcd client keeps alive every TTL/3 seconds. // The TTL determines the failover time so it should be short. sessionTTL = 3 + // Refresh GARP for a short window after takeover so upstream devices have + // several chances to update the VIP neighbor entry. + garpRefreshRounds = 10 ) type VIPManager interface { @@ -34,11 +38,14 @@ type VIPManager interface { var _ VIPManager = (*vipManager)(nil) type vipManager struct { - operation NetworkOperation - cfgGetter config.ConfigGetter - election elect.Election - lg *zap.Logger - delOnRetire atomic.Bool + mu sync.Mutex + closing bool + refreshWG sync.WaitGroup + refreshCancel context.CancelFunc + operation NetworkOperation + cfgGetter config.ConfigGetter + election elect.Election + lg *zap.Logger } func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, error) { @@ -54,7 +61,7 @@ func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, vm.lg.Warn("Both address and link must be specified to enable VIP. VIP is disabled") return nil, nil } - operation, err := NewNetworkOperation(cfg.HA.VirtualIP, cfg.HA.Interface, lg) + operation, err := NewNetworkOperation(cfg.HA.VirtualIP, cfg.HA.Interface, cfg.HA.GARPBurstCount, cfg.HA.GARPBurstInterval, lg) if err != nil { vm.lg.Error("init network operation failed", zap.Error(err)) return nil, err @@ -64,9 +71,12 @@ func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, } func (vm *vipManager) Start(ctx context.Context, etcdCli *clientv3.Client) error { + vm.mu.Lock() + defer vm.mu.Unlock() + vm.closing = false + // This node may have bound the VIP before last failure. vm.delVIP() - vm.delOnRetire.Store(true) cfg := vm.cfgGetter.GetConfig() ip, port, _, err := cfg.GetIPPort() @@ -83,13 +93,23 @@ func (vm *vipManager) Start(ctx context.Context, etcdCli *clientv3.Client) error } func (vm *vipManager) OnElected() { + vm.mu.Lock() + defer vm.mu.Unlock() + + if vm.closing { + vm.lg.Info("skip adding VIP because the manager is closing") + return + } vm.addVIP() + vm.startARPRefresh() } func (vm *vipManager) OnRetired() { - if vm.delOnRetire.Load() { - vm.delVIP() - } + vm.mu.Lock() + defer vm.mu.Unlock() + + vm.stopARPRefresh() + vm.delVIP() } func (vm *vipManager) addVIP() { @@ -130,20 +150,72 @@ func (vm *vipManager) delVIP() { vm.lg.Info("deleting VIP success") } -// PreClose resigns the owner but doesn't delete the VIP. -// It makes use of the graceful-wait time to wait for the new owner to shorten the failover time. +func (vm *vipManager) startARPRefresh() { + refreshInterval := vm.cfgGetter.GetConfig().HA.GARPRefreshInterval + if refreshInterval <= 0 { + return + } + vm.stopARPRefresh() + + ctx, cancel := context.WithCancel(context.Background()) + vm.refreshCancel = cancel + vm.refreshWG.Add(1) + go func() { + defer vm.refreshWG.Done() + + ticker := time.NewTicker(refreshInterval) + defer ticker.Stop() + for i := 0; i < garpRefreshRounds; i++ { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := vm.operation.SendARP(); err != nil { + vm.lg.Warn("refreshing GARP failed", zap.Error(err)) + return + } + } + } + }() +} + +func (vm *vipManager) stopARPRefresh() { + cancel := vm.refreshCancel + vm.refreshCancel = nil + if cancel != nil { + cancel() + } + vm.refreshWG.Wait() +} + +// PreClose deletes the VIP before resigning the owner so that controlled +// shutdowns do not expose the VIP on two nodes at the same time. func (vm *vipManager) PreClose() { - vm.delOnRetire.Store(false) - if vm.election != nil { - vm.election.Close() + election := vm.prepareForClose() + if election != nil { + election.Close() } } -// Close resigns the owner and deletes the VIP if it was the owner. -// The new owner may not be elected but we won't wait anymore. +// Close resigns the owner and makes sure the VIP is removed locally. func (vm *vipManager) Close() { - if vm.election != nil { - vm.election.Close() + election := vm.prepareForClose() + if election != nil { + election.Close() } + + vm.mu.Lock() + vm.stopARPRefresh() + vm.delVIP() + vm.mu.Unlock() +} + +func (vm *vipManager) prepareForClose() elect.Election { + vm.mu.Lock() + defer vm.mu.Unlock() + + vm.closing = true + vm.stopARPRefresh() vm.delVIP() + return vm.election } diff --git a/pkg/manager/vip/manager_test.go b/pkg/manager/vip/manager_test.go index 0774e00f0..54d9092c6 100644 --- a/pkg/manager/vip/manager_test.go +++ b/pkg/manager/vip/manager_test.go @@ -126,7 +126,6 @@ func TestNetworkOperation(t *testing.T) { cfgGetter: newMockConfigGetter(newMockConfig()), operation: operation, } - vm.delOnRetire.Store(true) vm.election = newMockElection(ch, vm) childCtx, cancel := context.WithCancel(context.Background()) vm.election.Start(childCtx) @@ -147,6 +146,71 @@ func TestNetworkOperation(t *testing.T) { vm.Close() } +func TestPreCloseDeletesVIPBeforeCloseElection(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + operation := newMockNetworkOperation() + operation.hasIP.Store(true) + + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(newMockConfig()), + operation: operation, + } + vm.election = &closeHookElection{ + closeFn: func() { + require.False(t, operation.hasIP.Load()) + require.EqualValues(t, 1, operation.delIPCnt.Load()) + }, + } + + vm.PreClose() +} + +func TestPreClosePreventsReacquireDuringElectionClose(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + operation := newMockNetworkOperation() + operation.hasIP.Store(true) + + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(newMockConfig()), + operation: operation, + } + vm.election = &closeHookElection{ + closeFn: func() { + vm.OnElected() + }, + } + + vm.PreClose() + require.False(t, operation.hasIP.Load()) + require.EqualValues(t, 0, operation.addIPCnt.Load()) +} + +func TestGARPRefresh(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + cfg := newMockConfig() + cfg.HA.GARPBurstCount = 1 + cfg.HA.GARPRefreshInterval = 10 * time.Millisecond + operation := newMockNetworkOperation() + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(cfg), + operation: operation, + } + + vm.OnElected() + require.Eventually(t, func() bool { + return operation.sendArpCnt.Load() >= 2 + }, time.Second, 10*time.Millisecond) + + vm.OnRetired() + sendArpCnt := operation.sendArpCnt.Load() + time.Sleep(50 * time.Millisecond) + require.Equal(t, sendArpCnt, operation.sendArpCnt.Load()) + require.False(t, operation.hasIP.Load()) +} + func TestStartAndClose(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) vm, err := NewVIPManager(lg, newMockConfigGetter(newMockConfig())) @@ -216,3 +280,23 @@ func TestMultiVIP(t *testing.T) { vm1.Close() vm2.Close() } + +type closeHookElection struct { + closeFn func() +} + +func (e *closeHookElection) Start(context.Context) {} + +func (e *closeHookElection) ID() string { + return "" +} + +func (e *closeHookElection) GetOwnerID(context.Context) (string, error) { + return "", nil +} + +func (e *closeHookElection) Close() { + if e.closeFn != nil { + e.closeFn() + } +} diff --git a/pkg/manager/vip/mock_test.go b/pkg/manager/vip/mock_test.go index 582a2fe2a..59c0aaa59 100644 --- a/pkg/manager/vip/mock_test.go +++ b/pkg/manager/vip/mock_test.go @@ -5,7 +5,9 @@ package vip import ( "context" + "sync" "sync/atomic" + "time" "github.com/pingcap/tiproxy/lib/config" "github.com/pingcap/tiproxy/lib/util/errors" @@ -37,15 +39,18 @@ const ( ) type mockElection struct { - wg waitgroup.WaitGroup - ch chan int - member elect.Member + wg waitgroup.WaitGroup + ch chan int + member elect.Member + closeOnce sync.Once + closeCh chan struct{} } func newMockElection(ch chan int, member elect.Member) *mockElection { return &mockElection{ - ch: ch, - member: member, + ch: ch, + member: member, + closeCh: make(chan struct{}), } } @@ -63,6 +68,8 @@ func (me *mockElection) Start(ctx context.Context) { select { case <-ctx.Done(): return + case <-me.closeCh: + return case event := <-me.ch: switch event { case eventTypeElected: @@ -76,17 +83,24 @@ func (me *mockElection) Start(ctx context.Context) { } func (me *mockElection) Close() { + me.closeOnce.Do(func() { + close(me.closeCh) + }) me.wg.Wait() } var _ NetworkOperation = (*mockNetworkOperation)(nil) type mockNetworkOperation struct { - hasIP atomic.Bool - hasIPErr atomic.Bool - addIPErr atomic.Bool - delIPErr atomic.Bool - sendArpErr atomic.Bool + hasIP atomic.Bool + hasIPErr atomic.Bool + addIPErr atomic.Bool + delIPErr atomic.Bool + sendArpErr atomic.Bool + sendArpCnt atomic.Int32 + addIPCnt atomic.Int32 + delIPCnt atomic.Int32 + sendArpDelay atomic.Int64 } func newMockNetworkOperation() *mockNetworkOperation { @@ -105,6 +119,8 @@ func (mno *mockNetworkOperation) AddIP() error { if mno.addIPErr.Load() { return errors.New("mock AddIP error") } + mno.addIPCnt.Add(1) + mno.hasIP.Store(true) return nil } @@ -112,6 +128,8 @@ func (mno *mockNetworkOperation) DeleteIP() error { if mno.delIPErr.Load() { return errors.New("mock DeleteIP error") } + mno.delIPCnt.Add(1) + mno.hasIP.Store(false) return nil } @@ -119,6 +137,11 @@ func (mno *mockNetworkOperation) SendARP() error { if mno.sendArpErr.Load() { return errors.New("mock SendARP error") } + delay := time.Duration(mno.sendArpDelay.Load()) + if delay > 0 { + time.Sleep(delay) + } + mno.sendArpCnt.Add(1) return nil } @@ -130,6 +153,10 @@ func newMockConfig() *config.Config { return &config.Config{ Proxy: config.ProxyServer{Addr: "0.0.0.0:6000"}, API: config.API{Addr: "0.0.0.0:3080"}, - HA: config.HA{VirtualIP: "127.0.0.2/24", Interface: "lo"}, + HA: config.HA{ + VirtualIP: "127.0.0.2/24", + Interface: "lo", + GARPBurstCount: 5, + }, } } diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index 6b162f475..2f7376e45 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -7,8 +7,8 @@ import ( "runtime" "strings" "syscall" + "time" - "github.com/j-keck/arping" "github.com/pingcap/tiproxy/lib/util/errors" "github.com/pingcap/tiproxy/pkg/util/cmd" "github.com/vishvananda/netlink" @@ -31,13 +31,17 @@ type networkOperation struct { // the VIP address address *netlink.Addr // the network interface - link netlink.Link - lg *zap.Logger + link netlink.Link + lg *zap.Logger + garpBurstCount int + garpBurstInterval time.Duration } -func NewNetworkOperation(addressStr, linkStr string, lg *zap.Logger) (NetworkOperation, error) { +func NewNetworkOperation(addressStr, linkStr string, garpBurstCount int, garpBurstInterval time.Duration, lg *zap.Logger) (NetworkOperation, error) { no := &networkOperation{ - lg: lg, + lg: lg, + garpBurstCount: garpBurstCount, + garpBurstInterval: garpBurstInterval, } if err := no.initAddr(addressStr, linkStr); err != nil { return nil, err @@ -93,12 +97,20 @@ func (no *networkOperation) DeleteIP() error { } func (no *networkOperation) SendARP() error { - if err := arping.GratuitousArpOverIfaceByName(no.address.IP, no.link.Attrs().Name); err != nil { - no.lg.Warn("gratuitous arping failed", zap.Stringer("ip", no.address.IP), zap.String("iface", no.link.Attrs().Name), zap.Error(err)) + if no.garpBurstCount <= 0 { + return nil } - // GratuitousArpOverIfaceByName may not work properly even if it returns nil, so always run a command. - err := no.execCmd("sudo", "arping", "-c", "3", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()) - return errors.WithStack(err) + for i := 0; i < no.garpBurstCount; i++ { + // Use "arping -c 1" repeatedly so that TiProxy controls the burst interval instead of + // relying on arping's built-in pacing. + if err := no.execCmd("sudo", "arping", "-c", "1", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()); err != nil { + return errors.WithStack(err) + } + if no.garpBurstInterval > 0 && i+1 < no.garpBurstCount { + time.Sleep(no.garpBurstInterval) + } + } + return nil } func (no *networkOperation) Addr() string { diff --git a/pkg/manager/vip/network_test.go b/pkg/manager/vip/network_test.go index 2b00420b4..773dec489 100644 --- a/pkg/manager/vip/network_test.go +++ b/pkg/manager/vip/network_test.go @@ -51,7 +51,7 @@ func TestAddDelIP(t *testing.T) { } for i, test := range tests { - operation, err := NewNetworkOperation(test.virtualIP, test.link, zap.NewNop()) + operation, err := NewNetworkOperation(test.virtualIP, test.link, 1, 0, zap.NewNop()) if test.initErr != "" { require.Error(t, err, "case %d", i) require.Contains(t, err.Error(), test.initErr, "case %d", i) From 4bfefbfbf2ed579bb30d0e0f1c2be981a625e0d6 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Thu, 2 Apr 2026 21:14:47 +0800 Subject: [PATCH 02/12] add comments --- lib/config/proxy.go | 25 +++++++++++++++++++++---- pkg/manager/vip/manager.go | 19 ++++++++++++++++--- pkg/manager/vip/network.go | 12 ++++++++---- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/lib/config/proxy.go b/lib/config/proxy.go index e549dae92..cb3a379c8 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -108,10 +108,22 @@ type LogFile struct { } type HA struct { - VirtualIP string `yaml:"virtual-ip,omitempty" toml:"virtual-ip,omitempty" json:"virtual-ip,omitempty" reloadable:"false"` - Interface string `yaml:"interface,omitempty" toml:"interface,omitempty" json:"interface,omitempty" reloadable:"false"` - GARPBurstCount int `yaml:"garp-burst-count,omitempty" toml:"garp-burst-count,omitempty" json:"garp-burst-count,omitempty" reloadable:"false"` - GARPBurstInterval time.Duration `yaml:"garp-burst-interval,omitempty" toml:"garp-burst-interval,omitempty" json:"garp-burst-interval,omitempty" reloadable:"false"` + // VirtualIP is the floating service address managed by TiProxy. + // It is expected to use a host route such as /32 so only the elected node + // answers for the VIP itself. + VirtualIP string `yaml:"virtual-ip,omitempty" toml:"virtual-ip,omitempty" json:"virtual-ip,omitempty" reloadable:"false"` + // Interface is the local NIC that binds VirtualIP and sends GARP from. + Interface string `yaml:"interface,omitempty" toml:"interface,omitempty" json:"interface,omitempty" reloadable:"false"` + // GARPBurstCount is the number of GARP packets sent immediately after the + // new owner binds the VIP. A small burst makes takeover visible quickly even + // if the first packet is dropped by the host, bond driver, or upstream device. + GARPBurstCount int `yaml:"garp-burst-count,omitempty" toml:"garp-burst-count,omitempty" json:"garp-burst-count,omitempty" reloadable:"false"` + // GARPBurstInterval is the spacing inside one burst. Zero means "send the + // burst as fast as possible". + GARPBurstInterval time.Duration `yaml:"garp-burst-interval,omitempty" toml:"garp-burst-interval,omitempty" json:"garp-burst-interval,omitempty" reloadable:"false"` + // GARPRefreshInterval controls the delay between follow-up bursts after + // takeover. It is used to refresh stale neighbor caches for a short window + // after failover instead of emitting high-rate GARP forever. GARPRefreshInterval time.Duration `yaml:"garp-refresh-interval,omitempty" toml:"garp-refresh-interval,omitempty" json:"garp-refresh-interval,omitempty" reloadable:"false"` } @@ -153,6 +165,9 @@ func NewConfig() *Config { cfg.Balance = DefaultBalance() + // Match the common VRRP-style default of sending a small burst immediately + // after takeover. Refresh is disabled by default and can be enabled when the + // network requires extra neighbor-cache nudges. cfg.HA.GARPBurstCount = 5 cfg.EnableTrafficReplay = true @@ -207,6 +222,8 @@ func (cfg *Config) Check() error { return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-refresh-interval must be greater than or equal to 0") } if cfg.HA.GARPBurstCount == 0 && cfg.HA.GARPRefreshInterval > 0 { + // Refresh reuses the same burst sender. Requiring at least one packet per + // burst keeps the runtime behavior and the configuration model aligned. return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-count must be greater than 0 when ha.garp-refresh-interval is enabled") } diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index 29997c59c..bf1717422 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -24,8 +24,9 @@ const ( // The etcd client keeps alive every TTL/3 seconds. // The TTL determines the failover time so it should be short. sessionTTL = 3 - // Refresh GARP for a short window after takeover so upstream devices have - // several chances to update the VIP neighbor entry. + // Refresh GARP for a short, bounded window after takeover so upstream + // devices have several chances to update the VIP neighbor entry without + // keeping permanent ARP noise on the network. garpRefreshRounds = 10 ) @@ -38,7 +39,11 @@ type VIPManager interface { var _ VIPManager = (*vipManager)(nil) type vipManager struct { - mu sync.Mutex + mu sync.Mutex + // closing blocks late OnElected callbacks during controlled shutdown. + // A VIP must not be present on two nodes at the same time because upstream + // L3 devices cache only one VIP->MAC entry; if old and new owners both answer + // ARP, whichever reply is learned last may blackhole cross-subnet traffic. closing bool refreshWG sync.WaitGroup refreshCancel context.CancelFunc @@ -96,6 +101,8 @@ func (vm *vipManager) OnElected() { vm.mu.Lock() defer vm.mu.Unlock() + // Election.Close may race with an already in-flight OnElected callback. + // Once controlled shutdown starts, never bind the VIP again locally. if vm.closing { vm.lg.Info("skip adding VIP because the manager is closing") return @@ -165,6 +172,9 @@ func (vm *vipManager) startARPRefresh() { ticker := time.NewTicker(refreshInterval) defer ticker.Stop() + // The first burst is sent synchronously by addVIP. The follow-up bursts + // cover devices that probe or refresh neighbor state a little later than + // the handover moment. for i := 0; i < garpRefreshRounds; i++ { select { case <-ctx.Done(): @@ -214,6 +224,9 @@ func (vm *vipManager) prepareForClose() elect.Election { vm.mu.Lock() defer vm.mu.Unlock() + // Drop the VIP before resigning. Letting the new owner add the VIP first is + // unsafe on real networks because upstream devices remember only one MAC for + // the VIP and may keep forwarding to the old node long after the overlap. vm.closing = true vm.stopARPRefresh() vm.delVIP() diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index 2f7376e45..f2bbc6d31 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -31,8 +31,10 @@ type networkOperation struct { // the VIP address address *netlink.Addr // the network interface - link netlink.Link - lg *zap.Logger + link netlink.Link + lg *zap.Logger + // garpBurstCount and garpBurstInterval define one takeover burst. The + // manager may replay the whole burst later during the refresh window. garpBurstCount int garpBurstInterval time.Duration } @@ -101,8 +103,10 @@ func (no *networkOperation) SendARP() error { return nil } for i := 0; i < no.garpBurstCount; i++ { - // Use "arping -c 1" repeatedly so that TiProxy controls the burst interval instead of - // relying on arping's built-in pacing. + // Use "arping -c 1" repeatedly so TiProxy controls both dimensions: + // 1. a tight burst right after takeover, and + // 2. later refresh bursts spaced by the manager. + // This keeps the behavior predictable across different arping versions. if err := no.execCmd("sudo", "arping", "-c", "1", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()); err != nil { return errors.WithStack(err) } From 6281898869a72da3fbf4410966299e9462a961f8 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 10:44:17 +0800 Subject: [PATCH 03/12] remove onRetire --- pkg/manager/elect/election.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/manager/elect/election.go b/pkg/manager/elect/election.go index d97c2b669..9440cda6e 100644 --- a/pkg/manager/elect/election.go +++ b/pkg/manager/elect/election.go @@ -154,18 +154,19 @@ func (m *election) campaignLoop(ctx context.Context) { elec := concurrency.NewElection(session, m.key) err = elec.Campaign(ctx, m.id) if err != nil { - if m.isOwner { - m.onRetired() - } + // Campaign may fail because etcd is temporarily unavailable while the + // current owner lease is still valid. Retiring immediately here would + // turn a transient etcd error into a local no-owner window. m.lg.Info("failed to campaign", zap.Error(errors.WithStack(err))) continue } kv, err := m.getOwnerInfo(ctx) if err != nil { - if m.isOwner { - m.onRetired() - } + // Failing to read the owner key does not necessarily mean this member + // has lost its lease. Keep the current owner state until we see a + // stronger signal such as session.Done, ErrLeaseNotFound, or an owner + // mismatch. m.lg.Warn("failed to get owner info", zap.Error(err)) continue } @@ -184,8 +185,7 @@ func (m *election) campaignLoop(ctx context.Context) { // It was the owner before the etcd failure and now is still the owner. m.lg.Info("still the owner") } - m.watchOwner(ctx, session, hack.String(kv.Key)) - if m.isOwner { + if m.watchOwner(ctx, session, hack.String(kv.Key)) && m.isOwner { m.onRetired() } } @@ -240,30 +240,32 @@ func (m *election) getOwnerInfo(ctx context.Context) (*mvccpb.KeyValue, error) { return kvs[0], nil } -func (m *election) watchOwner(ctx context.Context, session *concurrency.Session, key string) { +func (m *election) watchOwner(ctx context.Context, session *concurrency.Session, key string) bool { watchCh := m.etcdCli.Watch(ctx, key) for { select { case resp, ok := <-watchCh: if !ok { - m.lg.Info("watcher is closed, no owner") - return + // A closed watcher only means the watch stream needs to be rebuilt. + // It is not a proof that the current owner has lost its lease. + m.lg.Info("watcher is closed, retry watching owner") + return false } if resp.Canceled { - m.lg.Info("watch canceled, no owner") - return + m.lg.Info("watch canceled, retry watching owner") + return false } for _, ev := range resp.Events { if ev.Type == mvccpb.DELETE { m.lg.Info("watch failed, owner is deleted") - return + return true } } case <-session.Done(): - return + return true case <-ctx.Done(): - return + return false } } } From 22a5c8d7fdca46761317ff1a7c4cf3b190e069cf Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 13:03:06 +0800 Subject: [PATCH 04/12] remove onRetired --- lib/config/proxy.go | 8 +++++--- pkg/manager/elect/election.go | 32 +++++++++++++----------------- pkg/manager/elect/election_test.go | 4 ++++ pkg/manager/elect/mock_test.go | 13 ++++++++++++ pkg/manager/vip/manager.go | 8 ++++---- 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/lib/config/proxy.go b/lib/config/proxy.go index cb3a379c8..50f2aa3c8 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -122,7 +122,7 @@ type HA struct { // burst as fast as possible". GARPBurstInterval time.Duration `yaml:"garp-burst-interval,omitempty" toml:"garp-burst-interval,omitempty" json:"garp-burst-interval,omitempty" reloadable:"false"` // GARPRefreshInterval controls the delay between follow-up bursts after - // takeover. It is used to refresh stale neighbor caches for a short window + // takeover. It is used to refresh stale neighbor caches for a bounded window // after failover instead of emitting high-rate GARP forever. GARPRefreshInterval time.Duration `yaml:"garp-refresh-interval,omitempty" toml:"garp-refresh-interval,omitempty" json:"garp-refresh-interval,omitempty" reloadable:"false"` } @@ -166,9 +166,11 @@ func NewConfig() *Config { cfg.Balance = DefaultBalance() // Match the common VRRP-style default of sending a small burst immediately - // after takeover. Refresh is disabled by default and can be enabled when the - // network requires extra neighbor-cache nudges. + // after takeover, then keep refreshing for a short period. The refresh helps + // upstream devices overwrite stale VIP->MAC entries after an abnormal owner + // handover. cfg.HA.GARPBurstCount = 5 + cfg.HA.GARPRefreshInterval = time.Second cfg.EnableTrafficReplay = true diff --git a/pkg/manager/elect/election.go b/pkg/manager/elect/election.go index 9440cda6e..3ff3fd9c2 100644 --- a/pkg/manager/elect/election.go +++ b/pkg/manager/elect/election.go @@ -119,9 +119,9 @@ func (m *election) campaignLoop(ctx context.Context) { m.lg.Debug("begin campaign") select { case <-session.Done(): - if m.isOwner { - m.onRetired() - } + // Keep the local owner state until we observe a stronger signal that + // another member has taken over. Retiring here would trade split-brain + // risk for a no-owner window during etcd faults. m.lg.Info("etcd session is done, creates a new one") leaseID := session.Lease() session, err = concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.SessionTTL), concurrency.WithContext(ctx)) @@ -141,9 +141,6 @@ func (m *election) campaignLoop(ctx context.Context) { // The etcd server deletes this session's lease ID, but etcd session doesn't find it. // In this case if we do the campaign operation, the etcd server will return ErrLeaseNotFound. if errors.Is(err, rpctypes.ErrLeaseNotFound) { - if m.isOwner { - m.onRetired() - } if session != nil { err = session.Close() m.lg.Warn("etcd session encounters ErrLeaseNotFound, close it", zap.Error(err)) @@ -185,9 +182,7 @@ func (m *election) campaignLoop(ctx context.Context) { // It was the owner before the etcd failure and now is still the owner. m.lg.Info("still the owner") } - if m.watchOwner(ctx, session, hack.String(kv.Key)) && m.isOwner { - m.onRetired() - } + m.watchOwner(ctx, session, hack.String(kv.Key)) } } @@ -240,32 +235,33 @@ func (m *election) getOwnerInfo(ctx context.Context) (*mvccpb.KeyValue, error) { return kvs[0], nil } -func (m *election) watchOwner(ctx context.Context, session *concurrency.Session, key string) bool { +func (m *election) watchOwner(ctx context.Context, session *concurrency.Session, key string) { watchCh := m.etcdCli.Watch(ctx, key) for { select { case resp, ok := <-watchCh: if !ok { - // A closed watcher only means the watch stream needs to be rebuilt. - // It is not a proof that the current owner has lost its lease. m.lg.Info("watcher is closed, retry watching owner") - return false + return } if resp.Canceled { m.lg.Info("watch canceled, retry watching owner") - return false + return } for _, ev := range resp.Events { if ev.Type == mvccpb.DELETE { - m.lg.Info("watch failed, owner is deleted") - return true + // The owner key may disappear before another member campaigns. + // Keep the local owner state and let the next campaign round decide + // whether a new owner has really taken over. + m.lg.Info("watch found owner deleted, retry campaigning") + return } } case <-session.Done(): - return true + return case <-ctx.Done(): - return false + return } } } diff --git a/pkg/manager/elect/election_test.go b/pkg/manager/elect/election_test.go index 723df6a81..35726402b 100644 --- a/pkg/manager/elect/election_test.go +++ b/pkg/manager/elect/election_test.go @@ -75,6 +75,10 @@ func TestEtcdServerDown(t *testing.T) { // server is down addr := ts.shutdownServer() + // Losing etcd temporarily should not force the current owner to retire + // locally. Otherwise VIP owner election would turn a control-plane fault + // into a no-owner data-plane outage. + ts.expectNoEvent("1", 1500*time.Millisecond) _, err := elec1.GetOwnerID(context.Background()) require.Error(t, err) ts.startServer(addr) diff --git a/pkg/manager/elect/mock_test.go b/pkg/manager/elect/mock_test.go index 2ce5e27c4..b677490c2 100644 --- a/pkg/manager/elect/mock_test.go +++ b/pkg/manager/elect/mock_test.go @@ -57,6 +57,14 @@ func (mo *mockMember) expectEvent(t *testing.T, expected ...int) { } } +func (mo *mockMember) expectNoEvent(t *testing.T, timeout time.Duration) { + select { + case event := <-mo.ch: + t.Fatalf("unexpected event %d", event) + case <-time.After(timeout): + } +} + func (mo *mockMember) hang(hang bool) { mo.hangElectedMu.Lock() defer mo.hangElectedMu.Unlock() @@ -154,6 +162,11 @@ func (ts *etcdTestSuite) expectEvent(id string, event ...int) { elec.member.(*mockMember).expectEvent(ts.t, event...) } +func (ts *etcdTestSuite) expectNoEvent(id string, timeout time.Duration) { + elec := ts.getElection(id) + elec.member.(*mockMember).expectNoEvent(ts.t, timeout) +} + func (ts *etcdTestSuite) hang(id string, hang bool) { elec := ts.getElection(id) elec.member.(*mockMember).hang(hang) diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index bf1717422..970cc1b88 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -24,10 +24,10 @@ const ( // The etcd client keeps alive every TTL/3 seconds. // The TTL determines the failover time so it should be short. sessionTTL = 3 - // Refresh GARP for a short, bounded window after takeover so upstream - // devices have several chances to update the VIP neighbor entry without - // keeping permanent ARP noise on the network. - garpRefreshRounds = 10 + // Refresh GARP for a bounded window after takeover so upstream devices have + // repeated chances to overwrite stale VIP->MAC cache entries after abnormal + // failover, while still avoiding permanent ARP noise. + garpRefreshRounds = 30 ) type VIPManager interface { From e8fe83a703b6556a41c804085478759ce3871e0a Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 17:30:04 +0800 Subject: [PATCH 05/12] waitgroup --- AGENTS.md | 7 +++++++ pkg/manager/vip/manager.go | 10 ++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index dd553c8dc..3ca6353fd 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -92,6 +92,11 @@ Guidelines: - Reuse common error helpers from `lib/util` / `pkg/util` when available. - Do not silently ignore errors; either handle them explicitly or return them to callers. +- **Comments for non-obvious logic** + - Add succinct comments around code paths whose correctness depends on subtle concurrency, networking, ownership, or failure-handling assumptions. + - The goal is to explain *why* the code is written that way so future contributors do not "simplify" it into a regression. + - Avoid repeating what the code already says; focus comments on invariants, trade-offs, and external behavior. + - **Logging** - Use the shared logging facilities (for example, logger manager) rather than creating ad-hoc loggers. - Include important identifiers (such as namespace, connection ID, cluster information) in logs when they help debugging. @@ -100,6 +105,8 @@ Guidelines: - **Concurrency and context** - Always pass `context.Context` through call chains where operations may block, allocate resources, or perform I/O. - Do not start goroutines without a clear lifetime; ensure there is a way to stop them (via context cancellation or explicit shutdown). + - Avoid bare `go func()` for managed background work. Prefer the repository `waitgroup` helpers (for example `pkg/util/waitgroup.WaitGroup.Run` or `RunWithRecover`) so goroutine lifecycle and shutdown are tracked consistently. + - If a background goroutine should recover from panic instead of crashing the whole process, use `waitgroup.RunWithRecover()` and handle recovery through the shared helper rather than ad-hoc `recover()` logic. - Avoid sharing mutable state across goroutines without proper synchronization. - Be careful when exposing channels and mutexes in public APIs; clearly document ownership and who is responsible for closing channels. diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index 970cc1b88..d6d76fc24 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/tiproxy/lib/config" "github.com/pingcap/tiproxy/pkg/manager/elect" + "github.com/pingcap/tiproxy/pkg/util/waitgroup" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -45,7 +46,7 @@ type vipManager struct { // L3 devices cache only one VIP->MAC entry; if old and new owners both answer // ARP, whichever reply is learned last may blackhole cross-subnet traffic. closing bool - refreshWG sync.WaitGroup + refreshWG waitgroup.WaitGroup refreshCancel context.CancelFunc operation NetworkOperation cfgGetter config.ConfigGetter @@ -166,10 +167,7 @@ func (vm *vipManager) startARPRefresh() { ctx, cancel := context.WithCancel(context.Background()) vm.refreshCancel = cancel - vm.refreshWG.Add(1) - go func() { - defer vm.refreshWG.Done() - + vm.refreshWG.RunWithRecover(func() { ticker := time.NewTicker(refreshInterval) defer ticker.Stop() // The first burst is sent synchronously by addVIP. The follow-up bursts @@ -186,7 +184,7 @@ func (vm *vipManager) startARPRefresh() { } } } - }() + }, nil, vm.lg) } func (vm *vipManager) stopARPRefresh() { From a4755490afc41daa4e3a473012ce980cd24e5bd4 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 18:35:10 +0800 Subject: [PATCH 06/12] addVIP bool --- go.mod | 1 + go.sum | 2 ++ pkg/manager/vip/manager.go | 18 +++++++++++------- pkg/manager/vip/manager_test.go | 19 +++++++++++++++++++ pkg/manager/vip/network.go | 25 ++++++++++++++++++++----- 5 files changed, 53 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index e6ae7ca06..d83a4fb7b 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 + github.com/j-keck/arping v1.0.3 github.com/klauspost/compress v1.18.0 github.com/pelletier/go-toml/v2 v2.2.2 github.com/pingcap/kvproto v0.0.0-20250728031536-f08901d17bf4 diff --git a/go.sum b/go.sum index 80480d9e5..669bd3be2 100644 --- a/go.sum +++ b/go.sum @@ -518,6 +518,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY= github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= +github.com/j-keck/arping v1.0.3 h1:aeVk5WnsK6xPaRsFt5wV6W2x5l/n5XBNp0MMr/FEv2k= +github.com/j-keck/arping v1.0.3/go.mod h1:aJbELhR92bSk7tp79AWM/ftfc90EfEi2bQJrbBFOsPw= github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jedib0t/go-pretty/v6 v6.2.2 h1:o3McN0rQ4X+IU+HduppSp9TwRdGLRW2rhJXy9CJaCRw= github.com/jedib0t/go-pretty/v6 v6.2.2/go.mod h1:+nE9fyyHGil+PuISTCrp7avEdo6bqoMwqZnuiK2r2a0= diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index d6d76fc24..89fce88cb 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -108,8 +108,9 @@ func (vm *vipManager) OnElected() { vm.lg.Info("skip adding VIP because the manager is closing") return } - vm.addVIP() - vm.startARPRefresh() + if vm.addVIP() { + vm.startARPRefresh() + } } func (vm *vipManager) OnRetired() { @@ -120,25 +121,28 @@ func (vm *vipManager) OnRetired() { vm.delVIP() } -func (vm *vipManager) addVIP() { +func (vm *vipManager) addVIP() bool { hasIP, err := vm.operation.HasIP() if err != nil { vm.lg.Error("checking addresses failed", zap.Error(err)) - return + return false } if hasIP { vm.lg.Debug("already has VIP, do nothing") - return + return true } if err := vm.operation.AddIP(); err != nil { vm.lg.Error("adding address failed", zap.Error(err)) - return + return false } if err := vm.operation.SendARP(); err != nil { vm.lg.Error("broadcast ARP failed", zap.Error(err)) - return + // The VIP is already bound locally. Keep the later refresh loop as a + // best-effort retry path for notifying upstream devices. + return true } vm.lg.Info("adding VIP success") + return true } func (vm *vipManager) delVIP() { diff --git a/pkg/manager/vip/manager_test.go b/pkg/manager/vip/manager_test.go index 54d9092c6..9f4c4a9aa 100644 --- a/pkg/manager/vip/manager_test.go +++ b/pkg/manager/vip/manager_test.go @@ -211,6 +211,25 @@ func TestGARPRefresh(t *testing.T) { require.False(t, operation.hasIP.Load()) } +func TestGARPRefreshNotStartedWhenVIPNotBound(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + cfg := newMockConfig() + cfg.HA.GARPBurstCount = 1 + cfg.HA.GARPRefreshInterval = 10 * time.Millisecond + operation := newMockNetworkOperation() + operation.addIPErr.Store(true) + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(cfg), + operation: operation, + } + + vm.OnElected() + time.Sleep(50 * time.Millisecond) + require.EqualValues(t, 0, operation.sendArpCnt.Load()) + require.False(t, operation.hasIP.Load()) +} + func TestStartAndClose(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) vm, err := NewVIPManager(lg, newMockConfigGetter(newMockConfig())) diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index f2bbc6d31..9972d5fbf 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -9,6 +9,7 @@ import ( "syscall" "time" + "github.com/j-keck/arping" "github.com/pingcap/tiproxy/lib/util/errors" "github.com/pingcap/tiproxy/pkg/util/cmd" "github.com/vishvananda/netlink" @@ -103,11 +104,10 @@ func (no *networkOperation) SendARP() error { return nil } for i := 0; i < no.garpBurstCount; i++ { - // Use "arping -c 1" repeatedly so TiProxy controls both dimensions: - // 1. a tight burst right after takeover, and - // 2. later refresh bursts spaced by the manager. - // This keeps the behavior predictable across different arping versions. - if err := no.execCmd("sudo", "arping", "-c", "1", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()); err != nil { + // Keep both sending paths: the library path avoids depending on the + // external "arping" command, while the command path has proven more + // reliable on some customer environments. Treat either one as success. + if err := no.sendARPOneShot(); err != nil { return errors.WithStack(err) } if no.garpBurstInterval > 0 && i+1 < no.garpBurstCount { @@ -124,6 +124,21 @@ func (no *networkOperation) Addr() string { return no.address.IP.String() } +func (no *networkOperation) sendARPOneShot() error { + libErr := arping.GratuitousArpOverIfaceByName(no.address.IP, no.link.Attrs().Name) + if libErr != nil { + no.lg.Warn("gratuitous arping via library failed", + zap.Stringer("ip", no.address.IP), + zap.String("iface", no.link.Attrs().Name), + zap.Error(libErr)) + } + cmdErr := no.execCmd("sudo", "arping", "-c", "1", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()) + if libErr == nil || cmdErr == nil { + return nil + } + return errors.Wrap(errors.WithStack(cmdErr), errors.WithStack(libErr)) +} + func (no *networkOperation) execCmd(args ...string) error { output, err := cmd.ExecCmd(args[0], args[1:]...) no.lg.Info("executed cmd", zap.String("cmd", strings.Join(args, " ")), zap.String("output", output), zap.Error(err)) From bef4a60413c2113e6d4e00cd03c914bad39c935e Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 18:46:12 +0800 Subject: [PATCH 07/12] remove duplicate delete --- pkg/manager/vip/manager.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index 89fce88cb..c16c91f70 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -215,11 +215,6 @@ func (vm *vipManager) Close() { if election != nil { election.Close() } - - vm.mu.Lock() - vm.stopARPRefresh() - vm.delVIP() - vm.mu.Unlock() } func (vm *vipManager) prepareForClose() elect.Election { From 499db679abb779a750ca63e2ea149961a579c612 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 18:53:20 +0800 Subject: [PATCH 08/12] errors.WithStack() --- AGENTS.md | 1 + pkg/manager/vip/network.go | 13 +++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 3ca6353fd..88124ae88 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -90,6 +90,7 @@ Guidelines: - **Error handling** - Prefer wrapping errors with context so they are actionable (what operation failed, which component, key parameters). - Reuse common error helpers from `lib/util` / `pkg/util` when available. + - Add `errors.WithStack()` only at the lowest layer that first creates or captures the concrete error. Higher layers should usually add context with `errors.Wrap()` / `errors.Wrapf()` instead of attaching another stack trace. - Do not silently ignore errors; either handle them explicitly or return them to callers. - **Comments for non-obvious logic** diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index 9972d5fbf..e6e16141e 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -104,11 +104,8 @@ func (no *networkOperation) SendARP() error { return nil } for i := 0; i < no.garpBurstCount; i++ { - // Keep both sending paths: the library path avoids depending on the - // external "arping" command, while the command path has proven more - // reliable on some customer environments. Treat either one as success. if err := no.sendARPOneShot(); err != nil { - return errors.WithStack(err) + return err } if no.garpBurstInterval > 0 && i+1 < no.garpBurstCount { time.Sleep(no.garpBurstInterval) @@ -125,6 +122,10 @@ func (no *networkOperation) Addr() string { } func (no *networkOperation) sendARPOneShot() error { + // Keep both sending paths for a single logical GARP attempt: the library + // path avoids depending on the external "arping" command, while the command + // path has proven more reliable on some customer environments. Treat either + // one as success. libErr := arping.GratuitousArpOverIfaceByName(no.address.IP, no.link.Attrs().Name) if libErr != nil { no.lg.Warn("gratuitous arping via library failed", @@ -132,6 +133,10 @@ func (no *networkOperation) sendARPOneShot() error { zap.String("iface", no.link.Attrs().Name), zap.Error(libErr)) } + // Always use "arping -c 1" here and let SendARP control the outer burst + // count and interval. Using "arping -c 5" would hide pacing inside the + // command, making takeover and refresh timing less predictable from TiProxy + // and harder to reason about in tests and production troubleshooting. cmdErr := no.execCmd("sudo", "arping", "-c", "1", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()) if libErr == nil || cmdErr == nil { return nil From b598a1e42bfb137473879c463985161645ba65ba Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 19:02:30 +0800 Subject: [PATCH 09/12] context --- AGENTS.md | 1 + pkg/manager/vip/manager.go | 65 ++++++++++++++++++++++----------- pkg/manager/vip/manager_test.go | 37 +++++++++++++++++++ pkg/manager/vip/mock_test.go | 15 +++++++- pkg/manager/vip/network.go | 21 +++++++++-- pkg/manager/vip/network_test.go | 3 +- 6 files changed, 115 insertions(+), 27 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 88124ae88..802662c2f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -108,6 +108,7 @@ Guidelines: - Do not start goroutines without a clear lifetime; ensure there is a way to stop them (via context cancellation or explicit shutdown). - Avoid bare `go func()` for managed background work. Prefer the repository `waitgroup` helpers (for example `pkg/util/waitgroup.WaitGroup.Run` or `RunWithRecover`) so goroutine lifecycle and shutdown are tracked consistently. - If a background goroutine should recover from panic instead of crashing the whole process, use `waitgroup.RunWithRecover()` and handle recovery through the shared helper rather than ad-hoc `recover()` logic. + - If a wait or sleep may delay shutdown, owner handoff, or other cancellation-sensitive flows, do not use an unconditional `time.Sleep`. Prefer `timer + context` (or an equivalent cancellable wait) so the code can exit promptly. - Avoid sharing mutable state across goroutines without proper synchronization. - Be careful when exposing channels and mutexes in public APIs; clearly document ownership and who is responsible for closing channels. diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index c16c91f70..66480c521 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -11,6 +11,7 @@ import ( "time" "github.com/pingcap/tiproxy/lib/config" + "github.com/pingcap/tiproxy/lib/util/errors" "github.com/pingcap/tiproxy/pkg/manager/elect" "github.com/pingcap/tiproxy/pkg/util/waitgroup" clientv3 "go.etcd.io/etcd/client/v3" @@ -45,13 +46,13 @@ type vipManager struct { // A VIP must not be present on two nodes at the same time because upstream // L3 devices cache only one VIP->MAC entry; if old and new owners both answer // ARP, whichever reply is learned last may blackhole cross-subnet traffic. - closing bool - refreshWG waitgroup.WaitGroup - refreshCancel context.CancelFunc - operation NetworkOperation - cfgGetter config.ConfigGetter - election elect.Election - lg *zap.Logger + closing bool + arpCancel context.CancelFunc + refreshWG waitgroup.WaitGroup + operation NetworkOperation + cfgGetter config.ConfigGetter + election elect.Election + lg *zap.Logger } func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, error) { @@ -100,16 +101,24 @@ func (vm *vipManager) Start(ctx context.Context, etcdCli *clientv3.Client) error func (vm *vipManager) OnElected() { vm.mu.Lock() - defer vm.mu.Unlock() - // Election.Close may race with an already in-flight OnElected callback. // Once controlled shutdown starts, never bind the VIP again locally. if vm.closing { + vm.mu.Unlock() vm.lg.Info("skip adding VIP because the manager is closing") return } - if vm.addVIP() { - vm.startARPRefresh() + vm.stopARPRefresh() + ctx, cancel := context.WithCancel(context.Background()) + vm.arpCancel = cancel + vm.mu.Unlock() + + if vm.addVIP(ctx) { + vm.mu.Lock() + if !vm.closing && ctx.Err() == nil { + vm.startARPRefresh(ctx) + } + vm.mu.Unlock() } } @@ -121,7 +130,10 @@ func (vm *vipManager) OnRetired() { vm.delVIP() } -func (vm *vipManager) addVIP() bool { +func (vm *vipManager) addVIP(ctx context.Context) bool { + if err := ctx.Err(); err != nil { + return false + } hasIP, err := vm.operation.HasIP() if err != nil { vm.lg.Error("checking addresses failed", zap.Error(err)) @@ -131,11 +143,23 @@ func (vm *vipManager) addVIP() bool { vm.lg.Debug("already has VIP, do nothing") return true } + if err := ctx.Err(); err != nil { + return false + } if err := vm.operation.AddIP(); err != nil { vm.lg.Error("adding address failed", zap.Error(err)) return false } - if err := vm.operation.SendARP(); err != nil { + if err := ctx.Err(); err != nil { + vm.delVIP() + return false + } + if err := vm.operation.SendARP(ctx); err != nil { + if errors.Is(err, context.Canceled) { + vm.lg.Info("broadcast ARP canceled") + vm.delVIP() + return false + } vm.lg.Error("broadcast ARP failed", zap.Error(err)) // The VIP is already bound locally. Keep the later refresh loop as a // best-effort retry path for notifying upstream devices. @@ -162,15 +186,11 @@ func (vm *vipManager) delVIP() { vm.lg.Info("deleting VIP success") } -func (vm *vipManager) startARPRefresh() { +func (vm *vipManager) startARPRefresh(ctx context.Context) { refreshInterval := vm.cfgGetter.GetConfig().HA.GARPRefreshInterval if refreshInterval <= 0 { return } - vm.stopARPRefresh() - - ctx, cancel := context.WithCancel(context.Background()) - vm.refreshCancel = cancel vm.refreshWG.RunWithRecover(func() { ticker := time.NewTicker(refreshInterval) defer ticker.Stop() @@ -182,7 +202,10 @@ func (vm *vipManager) startARPRefresh() { case <-ctx.Done(): return case <-ticker.C: - if err := vm.operation.SendARP(); err != nil { + if err := vm.operation.SendARP(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return + } vm.lg.Warn("refreshing GARP failed", zap.Error(err)) return } @@ -192,8 +215,8 @@ func (vm *vipManager) startARPRefresh() { } func (vm *vipManager) stopARPRefresh() { - cancel := vm.refreshCancel - vm.refreshCancel = nil + cancel := vm.arpCancel + vm.arpCancel = nil if cancel != nil { cancel() } diff --git a/pkg/manager/vip/manager_test.go b/pkg/manager/vip/manager_test.go index 9f4c4a9aa..6816178bd 100644 --- a/pkg/manager/vip/manager_test.go +++ b/pkg/manager/vip/manager_test.go @@ -230,6 +230,43 @@ func TestGARPRefreshNotStartedWhenVIPNotBound(t *testing.T) { require.False(t, operation.hasIP.Load()) } +func TestPreCloseCancelsInFlightARP(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + cfg := newMockConfig() + cfg.HA.GARPBurstCount = 2 + cfg.HA.GARPBurstInterval = time.Second + operation := newMockNetworkOperation() + operation.sendArpDelay.Store(int64(200 * time.Millisecond)) + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(cfg), + operation: operation, + } + + done := make(chan struct{}) + go func() { + vm.OnElected() + close(done) + }() + + require.Eventually(t, func() bool { + return operation.addIPCnt.Load() == 1 + }, time.Second, 10*time.Millisecond) + + start := time.Now() + vm.PreClose() + require.Less(t, time.Since(start), 500*time.Millisecond) + require.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.False(t, operation.hasIP.Load()) +} + func TestStartAndClose(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) vm, err := NewVIPManager(lg, newMockConfigGetter(newMockConfig())) diff --git a/pkg/manager/vip/mock_test.go b/pkg/manager/vip/mock_test.go index 59c0aaa59..7fd4be214 100644 --- a/pkg/manager/vip/mock_test.go +++ b/pkg/manager/vip/mock_test.go @@ -133,13 +133,24 @@ func (mno *mockNetworkOperation) DeleteIP() error { return nil } -func (mno *mockNetworkOperation) SendARP() error { +func (mno *mockNetworkOperation) SendARP(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } if mno.sendArpErr.Load() { return errors.New("mock SendARP error") } delay := time.Duration(mno.sendArpDelay.Load()) if delay > 0 { - time.Sleep(delay) + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() + case <-timer.C: + } } mno.sendArpCnt.Add(1) return nil diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index e6e16141e..0ca49e975 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -4,6 +4,7 @@ package vip import ( + "context" "runtime" "strings" "syscall" @@ -22,7 +23,7 @@ type NetworkOperation interface { HasIP() (bool, error) AddIP() error DeleteIP() error - SendARP() error + SendARP(context.Context) error Addr() string } @@ -99,16 +100,30 @@ func (no *networkOperation) DeleteIP() error { return errors.WithStack(err) } -func (no *networkOperation) SendARP() error { +func (no *networkOperation) SendARP(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } if no.garpBurstCount <= 0 { return nil } for i := 0; i < no.garpBurstCount; i++ { + if err := ctx.Err(); err != nil { + return err + } if err := no.sendARPOneShot(); err != nil { return err } if no.garpBurstInterval > 0 && i+1 < no.garpBurstCount { - time.Sleep(no.garpBurstInterval) + timer := time.NewTimer(no.garpBurstInterval) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() + case <-timer.C: + } } } return nil diff --git a/pkg/manager/vip/network_test.go b/pkg/manager/vip/network_test.go index 773dec489..eaea57362 100644 --- a/pkg/manager/vip/network_test.go +++ b/pkg/manager/vip/network_test.go @@ -6,6 +6,7 @@ package vip import ( + "context" "runtime" "strings" "testing" @@ -68,7 +69,7 @@ func TestAddDelIP(t *testing.T) { continue } - err = operation.SendARP() + err = operation.SendARP(context.Background()) if err != nil { require.True(t, isOtherErr(err)) } From f66c35b20f7b715072505a84684b73b9043c13d3 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 19:08:23 +0800 Subject: [PATCH 10/12] remove ctx.Err --- pkg/manager/vip/manager.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index 66480c521..4945d4535 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -115,7 +115,7 @@ func (vm *vipManager) OnElected() { if vm.addVIP(ctx) { vm.mu.Lock() - if !vm.closing && ctx.Err() == nil { + if !vm.closing { vm.startARPRefresh(ctx) } vm.mu.Unlock() @@ -131,9 +131,6 @@ func (vm *vipManager) OnRetired() { } func (vm *vipManager) addVIP(ctx context.Context) bool { - if err := ctx.Err(); err != nil { - return false - } hasIP, err := vm.operation.HasIP() if err != nil { vm.lg.Error("checking addresses failed", zap.Error(err)) @@ -143,9 +140,6 @@ func (vm *vipManager) addVIP(ctx context.Context) bool { vm.lg.Debug("already has VIP, do nothing") return true } - if err := ctx.Err(); err != nil { - return false - } if err := vm.operation.AddIP(); err != nil { vm.lg.Error("adding address failed", zap.Error(err)) return false From f34d2f069ff0c77e5c90c0d3bdcce85a80f095d2 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 19:37:09 +0800 Subject: [PATCH 11/12] manual fix --- lib/config/proxy.go | 22 +++++++--------------- lib/config/proxy_test.go | 25 +++++-------------------- pkg/manager/vip/manager.go | 25 ++++++------------------- pkg/manager/vip/manager_test.go | 10 +++++----- pkg/manager/vip/network.go | 21 +++++---------------- pkg/manager/vip/network_test.go | 2 +- 6 files changed, 29 insertions(+), 76 deletions(-) diff --git a/lib/config/proxy.go b/lib/config/proxy.go index 50f2aa3c8..02462cf08 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -118,13 +118,10 @@ type HA struct { // new owner binds the VIP. A small burst makes takeover visible quickly even // if the first packet is dropped by the host, bond driver, or upstream device. GARPBurstCount int `yaml:"garp-burst-count,omitempty" toml:"garp-burst-count,omitempty" json:"garp-burst-count,omitempty" reloadable:"false"` - // GARPBurstInterval is the spacing inside one burst. Zero means "send the - // burst as fast as possible". - GARPBurstInterval time.Duration `yaml:"garp-burst-interval,omitempty" toml:"garp-burst-interval,omitempty" json:"garp-burst-interval,omitempty" reloadable:"false"` - // GARPRefreshInterval controls the delay between follow-up bursts after + // GARPRefreshCount controls the number of follow-up bursts after // takeover. It is used to refresh stale neighbor caches for a bounded window // after failover instead of emitting high-rate GARP forever. - GARPRefreshInterval time.Duration `yaml:"garp-refresh-interval,omitempty" toml:"garp-refresh-interval,omitempty" json:"garp-refresh-interval,omitempty" reloadable:"false"` + GARPRefreshCount int `yaml:"garp-refresh-count,omitempty" toml:"garp-refresh-count,omitempty" json:"garp-refresh-count,omitempty" reloadable:"false"` } func DefaultKeepAlive() (frontend, backendHealthy, backendUnhealthy KeepAlive) { @@ -170,7 +167,7 @@ func NewConfig() *Config { // upstream devices overwrite stale VIP->MAC entries after an abnormal owner // handover. cfg.HA.GARPBurstCount = 5 - cfg.HA.GARPRefreshInterval = time.Second + cfg.HA.GARPRefreshCount = 30 cfg.EnableTrafficReplay = true @@ -217,16 +214,11 @@ func (cfg *Config) Check() error { if cfg.HA.GARPBurstCount < 0 { return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-count must be greater than or equal to 0") } - if cfg.HA.GARPBurstInterval < 0 { - return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-interval must be greater than or equal to 0") + if cfg.HA.GARPBurstCount == 0 { + cfg.HA.GARPBurstCount = 1 } - if cfg.HA.GARPRefreshInterval < 0 { - return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-refresh-interval must be greater than or equal to 0") - } - if cfg.HA.GARPBurstCount == 0 && cfg.HA.GARPRefreshInterval > 0 { - // Refresh reuses the same burst sender. Requiring at least one packet per - // burst keeps the runtime behavior and the configuration model aligned. - return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-count must be greater than 0 when ha.garp-refresh-interval is enabled") + if cfg.HA.GARPRefreshCount < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-refresh-count must be greater than or equal to 0") } return nil diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index 9391c9af3..ef032359b 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -8,7 +8,6 @@ import ( "os" "path/filepath" "testing" - "time" "github.com/BurntSushi/toml" mconfig "github.com/pingcap/metering_sdk/config" @@ -87,11 +86,10 @@ var testProxyConfig = Config{ RequireBackendTLS: true, }, HA: HA{ - VirtualIP: "10.10.10.10/32", - Interface: "eth0", - GARPBurstCount: 5, - GARPBurstInterval: time.Second, - GARPRefreshInterval: time.Minute, + VirtualIP: "10.10.10.10/32", + Interface: "eth0", + GARPBurstCount: 5, + GARPRefreshCount: 30, }, Metering: mconfig.MeteringConfig{ Type: storage.ProviderTypeAzure, @@ -204,20 +202,7 @@ func TestProxyCheck(t *testing.T) { }, { pre: func(t *testing.T, c *Config) { - c.HA.GARPBurstInterval = -time.Second - }, - err: ErrInvalidConfigValue, - }, - { - pre: func(t *testing.T, c *Config) { - c.HA.GARPRefreshInterval = -time.Second - }, - err: ErrInvalidConfigValue, - }, - { - pre: func(t *testing.T, c *Config) { - c.HA.GARPBurstCount = 0 - c.HA.GARPRefreshInterval = time.Second + c.HA.GARPRefreshCount = -1 }, err: ErrInvalidConfigValue, }, diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index 4945d4535..0264d659b 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -11,7 +11,6 @@ import ( "time" "github.com/pingcap/tiproxy/lib/config" - "github.com/pingcap/tiproxy/lib/util/errors" "github.com/pingcap/tiproxy/pkg/manager/elect" "github.com/pingcap/tiproxy/pkg/util/waitgroup" clientv3 "go.etcd.io/etcd/client/v3" @@ -29,7 +28,7 @@ const ( // Refresh GARP for a bounded window after takeover so upstream devices have // repeated chances to overwrite stale VIP->MAC cache entries after abnormal // failover, while still avoiding permanent ARP noise. - garpRefreshRounds = 30 + garpRefreshInterval = 1 * time.Second ) type VIPManager interface { @@ -68,7 +67,7 @@ func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, vm.lg.Warn("Both address and link must be specified to enable VIP. VIP is disabled") return nil, nil } - operation, err := NewNetworkOperation(cfg.HA.VirtualIP, cfg.HA.Interface, cfg.HA.GARPBurstCount, cfg.HA.GARPBurstInterval, lg) + operation, err := NewNetworkOperation(cfg.HA.VirtualIP, cfg.HA.Interface, cfg.HA.GARPBurstCount, lg) if err != nil { vm.lg.Error("init network operation failed", zap.Error(err)) return nil, err @@ -144,16 +143,7 @@ func (vm *vipManager) addVIP(ctx context.Context) bool { vm.lg.Error("adding address failed", zap.Error(err)) return false } - if err := ctx.Err(); err != nil { - vm.delVIP() - return false - } if err := vm.operation.SendARP(ctx); err != nil { - if errors.Is(err, context.Canceled) { - vm.lg.Info("broadcast ARP canceled") - vm.delVIP() - return false - } vm.lg.Error("broadcast ARP failed", zap.Error(err)) // The VIP is already bound locally. Keep the later refresh loop as a // best-effort retry path for notifying upstream devices. @@ -181,25 +171,22 @@ func (vm *vipManager) delVIP() { } func (vm *vipManager) startARPRefresh(ctx context.Context) { - refreshInterval := vm.cfgGetter.GetConfig().HA.GARPRefreshInterval - if refreshInterval <= 0 { + refreshCount := vm.cfgGetter.GetConfig().HA.GARPRefreshCount + if refreshCount <= 0 { return } vm.refreshWG.RunWithRecover(func() { - ticker := time.NewTicker(refreshInterval) + ticker := time.NewTicker(garpRefreshInterval) defer ticker.Stop() // The first burst is sent synchronously by addVIP. The follow-up bursts // cover devices that probe or refresh neighbor state a little later than // the handover moment. - for i := 0; i < garpRefreshRounds; i++ { + for i := 0; i < refreshCount; i++ { select { case <-ctx.Done(): return case <-ticker.C: if err := vm.operation.SendARP(ctx); err != nil { - if errors.Is(err, context.Canceled) { - return - } vm.lg.Warn("refreshing GARP failed", zap.Error(err)) return } diff --git a/pkg/manager/vip/manager_test.go b/pkg/manager/vip/manager_test.go index 6816178bd..d42ac8a4a 100644 --- a/pkg/manager/vip/manager_test.go +++ b/pkg/manager/vip/manager_test.go @@ -191,7 +191,7 @@ func TestGARPRefresh(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) cfg := newMockConfig() cfg.HA.GARPBurstCount = 1 - cfg.HA.GARPRefreshInterval = 10 * time.Millisecond + cfg.HA.GARPRefreshCount = 1 operation := newMockNetworkOperation() vm := &vipManager{ lg: lg, @@ -201,8 +201,8 @@ func TestGARPRefresh(t *testing.T) { vm.OnElected() require.Eventually(t, func() bool { - return operation.sendArpCnt.Load() >= 2 - }, time.Second, 10*time.Millisecond) + return operation.sendArpCnt.Load() > 0 + }, 2*garpRefreshInterval, 10*time.Millisecond) vm.OnRetired() sendArpCnt := operation.sendArpCnt.Load() @@ -215,7 +215,7 @@ func TestGARPRefreshNotStartedWhenVIPNotBound(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) cfg := newMockConfig() cfg.HA.GARPBurstCount = 1 - cfg.HA.GARPRefreshInterval = 10 * time.Millisecond + cfg.HA.GARPRefreshCount = 1 operation := newMockNetworkOperation() operation.addIPErr.Store(true) vm := &vipManager{ @@ -234,7 +234,7 @@ func TestPreCloseCancelsInFlightARP(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) cfg := newMockConfig() cfg.HA.GARPBurstCount = 2 - cfg.HA.GARPBurstInterval = time.Second + cfg.HA.GARPRefreshCount = 1 operation := newMockNetworkOperation() operation.sendArpDelay.Store(int64(200 * time.Millisecond)) vm := &vipManager{ diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index 0ca49e975..e993b2a53 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -41,11 +41,10 @@ type networkOperation struct { garpBurstInterval time.Duration } -func NewNetworkOperation(addressStr, linkStr string, garpBurstCount int, garpBurstInterval time.Duration, lg *zap.Logger) (NetworkOperation, error) { +func NewNetworkOperation(addressStr, linkStr string, garpBurstCount int, lg *zap.Logger) (NetworkOperation, error) { no := &networkOperation{ - lg: lg, - garpBurstCount: garpBurstCount, - garpBurstInterval: garpBurstInterval, + lg: lg, + garpBurstCount: garpBurstCount, } if err := no.initAddr(addressStr, linkStr); err != nil { return nil, err @@ -114,17 +113,6 @@ func (no *networkOperation) SendARP(ctx context.Context) error { if err := no.sendARPOneShot(); err != nil { return err } - if no.garpBurstInterval > 0 && i+1 < no.garpBurstCount { - timer := time.NewTimer(no.garpBurstInterval) - select { - case <-ctx.Done(): - if !timer.Stop() { - <-timer.C - } - return ctx.Err() - case <-timer.C: - } - } } return nil } @@ -143,7 +131,8 @@ func (no *networkOperation) sendARPOneShot() error { // one as success. libErr := arping.GratuitousArpOverIfaceByName(no.address.IP, no.link.Attrs().Name) if libErr != nil { - no.lg.Warn("gratuitous arping via library failed", + // Output a debug log to avoid user anxiety. + no.lg.Debug("gratuitous arping via library failed", zap.Stringer("ip", no.address.IP), zap.String("iface", no.link.Attrs().Name), zap.Error(libErr)) diff --git a/pkg/manager/vip/network_test.go b/pkg/manager/vip/network_test.go index eaea57362..c1ca59e35 100644 --- a/pkg/manager/vip/network_test.go +++ b/pkg/manager/vip/network_test.go @@ -52,7 +52,7 @@ func TestAddDelIP(t *testing.T) { } for i, test := range tests { - operation, err := NewNetworkOperation(test.virtualIP, test.link, 1, 0, zap.NewNop()) + operation, err := NewNetworkOperation(test.virtualIP, test.link, 1, zap.NewNop()) if test.initErr != "" { require.Error(t, err, "case %d", i) require.Contains(t, err.Error(), test.initErr, "case %d", i) From f7eb8afe2d7e77e51cfb921547d7158c9db088bd Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 3 Apr 2026 19:51:59 +0800 Subject: [PATCH 12/12] fix lint --- pkg/manager/vip/network.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index e993b2a53..00eca9beb 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -8,7 +8,6 @@ import ( "runtime" "strings" "syscall" - "time" "github.com/j-keck/arping" "github.com/pingcap/tiproxy/lib/util/errors" @@ -35,10 +34,11 @@ type networkOperation struct { // the network interface link netlink.Link lg *zap.Logger - // garpBurstCount and garpBurstInterval define one takeover burst. The + // garpBurstCount defines the number of GARP packets sent immediately after the + // new owner binds the VIP. A small burst makes takeover visible quickly even + // if the first packet is dropped by the host, bond driver, or upstream device. The // manager may replay the whole burst later during the refresh window. - garpBurstCount int - garpBurstInterval time.Duration + garpBurstCount int } func NewNetworkOperation(addressStr, linkStr string, garpBurstCount int, lg *zap.Logger) (NetworkOperation, error) {