diff --git a/AGENTS.md b/AGENTS.md index dd553c8dc..802662c2f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -90,8 +90,14 @@ Guidelines: - **Error handling** - Prefer wrapping errors with context so they are actionable (what operation failed, which component, key parameters). - Reuse common error helpers from `lib/util` / `pkg/util` when available. + - Add `errors.WithStack()` only at the lowest layer that first creates or captures the concrete error. Higher layers should usually add context with `errors.Wrap()` / `errors.Wrapf()` instead of attaching another stack trace. - Do not silently ignore errors; either handle them explicitly or return them to callers. +- **Comments for non-obvious logic** + - Add succinct comments around code paths whose correctness depends on subtle concurrency, networking, ownership, or failure-handling assumptions. + - The goal is to explain *why* the code is written that way so future contributors do not "simplify" it into a regression. + - Avoid repeating what the code already says; focus comments on invariants, trade-offs, and external behavior. + - **Logging** - Use the shared logging facilities (for example, logger manager) rather than creating ad-hoc loggers. - Include important identifiers (such as namespace, connection ID, cluster information) in logs when they help debugging. @@ -100,6 +106,9 @@ Guidelines: - **Concurrency and context** - Always pass `context.Context` through call chains where operations may block, allocate resources, or perform I/O. - Do not start goroutines without a clear lifetime; ensure there is a way to stop them (via context cancellation or explicit shutdown). + - Avoid bare `go func()` for managed background work. Prefer the repository `waitgroup` helpers (for example `pkg/util/waitgroup.WaitGroup.Run` or `RunWithRecover`) so goroutine lifecycle and shutdown are tracked consistently. 
+ - If a background goroutine should recover from panic instead of crashing the whole process, use `waitgroup.RunWithRecover()` and handle recovery through the shared helper rather than ad-hoc `recover()` logic. + - If a wait or sleep may delay shutdown, owner handoff, or other cancellation-sensitive flows, do not use an unconditional `time.Sleep`. Prefer `timer + context` (or an equivalent cancellable wait) so the code can exit promptly. - Avoid sharing mutable state across goroutines without proper synchronization. - Be careful when exposing channels and mutexes in public APIs; clearly document ownership and who is responsible for closing channels. diff --git a/lib/config/proxy.go b/lib/config/proxy.go index cb0cc3209..02462cf08 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -108,8 +108,20 @@ type LogFile struct { } type HA struct { + // VirtualIP is the floating service address managed by TiProxy. + // It is expected to use a host route such as /32 so only the elected node + // answers for the VIP itself. VirtualIP string `yaml:"virtual-ip,omitempty" toml:"virtual-ip,omitempty" json:"virtual-ip,omitempty" reloadable:"false"` + // Interface is the local NIC that binds VirtualIP and sends GARP from. Interface string `yaml:"interface,omitempty" toml:"interface,omitempty" json:"interface,omitempty" reloadable:"false"` + // GARPBurstCount is the number of GARP packets sent immediately after the + // new owner binds the VIP. A small burst makes takeover visible quickly even + // if the first packet is dropped by the host, bond driver, or upstream device. + GARPBurstCount int `yaml:"garp-burst-count,omitempty" toml:"garp-burst-count,omitempty" json:"garp-burst-count,omitempty" reloadable:"false"` + // GARPRefreshCount controls the number of follow-up bursts after + // takeover. It is used to refresh stale neighbor caches for a bounded window + // after failover instead of emitting high-rate GARP forever. 
+ GARPRefreshCount int `yaml:"garp-refresh-count,omitempty" toml:"garp-refresh-count,omitempty" json:"garp-refresh-count,omitempty" reloadable:"false"` } func DefaultKeepAlive() (frontend, backendHealthy, backendUnhealthy KeepAlive) { @@ -150,6 +162,13 @@ func NewConfig() *Config { cfg.Balance = DefaultBalance() + // Match the common VRRP-style default of sending a small burst immediately + // after takeover, then keep refreshing for a short period. The refresh helps + // upstream devices overwrite stale VIP->MAC entries after an abnormal owner + // handover. + cfg.HA.GARPBurstCount = 5 + cfg.HA.GARPRefreshCount = 30 + cfg.EnableTrafficReplay = true return &cfg @@ -192,6 +211,15 @@ func (cfg *Config) Check() error { if err := cfg.Balance.Check(); err != nil { return err } + if cfg.HA.GARPBurstCount < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-count must be greater than or equal to 0") + } + if cfg.HA.GARPBurstCount == 0 { + cfg.HA.GARPBurstCount = 1 + } + if cfg.HA.GARPRefreshCount < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-refresh-count must be greater than or equal to 0") + } return nil } diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index da9f3ecaa..ef032359b 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -85,6 +85,12 @@ var testProxyConfig = Config{ }, RequireBackendTLS: true, }, + HA: HA{ + VirtualIP: "10.10.10.10/32", + Interface: "eth0", + GARPBurstCount: 5, + GARPRefreshCount: 30, + }, Metering: mconfig.MeteringConfig{ Type: storage.ProviderTypeAzure, Bucket: "metering-container", @@ -188,6 +194,18 @@ func TestProxyCheck(t *testing.T) { }, err: ErrInvalidConfigValue, }, + { + pre: func(t *testing.T, c *Config) { + c.HA.GARPBurstCount = -1 + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.HA.GARPRefreshCount = -1 + }, + err: ErrInvalidConfigValue, + }, } for _, tc := range testcases { cfg := testProxyConfig diff --git 
a/pkg/manager/elect/election.go b/pkg/manager/elect/election.go index aed8c0247..3ff3fd9c2 100644 --- a/pkg/manager/elect/election.go +++ b/pkg/manager/elect/election.go @@ -38,27 +38,23 @@ type Election interface { ID() string // GetOwnerID gets the owner ID. GetOwnerID(ctx context.Context) (string, error) - // Close resigns and but doesn't retire. + // Close stops campaigning and retires if this member currently owns the key. Close() } type ElectionConfig struct { - Timeout time.Duration - RetryIntvl time.Duration - QueryIntvl time.Duration - WaitBeforeRetire time.Duration - RetryCnt uint64 - SessionTTL int + Timeout time.Duration + RetryIntvl time.Duration + RetryCnt uint64 + SessionTTL int } func DefaultElectionConfig(sessionTTL int) ElectionConfig { return ElectionConfig{ - Timeout: 2 * time.Second, - RetryIntvl: 500 * time.Millisecond, - QueryIntvl: 1 * time.Second, - WaitBeforeRetire: 3 * time.Second, - RetryCnt: 3, - SessionTTL: sessionTTL, + Timeout: 2 * time.Second, + RetryIntvl: 500 * time.Millisecond, + RetryCnt: 3, + SessionTTL: sessionTTL, } } @@ -123,6 +119,9 @@ func (m *election) campaignLoop(ctx context.Context) { m.lg.Debug("begin campaign") select { case <-session.Done(): + // Keep the local owner state until we observe a stronger signal that + // another member has taken over. Retiring here would trade split-brain + // risk for a no-owner window during etcd faults. m.lg.Info("etcd session is done, creates a new one") leaseID := session.Lease() session, err = concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.SessionTTL), concurrency.WithContext(ctx)) @@ -149,26 +148,22 @@ func (m *election) campaignLoop(ctx context.Context) { continue } - var wg waitgroup.WaitGroup - childCtx, cancel := context.WithCancel(ctx) - if m.isOwner { - // Check if another member becomes the new owner during campaign. 
- wg.RunWithRecover(func() { - m.waitRetire(childCtx) - }, nil, m.lg) - } - elec := concurrency.NewElection(session, m.key) err = elec.Campaign(ctx, m.id) - cancel() - wg.Wait() if err != nil { + // Campaign may fail because etcd is temporarily unavailable while the + // current owner lease is still valid. Retiring immediately here would + // turn a transient etcd error into a local no-owner window. m.lg.Info("failed to campaign", zap.Error(errors.WithStack(err))) continue } kv, err := m.getOwnerInfo(ctx) if err != nil { + // Failing to read the owner key does not necessarily mean this member + // has lost its lease. Keep the current owner state until we see a + // stronger signal such as session.Done, ErrLeaseNotFound, or an owner + // mismatch. m.lg.Warn("failed to get owner info", zap.Error(err)) continue } @@ -206,30 +201,6 @@ func (m *election) onRetired() { metrics.OwnerGauge.MetricVec.DeletePartialMatch(map[string]string{metrics.LblType: m.trimedKey}) } -// waitRetire retires after another member becomes the owner so that there will always be an owner. -// It's allowed if multiple members act as the owner for some time but it's not allowed if no member acts as the owner. -// E.g. at least one member needs to bind the VIP even if the etcd server leader is down. -func (m *election) waitRetire(ctx context.Context) { - ticker := time.NewTicker(m.cfg.QueryIntvl) - defer ticker.Stop() - for ctx.Err() == nil { - select { - case <-ticker.C: - id, err := m.GetOwnerID(ctx) - if err != nil { - continue - } - // Another member becomes the owner, retire. - if id != m.id { - m.onRetired() - return - } - case <-ctx.Done(): - return - } - } -} - // revokeLease revokes the session lease so that other members can compaign immediately. func (m *election) revokeLease(leaseID clientv3.LeaseID) { // If revoke takes longer than the ttl, lease is expired anyway. 
@@ -270,17 +241,20 @@ func (m *election) watchOwner(ctx context.Context, session *concurrency.Session, select { case resp, ok := <-watchCh: if !ok { - m.lg.Info("watcher is closed, no owner") + m.lg.Info("watcher is closed, retry watching owner") return } if resp.Canceled { - m.lg.Info("watch canceled, no owner") + m.lg.Info("watch canceled, retry watching owner") return } for _, ev := range resp.Events { if ev.Type == mvccpb.DELETE { - m.lg.Info("watch failed, owner is deleted") + // The owner key may disappear before another member campaigns. + // Keep the local owner state and let the next campaign round decide + // whether a new owner has really taken over. + m.lg.Info("watch found owner deleted, retry campaigning") return } } diff --git a/pkg/manager/elect/election_test.go b/pkg/manager/elect/election_test.go index 456804596..35726402b 100644 --- a/pkg/manager/elect/election_test.go +++ b/pkg/manager/elect/election_test.go @@ -75,13 +75,13 @@ func TestEtcdServerDown(t *testing.T) { // server is down addr := ts.shutdownServer() + // Losing etcd temporarily should not force the current owner to retire + // locally. Otherwise VIP owner election would turn a control-plane fault + // into a no-owner data-plane outage. 
+ ts.expectNoEvent("1", 1500*time.Millisecond) _, err := elec1.GetOwnerID(context.Background()) require.Error(t, err) - // the owner should not retire before the server is up again - ts.expectNoEvent("1") ts.startServer(addr) - // the owner should not retire because there's no other member - ts.expectNoEvent("1") ownerID := ts.getOwnerID() require.Equal(t, "1", ownerID) @@ -91,18 +91,10 @@ func TestEtcdServerDown(t *testing.T) { require.Error(t, err) elec2 := ts.newElection("2") elec2.Start(context.Background()) - // the owner should not retire before the server is up again - ts.expectNoEvent("1") // start the server again and the elections recover ts.startServer(addr) - ownerID = ts.getOwnerID() - if ownerID == "1" { - ts.expectNoEvent("1") - } else { - ts.expectEvent("1", eventTypeRetired) - ts.expectEvent(ownerID, eventTypeElected) - } + require.NotEmpty(t, ts.getOwnerID()) } func TestOwnerHang(t *testing.T) { diff --git a/pkg/manager/elect/mock_test.go b/pkg/manager/elect/mock_test.go index ad84c7178..b677490c2 100644 --- a/pkg/manager/elect/mock_test.go +++ b/pkg/manager/elect/mock_test.go @@ -5,6 +5,7 @@ package elect import ( "context" + "sync" "testing" "time" @@ -25,14 +26,19 @@ const ( var _ Member = (*mockMember)(nil) type mockMember struct { - ch chan int + ch chan int + hangElectedMu sync.Mutex + hangElectedCh chan struct{} } func newMockMember() *mockMember { - return &mockMember{ch: make(chan int, 2)} + return &mockMember{ch: make(chan int, 32)} } func (mo *mockMember) OnElected() { + if ch := mo.getHangElectedCh(); ch != nil { + <-ch + } mo.ch <- eventTypeElected } @@ -43,7 +49,7 @@ func (mo *mockMember) OnRetired() { func (mo *mockMember) expectEvent(t *testing.T, expected ...int) { for _, exp := range expected { select { - case <-time.After(3 * time.Second): + case <-time.After(5 * time.Second): t.Fatal("timeout") case event := <-mo.ch: require.Equal(t, exp, event) @@ -51,36 +57,36 @@ func (mo *mockMember) expectEvent(t *testing.T, expected 
...int) { } } -func (mo *mockMember) expectNoEvent(t *testing.T) { +func (mo *mockMember) expectNoEvent(t *testing.T, timeout time.Duration) { select { - case <-time.After(100 * time.Millisecond): - return case event := <-mo.ch: - require.Fail(t, "unexpected event", event) + t.Fatalf("unexpected event %d", event) + case <-time.After(timeout): } } func (mo *mockMember) hang(hang bool) { - contn := true - for contn { - if hang { - // fill the channel - select { - case mo.ch <- eventTypeElected: - default: - contn = false - } - } else { - // clear the channel - select { - case <-mo.ch: - default: - contn = false - } + mo.hangElectedMu.Lock() + defer mo.hangElectedMu.Unlock() + + if hang { + if mo.hangElectedCh == nil { + mo.hangElectedCh = make(chan struct{}) } + return + } + if mo.hangElectedCh != nil { + close(mo.hangElectedCh) + mo.hangElectedCh = nil } } +func (mo *mockMember) getHangElectedCh() chan struct{} { + mo.hangElectedMu.Lock() + defer mo.hangElectedMu.Unlock() + return mo.hangElectedCh +} + type etcdTestSuite struct { elecCfg ElectionConfig key string @@ -156,9 +162,9 @@ func (ts *etcdTestSuite) expectEvent(id string, event ...int) { elec.member.(*mockMember).expectEvent(ts.t, event...) 
} -func (ts *etcdTestSuite) expectNoEvent(id string) { +func (ts *etcdTestSuite) expectNoEvent(id string, timeout time.Duration) { elec := ts.getElection(id) - elec.member.(*mockMember).expectNoEvent(ts.t) + elec.member.(*mockMember).expectNoEvent(ts.t, timeout) } func (ts *etcdTestSuite) hang(id string, hang bool) { @@ -196,11 +202,9 @@ func (ts *etcdTestSuite) shutdownServer() string { func electionConfigForTest(ttl int) ElectionConfig { return ElectionConfig{ - SessionTTL: ttl, - Timeout: 100 * time.Millisecond, - RetryIntvl: 10 * time.Millisecond, - QueryIntvl: 10 * time.Millisecond, - WaitBeforeRetire: 3 * time.Second, - RetryCnt: 2, + SessionTTL: ttl, + Timeout: 100 * time.Millisecond, + RetryIntvl: 10 * time.Millisecond, + RetryCnt: 2, } } diff --git a/pkg/manager/vip/manager.go b/pkg/manager/vip/manager.go index 8fe64fa6f..0264d659b 100644 --- a/pkg/manager/vip/manager.go +++ b/pkg/manager/vip/manager.go @@ -7,10 +7,12 @@ import ( "context" "fmt" "net" - "sync/atomic" + "sync" + "time" "github.com/pingcap/tiproxy/lib/config" "github.com/pingcap/tiproxy/pkg/manager/elect" + "github.com/pingcap/tiproxy/pkg/util/waitgroup" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -23,6 +25,10 @@ const ( // The etcd client keeps alive every TTL/3 seconds. // The TTL determines the failover time so it should be short. sessionTTL = 3 + // Refresh GARP for a bounded window after takeover so upstream devices have + // repeated chances to overwrite stale VIP->MAC cache entries after abnormal + // failover, while still avoiding permanent ARP noise. + garpRefreshInterval = 1 * time.Second ) type VIPManager interface { @@ -34,11 +40,18 @@ type VIPManager interface { var _ VIPManager = (*vipManager)(nil) type vipManager struct { - operation NetworkOperation - cfgGetter config.ConfigGetter - election elect.Election - lg *zap.Logger - delOnRetire atomic.Bool + mu sync.Mutex + // closing blocks late OnElected callbacks during controlled shutdown. 
+ // A VIP must not be present on two nodes at the same time because upstream + // L3 devices cache only one VIP->MAC entry; if old and new owners both answer + // ARP, whichever reply is learned last may blackhole cross-subnet traffic. + closing bool + arpCancel context.CancelFunc + refreshWG waitgroup.WaitGroup + operation NetworkOperation + cfgGetter config.ConfigGetter + election elect.Election + lg *zap.Logger } func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, error) { @@ -54,7 +67,7 @@ func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, vm.lg.Warn("Both address and link must be specified to enable VIP. VIP is disabled") return nil, nil } - operation, err := NewNetworkOperation(cfg.HA.VirtualIP, cfg.HA.Interface, lg) + operation, err := NewNetworkOperation(cfg.HA.VirtualIP, cfg.HA.Interface, cfg.HA.GARPBurstCount, lg) if err != nil { vm.lg.Error("init network operation failed", zap.Error(err)) return nil, err @@ -64,9 +77,12 @@ func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, } func (vm *vipManager) Start(ctx context.Context, etcdCli *clientv3.Client) error { + vm.mu.Lock() + defer vm.mu.Unlock() + vm.closing = false + // This node may have bound the VIP before last failure. vm.delVIP() - vm.delOnRetire.Store(true) cfg := vm.cfgGetter.GetConfig() ip, port, _, err := cfg.GetIPPort() @@ -83,34 +99,58 @@ func (vm *vipManager) Start(ctx context.Context, etcdCli *clientv3.Client) error } func (vm *vipManager) OnElected() { - vm.addVIP() + vm.mu.Lock() + // Election.Close may race with an already in-flight OnElected callback. + // Once controlled shutdown starts, never bind the VIP again locally. 
+ if vm.closing { + vm.mu.Unlock() + vm.lg.Info("skip adding VIP because the manager is closing") + return + } + vm.stopARPRefresh() + ctx, cancel := context.WithCancel(context.Background()) + vm.arpCancel = cancel + vm.mu.Unlock() + + if vm.addVIP(ctx) { + vm.mu.Lock() + if !vm.closing { + vm.startARPRefresh(ctx) + } + vm.mu.Unlock() + } } func (vm *vipManager) OnRetired() { - if vm.delOnRetire.Load() { - vm.delVIP() - } + vm.mu.Lock() + defer vm.mu.Unlock() + + vm.stopARPRefresh() + vm.delVIP() } -func (vm *vipManager) addVIP() { +func (vm *vipManager) addVIP(ctx context.Context) bool { hasIP, err := vm.operation.HasIP() if err != nil { vm.lg.Error("checking addresses failed", zap.Error(err)) - return + return false } if hasIP { vm.lg.Debug("already has VIP, do nothing") - return + return true } if err := vm.operation.AddIP(); err != nil { vm.lg.Error("adding address failed", zap.Error(err)) - return + return false } - if err := vm.operation.SendARP(); err != nil { + if err := vm.operation.SendARP(ctx); err != nil { vm.lg.Error("broadcast ARP failed", zap.Error(err)) - return + // The VIP is already bound locally. Keep the later refresh loop as a + // best-effort retry path for notifying upstream devices. + return true } vm.lg.Info("adding VIP success") + return true } func (vm *vipManager) delVIP() { @@ -130,20 +170,66 @@ func (vm *vipManager) delVIP() { vm.lg.Info("deleting VIP success") } -// PreClose resigns the owner but doesn't delete the VIP. -// It makes use of the graceful-wait time to wait for the new owner to shorten the failover time. +func (vm *vipManager) startARPRefresh(ctx context.Context) { + refreshCount := vm.cfgGetter.GetConfig().HA.GARPRefreshCount + if refreshCount <= 0 { + return + } + vm.refreshWG.RunWithRecover(func() { + ticker := time.NewTicker(garpRefreshInterval) + defer ticker.Stop() + // The first burst is sent synchronously by addVIP. 
The follow-up bursts + // cover devices that probe or refresh neighbor state a little later than + // the handover moment. + for i := 0; i < refreshCount; i++ { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := vm.operation.SendARP(ctx); err != nil { + vm.lg.Warn("refreshing GARP failed", zap.Error(err)) + return + } + } + } + }, nil, vm.lg) +} + +func (vm *vipManager) stopARPRefresh() { + cancel := vm.arpCancel + vm.arpCancel = nil + if cancel != nil { + cancel() + } + vm.refreshWG.Wait() +} + +// PreClose deletes the VIP before resigning the owner so that controlled +// shutdowns do not expose the VIP on two nodes at the same time. func (vm *vipManager) PreClose() { - vm.delOnRetire.Store(false) - if vm.election != nil { - vm.election.Close() + election := vm.prepareForClose() + if election != nil { + election.Close() } } -// Close resigns the owner and deletes the VIP if it was the owner. -// The new owner may not be elected but we won't wait anymore. +// Close resigns the owner and makes sure the VIP is removed locally. func (vm *vipManager) Close() { - if vm.election != nil { - vm.election.Close() + election := vm.prepareForClose() + if election != nil { + election.Close() } +} + +func (vm *vipManager) prepareForClose() elect.Election { + vm.mu.Lock() + defer vm.mu.Unlock() + + // Drop the VIP before resigning. Letting the new owner add the VIP first is + // unsafe on real networks because upstream devices remember only one MAC for + // the VIP and may keep forwarding to the old node long after the overlap. 
+ vm.closing = true + vm.stopARPRefresh() vm.delVIP() + return vm.election } diff --git a/pkg/manager/vip/manager_test.go b/pkg/manager/vip/manager_test.go index 0774e00f0..d42ac8a4a 100644 --- a/pkg/manager/vip/manager_test.go +++ b/pkg/manager/vip/manager_test.go @@ -126,7 +126,6 @@ func TestNetworkOperation(t *testing.T) { cfgGetter: newMockConfigGetter(newMockConfig()), operation: operation, } - vm.delOnRetire.Store(true) vm.election = newMockElection(ch, vm) childCtx, cancel := context.WithCancel(context.Background()) vm.election.Start(childCtx) @@ -147,6 +146,127 @@ func TestNetworkOperation(t *testing.T) { vm.Close() } +func TestPreCloseDeletesVIPBeforeCloseElection(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + operation := newMockNetworkOperation() + operation.hasIP.Store(true) + + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(newMockConfig()), + operation: operation, + } + vm.election = &closeHookElection{ + closeFn: func() { + require.False(t, operation.hasIP.Load()) + require.EqualValues(t, 1, operation.delIPCnt.Load()) + }, + } + + vm.PreClose() +} + +func TestPreClosePreventsReacquireDuringElectionClose(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + operation := newMockNetworkOperation() + operation.hasIP.Store(true) + + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(newMockConfig()), + operation: operation, + } + vm.election = &closeHookElection{ + closeFn: func() { + vm.OnElected() + }, + } + + vm.PreClose() + require.False(t, operation.hasIP.Load()) + require.EqualValues(t, 0, operation.addIPCnt.Load()) +} + +func TestGARPRefresh(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + cfg := newMockConfig() + cfg.HA.GARPBurstCount = 1 + cfg.HA.GARPRefreshCount = 1 + operation := newMockNetworkOperation() + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(cfg), + operation: operation, + } + + vm.OnElected() + require.Eventually(t, func() bool { + return 
operation.sendArpCnt.Load() > 0 + }, 2*garpRefreshInterval, 10*time.Millisecond) + + vm.OnRetired() + sendArpCnt := operation.sendArpCnt.Load() + time.Sleep(50 * time.Millisecond) + require.Equal(t, sendArpCnt, operation.sendArpCnt.Load()) + require.False(t, operation.hasIP.Load()) +} + +func TestGARPRefreshNotStartedWhenVIPNotBound(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + cfg := newMockConfig() + cfg.HA.GARPBurstCount = 1 + cfg.HA.GARPRefreshCount = 1 + operation := newMockNetworkOperation() + operation.addIPErr.Store(true) + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(cfg), + operation: operation, + } + + vm.OnElected() + time.Sleep(50 * time.Millisecond) + require.EqualValues(t, 0, operation.sendArpCnt.Load()) + require.False(t, operation.hasIP.Load()) +} + +func TestPreCloseCancelsInFlightARP(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + cfg := newMockConfig() + cfg.HA.GARPBurstCount = 2 + cfg.HA.GARPRefreshCount = 1 + operation := newMockNetworkOperation() + operation.sendArpDelay.Store(int64(200 * time.Millisecond)) + vm := &vipManager{ + lg: lg, + cfgGetter: newMockConfigGetter(cfg), + operation: operation, + } + + done := make(chan struct{}) + go func() { + vm.OnElected() + close(done) + }() + + require.Eventually(t, func() bool { + return operation.addIPCnt.Load() == 1 + }, time.Second, 10*time.Millisecond) + + start := time.Now() + vm.PreClose() + require.Less(t, time.Since(start), 500*time.Millisecond) + require.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + require.False(t, operation.hasIP.Load()) +} + func TestStartAndClose(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) vm, err := NewVIPManager(lg, newMockConfigGetter(newMockConfig())) @@ -216,3 +336,23 @@ func TestMultiVIP(t *testing.T) { vm1.Close() vm2.Close() } + +type closeHookElection struct { + closeFn func() +} + +func (e *closeHookElection) 
Start(context.Context) {} + +func (e *closeHookElection) ID() string { + return "" +} + +func (e *closeHookElection) GetOwnerID(context.Context) (string, error) { + return "", nil +} + +func (e *closeHookElection) Close() { + if e.closeFn != nil { + e.closeFn() + } +} diff --git a/pkg/manager/vip/mock_test.go b/pkg/manager/vip/mock_test.go index 582a2fe2a..7fd4be214 100644 --- a/pkg/manager/vip/mock_test.go +++ b/pkg/manager/vip/mock_test.go @@ -5,7 +5,9 @@ package vip import ( "context" + "sync" "sync/atomic" + "time" "github.com/pingcap/tiproxy/lib/config" "github.com/pingcap/tiproxy/lib/util/errors" @@ -37,15 +39,18 @@ const ( ) type mockElection struct { - wg waitgroup.WaitGroup - ch chan int - member elect.Member + wg waitgroup.WaitGroup + ch chan int + member elect.Member + closeOnce sync.Once + closeCh chan struct{} } func newMockElection(ch chan int, member elect.Member) *mockElection { return &mockElection{ - ch: ch, - member: member, + ch: ch, + member: member, + closeCh: make(chan struct{}), } } @@ -63,6 +68,8 @@ func (me *mockElection) Start(ctx context.Context) { select { case <-ctx.Done(): return + case <-me.closeCh: + return case event := <-me.ch: switch event { case eventTypeElected: @@ -76,17 +83,24 @@ func (me *mockElection) Start(ctx context.Context) { } func (me *mockElection) Close() { + me.closeOnce.Do(func() { + close(me.closeCh) + }) me.wg.Wait() } var _ NetworkOperation = (*mockNetworkOperation)(nil) type mockNetworkOperation struct { - hasIP atomic.Bool - hasIPErr atomic.Bool - addIPErr atomic.Bool - delIPErr atomic.Bool - sendArpErr atomic.Bool + hasIP atomic.Bool + hasIPErr atomic.Bool + addIPErr atomic.Bool + delIPErr atomic.Bool + sendArpErr atomic.Bool + sendArpCnt atomic.Int32 + addIPCnt atomic.Int32 + delIPCnt atomic.Int32 + sendArpDelay atomic.Int64 } func newMockNetworkOperation() *mockNetworkOperation { @@ -105,6 +119,8 @@ func (mno *mockNetworkOperation) AddIP() error { if mno.addIPErr.Load() { return errors.New("mock AddIP 
error") } + mno.addIPCnt.Add(1) + mno.hasIP.Store(true) return nil } @@ -112,13 +128,31 @@ func (mno *mockNetworkOperation) DeleteIP() error { if mno.delIPErr.Load() { return errors.New("mock DeleteIP error") } + mno.delIPCnt.Add(1) + mno.hasIP.Store(false) return nil } -func (mno *mockNetworkOperation) SendARP() error { +func (mno *mockNetworkOperation) SendARP(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } if mno.sendArpErr.Load() { return errors.New("mock SendARP error") } + delay := time.Duration(mno.sendArpDelay.Load()) + if delay > 0 { + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() + case <-timer.C: + } + } + mno.sendArpCnt.Add(1) return nil } @@ -130,6 +164,10 @@ func newMockConfig() *config.Config { return &config.Config{ Proxy: config.ProxyServer{Addr: "0.0.0.0:6000"}, API: config.API{Addr: "0.0.0.0:3080"}, - HA: config.HA{VirtualIP: "127.0.0.2/24", Interface: "lo"}, + HA: config.HA{ + VirtualIP: "127.0.0.2/24", + Interface: "lo", + GARPBurstCount: 5, + }, } } diff --git a/pkg/manager/vip/network.go b/pkg/manager/vip/network.go index 6b162f475..00eca9beb 100644 --- a/pkg/manager/vip/network.go +++ b/pkg/manager/vip/network.go @@ -4,6 +4,7 @@ package vip import ( + "context" "runtime" "strings" "syscall" @@ -21,7 +22,7 @@ type NetworkOperation interface { HasIP() (bool, error) AddIP() error DeleteIP() error - SendARP() error + SendARP(context.Context) error Addr() string } @@ -33,11 +34,17 @@ type networkOperation struct { // the network interface link netlink.Link lg *zap.Logger + // garpBurstCount defines the number of GARP packets sent immediately after the + // new owner binds the VIP. A small burst makes takeover visible quickly even + // if the first packet is dropped by the host, bond driver, or upstream device. The + // manager may replay the whole burst later during the refresh window. 
+	garpBurstCount int } -func NewNetworkOperation(addressStr, linkStr string, lg *zap.Logger) (NetworkOperation, error) { +func NewNetworkOperation(addressStr, linkStr string, garpBurstCount int, lg *zap.Logger) (NetworkOperation, error) { no := &networkOperation{ - lg: lg, + lg: lg, + garpBurstCount: garpBurstCount, } if err := no.initAddr(addressStr, linkStr); err != nil { return nil, err @@ -92,13 +99,22 @@ func (no *networkOperation) DeleteIP() error { return errors.WithStack(err) } -func (no *networkOperation) SendARP() error { - if err := arping.GratuitousArpOverIfaceByName(no.address.IP, no.link.Attrs().Name); err != nil { - no.lg.Warn("gratuitous arping failed", zap.Stringer("ip", no.address.IP), zap.String("iface", no.link.Attrs().Name), zap.Error(err)) +func (no *networkOperation) SendARP(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() } - // GratuitousArpOverIfaceByName may not work properly even if it returns nil, so always run a command. - err := no.execCmd("sudo", "arping", "-c", "3", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()) - return errors.WithStack(err) + if no.garpBurstCount <= 0 { + return nil + } + for i := 0; i < no.garpBurstCount; i++ { + if err := ctx.Err(); err != nil { + return err + } + if err := no.sendARPOneShot(); err != nil { + return err + } + } + return nil } func (no *networkOperation) Addr() string { @@ -108,6 +124,30 @@ func (no *networkOperation) Addr() string { return no.address.IP.String() } +func (no *networkOperation) sendARPOneShot() error { + // Keep both sending paths for a single logical GARP attempt: the library + // path avoids depending on the external "arping" command, while the command + // path has proven more reliable in some customer environments. Treat either + // one as success. + libErr := arping.GratuitousArpOverIfaceByName(no.address.IP, no.link.Attrs().Name) + if libErr != nil { + // Log at debug level only: the library path may legitimately fail on + // some systems, and the command-based fallback below is still attempted. 
+ no.lg.Debug("gratuitous arping via library failed", + zap.Stringer("ip", no.address.IP), + zap.String("iface", no.link.Attrs().Name), + zap.Error(libErr)) + } + // Always use "arping -c 1" here and let SendARP control the outer burst + // count and interval. Using "arping -c 5" would hide pacing inside the + // command, making takeover and refresh timing less predictable from TiProxy + // and harder to reason about in tests and production troubleshooting. + cmdErr := no.execCmd("sudo", "arping", "-c", "1", "-U", "-I", no.link.Attrs().Name, no.address.IP.String()) + if libErr == nil || cmdErr == nil { + return nil + } + return errors.Wrap(errors.WithStack(cmdErr), errors.WithStack(libErr)) +} + func (no *networkOperation) execCmd(args ...string) error { output, err := cmd.ExecCmd(args[0], args[1:]...) no.lg.Info("executed cmd", zap.String("cmd", strings.Join(args, " ")), zap.String("output", output), zap.Error(err)) diff --git a/pkg/manager/vip/network_test.go b/pkg/manager/vip/network_test.go index 2b00420b4..c1ca59e35 100644 --- a/pkg/manager/vip/network_test.go +++ b/pkg/manager/vip/network_test.go @@ -6,6 +6,7 @@ package vip import ( + "context" "runtime" "strings" "testing" @@ -51,7 +52,7 @@ func TestAddDelIP(t *testing.T) { } for i, test := range tests { - operation, err := NewNetworkOperation(test.virtualIP, test.link, zap.NewNop()) + operation, err := NewNetworkOperation(test.virtualIP, test.link, 1, zap.NewNop()) if test.initErr != "" { require.Error(t, err, "case %d", i) require.Contains(t, err.Error(), test.initErr, "case %d", i) @@ -68,7 +69,7 @@ func TestAddDelIP(t *testing.T) { continue } - err = operation.SendARP() + err = operation.SendARP(context.Background()) if err != nil { require.True(t, isOtherErr(err)) }