Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,14 @@ Guidelines:
- **Error handling**
- Prefer wrapping errors with context so they are actionable (what operation failed, which component, key parameters).
- Reuse common error helpers from `lib/util` / `pkg/util` when available.
- Add `errors.WithStack()` only at the lowest layer that first creates or captures the concrete error. Higher layers should usually add context with `errors.Wrap()` / `errors.Wrapf()` instead of attaching another stack trace.
- Do not silently ignore errors; either handle them explicitly or return them to callers.

- **Comments for non-obvious logic**
- Add succinct comments around code paths whose correctness depends on subtle concurrency, networking, ownership, or failure-handling assumptions.
- The goal is to explain *why* the code is written that way so future contributors do not "simplify" it into a regression.
- Avoid repeating what the code already says; focus comments on invariants, trade-offs, and external behavior.

- **Logging**
- Use the shared logging facilities (for example, logger manager) rather than creating ad-hoc loggers.
- Include important identifiers (such as namespace, connection ID, cluster information) in logs when they help debugging.
Expand All @@ -100,6 +106,9 @@ Guidelines:
- **Concurrency and context**
- Always pass `context.Context` through call chains where operations may block, allocate resources, or perform I/O.
- Do not start goroutines without a clear lifetime; ensure there is a way to stop them (via context cancellation or explicit shutdown).
- Avoid bare `go func()` for managed background work. Prefer the repository `waitgroup` helpers (for example `pkg/util/waitgroup.WaitGroup.Run` or `RunWithRecover`) so goroutine lifecycle and shutdown are tracked consistently.
- If a background goroutine should recover from panic instead of crashing the whole process, use `waitgroup.RunWithRecover()` and handle recovery through the shared helper rather than ad-hoc `recover()` logic.
- If a wait or sleep may delay shutdown, owner handoff, or other cancellation-sensitive flows, do not use an unconditional `time.Sleep`. Prefer `timer + context` (or an equivalent cancellable wait) so the code can exit promptly.
- Avoid sharing mutable state across goroutines without proper synchronization.
- Be careful when exposing channels and mutexes in public APIs; clearly document ownership and who is responsible for closing channels.

Expand Down
28 changes: 28 additions & 0 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,20 @@ type LogFile struct {
}

// HA holds the virtual-IP based high-availability settings: which address to
// float, which NIC carries it, and how aggressively to advertise ownership
// via gratuitous ARP (GARP) after a takeover.
type HA struct {
	// VirtualIP is the floating service address managed by TiProxy.
	// It is expected to use a host route such as /32 so only the elected node
	// answers for the VIP itself.
	VirtualIP string `yaml:"virtual-ip,omitempty" toml:"virtual-ip,omitempty" json:"virtual-ip,omitempty" reloadable:"false"`
	// Interface is the local NIC that binds VirtualIP and sends GARP from.
	Interface string `yaml:"interface,omitempty" toml:"interface,omitempty" json:"interface,omitempty" reloadable:"false"`
	// GARPBurstCount is the number of GARP packets sent immediately after the
	// new owner binds the VIP. A small burst makes takeover visible quickly even
	// if the first packet is dropped by the host, bond driver, or upstream device.
	// Negative values are rejected by Config.Check; a zero value is normalized
	// to 1 there so at least one announcement is always sent.
	GARPBurstCount int `yaml:"garp-burst-count,omitempty" toml:"garp-burst-count,omitempty" json:"garp-burst-count,omitempty" reloadable:"false"`
	// GARPRefreshCount controls the number of follow-up bursts after
	// takeover. It is used to refresh stale neighbor caches for a bounded window
	// after failover instead of emitting high-rate GARP forever.
	// Negative values are rejected by Config.Check; 0 disables the refresh phase.
	GARPRefreshCount int `yaml:"garp-refresh-count,omitempty" toml:"garp-refresh-count,omitempty" json:"garp-refresh-count,omitempty" reloadable:"false"`
}

func DefaultKeepAlive() (frontend, backendHealthy, backendUnhealthy KeepAlive) {
Expand Down Expand Up @@ -150,6 +162,13 @@ func NewConfig() *Config {

cfg.Balance = DefaultBalance()

// Match the common VRRP-style default of sending a small burst immediately
// after takeover, then keep refreshing for a short period. The refresh helps
// upstream devices overwrite stale VIP->MAC entries after an abnormal owner
// handover.
cfg.HA.GARPBurstCount = 5
cfg.HA.GARPRefreshCount = 30

cfg.EnableTrafficReplay = true

return &cfg
Expand Down Expand Up @@ -192,6 +211,15 @@ func (cfg *Config) Check() error {
if err := cfg.Balance.Check(); err != nil {
return err
}
if cfg.HA.GARPBurstCount < 0 {
return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-burst-count must be greater than or equal to 0")
}
if cfg.HA.GARPBurstCount == 0 {
cfg.HA.GARPBurstCount = 1
}
if cfg.HA.GARPRefreshCount < 0 {
return errors.Wrapf(ErrInvalidConfigValue, "ha.garp-refresh-count must be greater than or equal to 0")
}

return nil
}
Expand Down
18 changes: 18 additions & 0 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ var testProxyConfig = Config{
},
RequireBackendTLS: true,
},
HA: HA{
VirtualIP: "10.10.10.10/32",
Interface: "eth0",
GARPBurstCount: 5,
GARPRefreshCount: 30,
},
Metering: mconfig.MeteringConfig{
Type: storage.ProviderTypeAzure,
Bucket: "metering-container",
Expand Down Expand Up @@ -188,6 +194,18 @@ func TestProxyCheck(t *testing.T) {
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.HA.GARPBurstCount = -1
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.HA.GARPRefreshCount = -1
},
err: ErrInvalidConfigValue,
},
}
for _, tc := range testcases {
cfg := testProxyConfig
Expand Down
76 changes: 25 additions & 51 deletions pkg/manager/elect/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,23 @@ type Election interface {
ID() string
// GetOwnerID gets the owner ID.
GetOwnerID(ctx context.Context) (string, error)
// Close resigns but doesn't retire.
// Close stops campaigning and retires if this member currently owns the key.
Close()
}

type ElectionConfig struct {
Timeout time.Duration
RetryIntvl time.Duration
QueryIntvl time.Duration
WaitBeforeRetire time.Duration
RetryCnt uint64
SessionTTL int
Timeout time.Duration
RetryIntvl time.Duration
RetryCnt uint64
SessionTTL int
}

func DefaultElectionConfig(sessionTTL int) ElectionConfig {
return ElectionConfig{
Timeout: 2 * time.Second,
RetryIntvl: 500 * time.Millisecond,
QueryIntvl: 1 * time.Second,
WaitBeforeRetire: 3 * time.Second,
RetryCnt: 3,
SessionTTL: sessionTTL,
Timeout: 2 * time.Second,
RetryIntvl: 500 * time.Millisecond,
RetryCnt: 3,
SessionTTL: sessionTTL,
}
}

Expand Down Expand Up @@ -123,6 +119,9 @@ func (m *election) campaignLoop(ctx context.Context) {
m.lg.Debug("begin campaign")
select {
case <-session.Done():
// Keep the local owner state until we observe a stronger signal that
// another member has taken over. Retiring here would trade split-brain
// risk for a no-owner window during etcd faults.
m.lg.Info("etcd session is done, creates a new one")
leaseID := session.Lease()
session, err = concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.SessionTTL), concurrency.WithContext(ctx))
Expand All @@ -149,26 +148,22 @@ func (m *election) campaignLoop(ctx context.Context) {
continue
}

var wg waitgroup.WaitGroup
childCtx, cancel := context.WithCancel(ctx)
if m.isOwner {
// Check if another member becomes the new owner during campaign.
wg.RunWithRecover(func() {
m.waitRetire(childCtx)
}, nil, m.lg)
}

elec := concurrency.NewElection(session, m.key)
err = elec.Campaign(ctx, m.id)
cancel()
wg.Wait()
if err != nil {
// Campaign may fail because etcd is temporarily unavailable while the
// current owner lease is still valid. Retiring immediately here would
// turn a transient etcd error into a local no-owner window.
m.lg.Info("failed to campaign", zap.Error(errors.WithStack(err)))
continue
}

kv, err := m.getOwnerInfo(ctx)
if err != nil {
// Failing to read the owner key does not necessarily mean this member
// has lost its lease. Keep the current owner state until we see a
// stronger signal such as session.Done, ErrLeaseNotFound, or an owner
// mismatch.
m.lg.Warn("failed to get owner info", zap.Error(err))
continue
}
Expand Down Expand Up @@ -206,30 +201,6 @@ func (m *election) onRetired() {
metrics.OwnerGauge.MetricVec.DeletePartialMatch(map[string]string{metrics.LblType: m.trimedKey})
}

// waitRetire retires after another member becomes the owner so that there will always be an owner.
// It's allowed if multiple members act as the owner for some time but it's not allowed if no member acts as the owner.
// E.g. at least one member needs to bind the VIP even if the etcd server leader is down.
//
// The loop polls the owner key on QueryIntvl until either the context is
// cancelled or a different member's ID shows up as owner; only then does this
// member step down. Errors from the owner query are deliberately treated as
// inconclusive rather than as a loss of ownership.
func (m *election) waitRetire(ctx context.Context) {
	ticker := time.NewTicker(m.cfg.QueryIntvl)
	defer ticker.Stop()
	for ctx.Err() == nil {
		select {
		case <-ticker.C:
			id, err := m.GetOwnerID(ctx)
			if err != nil {
				// A failed read (e.g. transient etcd error) does not prove a
				// new owner exists; keep polling instead of retiring early.
				continue
			}
			// Another member becomes the owner, retire.
			if id != m.id {
				m.onRetired()
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

// revokeLease revokes the session lease so that other members can compaign immediately.
func (m *election) revokeLease(leaseID clientv3.LeaseID) {
// If revoke takes longer than the ttl, lease is expired anyway.
Expand Down Expand Up @@ -270,17 +241,20 @@ func (m *election) watchOwner(ctx context.Context, session *concurrency.Session,
select {
case resp, ok := <-watchCh:
if !ok {
m.lg.Info("watcher is closed, no owner")
m.lg.Info("watcher is closed, retry watching owner")
return
}
if resp.Canceled {
m.lg.Info("watch canceled, no owner")
m.lg.Info("watch canceled, retry watching owner")
return
}

for _, ev := range resp.Events {
if ev.Type == mvccpb.DELETE {
m.lg.Info("watch failed, owner is deleted")
// The owner key may disappear before another member campaigns.
// Keep the local owner state and let the next campaign round decide
// whether a new owner has really taken over.
m.lg.Info("watch found owner deleted, retry campaigning")
return
}
}
Expand Down
18 changes: 5 additions & 13 deletions pkg/manager/elect/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func TestEtcdServerDown(t *testing.T) {

// server is down
addr := ts.shutdownServer()
// Losing etcd temporarily should not force the current owner to retire
// locally. Otherwise VIP owner election would turn a control-plane fault
// into a no-owner data-plane outage.
ts.expectNoEvent("1", 1500*time.Millisecond)
_, err := elec1.GetOwnerID(context.Background())
require.Error(t, err)
// the owner should not retire before the server is up again
ts.expectNoEvent("1")
ts.startServer(addr)
// the owner should not retire because there's no other member
ts.expectNoEvent("1")
ownerID := ts.getOwnerID()
require.Equal(t, "1", ownerID)

Expand All @@ -91,18 +91,10 @@ func TestEtcdServerDown(t *testing.T) {
require.Error(t, err)
elec2 := ts.newElection("2")
elec2.Start(context.Background())
// the owner should not retire before the server is up again
ts.expectNoEvent("1")

// start the server again and the elections recover
ts.startServer(addr)
ownerID = ts.getOwnerID()
if ownerID == "1" {
ts.expectNoEvent("1")
} else {
ts.expectEvent("1", eventTypeRetired)
ts.expectEvent(ownerID, eventTypeElected)
}
require.NotEmpty(t, ts.getOwnerID())
}

func TestOwnerHang(t *testing.T) {
Expand Down
Loading