Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
* [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
Expand Down
12 changes: 6 additions & 6 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3101,12 +3101,6 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.update-timeout-jitter-max
[ha_tracker_update_timeout_jitter_max: <duration> | default = 5s]

# The timeout after which a new replica will be accepted if the currently
# elected replica stops sending data. This value must be greater than the
# update timeout plus the maximum jitter.
# CLI flag: -distributor.ha-tracker.failover-timeout
[ha_tracker_failover_timeout: <duration> | default = 30s]

# [Experimental] If enabled, fetches all tracked keys on startup to populate
# the local cache. This prevents duplicate GET calls for the same key while
# the cache is cold, but could cause a spike in GET requests during
Expand Down Expand Up @@ -4027,6 +4021,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ha-tracker.max-clusters
[ha_max_clusters: <int> | default = 0]

# If the elected replica doesn't send samples in this time, the HA tracker will
# accept a new replica. This value must be greater than the update timeout plus
# the maximum jitter.
# CLI flag: -distributor.ha-tracker.failover-timeout
[ha_tracker_failover_timeout: <duration> | default = 30s]

# This flag can be used to specify label names to drop during sample
# ingestion within the distributor and can be repeated in order to drop multiple
# labels.
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3402,9 +3402,9 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
EnableHATracker: true,
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
FailoverTimeout: time.Hour,
}
cfg.limits.HAMaxClusters = 100
cfg.limits.HATrackerFailoverTimeout = model.Duration(time.Hour)
}

distributorCfg.RemoteWriteV2Enabled = cfg.remoteWriteV2Enabled
Expand Down
21 changes: 6 additions & 15 deletions pkg/ha/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ const (

var (
errNegativeUpdateTimeoutJitterMax = errors.New("HA tracker max update timeout jitter shouldn't be negative")
errInvalidFailoverTimeout = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)"
)

// HATrackerLimits is the interface through which the HA tracker reads
// per-tenant runtime limits.
// nolint:revive
type HATrackerLimits interface {
	// MaxHAReplicaGroups returns max number of replica groups that HA tracker should track for a user.
	// Samples from additional replicaGroups are rejected.
	MaxHAReplicaGroups(user string) int

	// HATrackerFailoverTimeout returns the per-tenant failover timeout: if the
	// elected replica doesn't send samples within this duration, the HA tracker
	// will accept a new replica for the user.
	HATrackerFailoverTimeout(user string) time.Duration
}

// ProtoReplicaDescFactory makes new InstanceDescs
Expand All @@ -61,11 +63,6 @@ type HATrackerConfig struct {
// is more than this duration.
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
// We should only failover to accepting samples from a replica
// other than the replica written in the KVStore if the difference
// between the stored timestamp and the time we received a sample is
// more than this duration
FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`
// EnableStartupSync controls whether to fetch all tracked keys from the KV store
// on startup to populate the local cache.
// This prevents duplicate GET calls for the same key while the cache is cold,
Expand Down Expand Up @@ -182,7 +179,6 @@ func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix
f.BoolVar(&cfg.EnableHATracker, finalFlagPrefix+"ha-tracker.enable", false, "Enable the HA tracker so that it can accept data from Prometheus HA replicas gracefully (requires labels).")
f.DurationVar(&cfg.UpdateTimeout, finalFlagPrefix+"ha-tracker.update-timeout", 15*time.Second, "The time interval that must pass since the last timestamp update in the KV store before updating it again for a given cluster.")
f.DurationVar(&cfg.UpdateTimeoutJitterMax, finalFlagPrefix+"ha-tracker.update-timeout-jitter-max", 5*time.Second, "The maximum jitter applied to the update timeout to spread KV store updates over time.")
f.DurationVar(&cfg.FailoverTimeout, finalFlagPrefix+"ha-tracker.failover-timeout", 30*time.Second, "The timeout after which a new replica will be accepted if the currently elected replica stops sending data. This value must be greater than the update timeout plus the maximum jitter.")
f.BoolVar(&cfg.EnableStartupSync, finalFlagPrefix+"ha-tracker.enable-startup-sync", false, "[Experimental] If enabled, fetches all tracked keys on startup to populate the local cache. This prevents duplicate GET calls for the same key while the cache is cold, but could cause a spike in GET requests during initialization if the number of tracked keys is large.")

// We want the ability to use different Consul instances for the ring and
Expand All @@ -198,11 +194,6 @@ func (cfg *HATrackerConfig) Validate() error {
return errNegativeUpdateTimeoutJitterMax
}

minFailureTimeout := cfg.UpdateTimeout + cfg.UpdateTimeoutJitterMax + time.Second
if cfg.FailoverTimeout < minFailureTimeout {
return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout)
}

// Tracker kv store only supports consul, etcd, memberlist, and multi.
storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"}
if !slices.Contains(storeAllowedList, cfg.KVStore.Store) {
Expand Down Expand Up @@ -612,7 +603,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl
}
}

err := c.checkKVStore(ctx, key, replica, now)
err := c.checkKVStore(ctx, key, replica, userID, now)
c.kvCASCalls.WithLabelValues(userID, replicaGroup).Inc()
if err != nil {
// The callback within checkKVStore will return a ReplicasNotMatchError if the sample is being deduped,
Expand All @@ -624,7 +615,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl
return err
}

func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error {
func (c *HATracker) checkKVStore(ctx context.Context, key, replica, userID string, now time.Time) error {
return c.client.CAS(ctx, key, func(in any) (out any, retry bool, err error) {
if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 {
// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
Expand All @@ -635,7 +626,7 @@ func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now t

// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
// is less than failover timeout amount of time since the timestamp in the KV store.
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.limits.HATrackerFailoverTimeout(userID) {
return nil, false, ReplicasNotMatchError{replica: replica, elected: desc.Replica}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ha/ha_tracker_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (h *HATracker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Replica: desc.Replica,
ElectedAt: timestamp.Time(desc.ReceivedAt),
UpdateTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.UpdateTimeout)),
FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.FailoverTimeout)),
FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.limits.HATrackerFailoverTimeout(chunks[0]))),
})
}
h.electedLock.RUnlock()
Expand Down
66 changes: 18 additions & 48 deletions pkg/ha/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,30 +88,6 @@ func TestHATrackerConfig_Validate(t *testing.T) {
}(),
expectedErr: errNegativeUpdateTimeoutJitterMax,
},
"should fail if failover timeout is < update timeout + jitter + 1 sec": {
cfg: func() HATrackerConfig {
cfg := HATrackerConfig{}
flagext.DefaultValues(&cfg)
cfg.FailoverTimeout = 5 * time.Second
cfg.UpdateTimeout = 4 * time.Second
cfg.UpdateTimeoutJitterMax = 2 * time.Second

return cfg
}(),
expectedErr: fmt.Errorf(errInvalidFailoverTimeout, 5*time.Second, 7*time.Second),
},
"should pass if failover timeout is >= update timeout + jitter + 1 sec": {
cfg: func() HATrackerConfig {
cfg := HATrackerConfig{}
flagext.DefaultValues(&cfg)
cfg.FailoverTimeout = 7 * time.Second
cfg.UpdateTimeout = 4 * time.Second
cfg.UpdateTimeoutJitterMax = 2 * time.Second

return cfg
}(),
expectedErr: nil,
},
"should pass with memberlist kv store": {
cfg: func() HATrackerConfig {
cfg := HATrackerConfig{}
Expand Down Expand Up @@ -180,7 +156,7 @@ func TestHATracker_CleanupDeletesArePropagatedWithMemberlist(t *testing.T) {
Store: "memberlist",
},
}
tracker, err := NewHATracker(trackerCfg, nil, HATrackerStatusConfig{}, reg, "", logger)
tracker, err := NewHATracker(trackerCfg, trackerLimits{failoverTimeout: 2 * time.Second}, HATrackerStatusConfig{}, reg, "", logger)
require.NoError(t, err)

// Inject our test memberlist client into the tracker
Expand Down Expand Up @@ -247,11 +223,10 @@ func TestWatchPrefixNilPanicWithMemberlist(t *testing.T) {
EnableHATracker: false, // to inject our client before starting the tracker
UpdateTimeout: time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: 2 * time.Second,
KVStore: kv.Config{Store: "memberlist"},
}

tracker, err := NewHATracker(trackerCfg, nil, HATrackerStatusConfig{}, reg, "test", logger)
tracker, err := NewHATracker(trackerCfg, trackerLimits{failoverTimeout: 2 * time.Second}, HATrackerStatusConfig{}, reg, "test", logger)
require.NoError(t, err)
tracker.cfg.EnableHATracker = true
tracker.client = client
Expand Down Expand Up @@ -299,8 +274,7 @@ func TestWatchPrefixAssignment(t *testing.T) {
KVStore: kv.Config{Mock: mock},
UpdateTimeout: time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Millisecond * 2,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand All @@ -325,8 +299,7 @@ func TestCheckReplicaOverwriteTimeout(t *testing.T) {
KVStore: kv.Config{Store: "inmemory"},
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: 2 * time.Millisecond}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand Down Expand Up @@ -365,8 +338,7 @@ func TestCheckReplicaMultiCluster(t *testing.T) {
KVStore: kv.Config{Store: "inmemory"},
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand Down Expand Up @@ -417,8 +389,7 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
KVStore: kv.Config{Store: "inmemory"},
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand Down Expand Up @@ -492,8 +463,7 @@ func TestCheckReplicaUpdateTimeout(t *testing.T) {
KVStore: kv.Config{Mock: mock},
UpdateTimeout: time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand Down Expand Up @@ -542,8 +512,7 @@ func TestCheckReplicaMultiUser(t *testing.T) {
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand Down Expand Up @@ -623,8 +592,7 @@ func TestCheckReplicaUpdateTimeoutJitter(t *testing.T) {
KVStore: kv.Config{Mock: mock},
UpdateTimeout: testData.updateTimeout,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand Down Expand Up @@ -672,14 +640,13 @@ func TestHAClustersLimit(t *testing.T) {
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

mock := kv.PrefixClient(kvStore, "prefix")
limits := trackerLimits{maxReplicaGroups: 2}
limits := trackerLimits{maxReplicaGroups: 2, failoverTimeout: time.Second}

t1, err := NewHATracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Mock: mock},
UpdateTimeout: time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, limits, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger())

require.NoError(t, err)
Expand Down Expand Up @@ -767,12 +734,17 @@ func TestReplicasNotMatchError(t *testing.T) {

// trackerLimits is a stub HATrackerLimits implementation for tests. It hands
// back the same fixed values for every user.
type trackerLimits struct {
	maxReplicaGroups int
	failoverTimeout  time.Duration
}

// MaxHAReplicaGroups implements HATrackerLimits by returning the fixed
// replica-group cap, regardless of the user.
func (t trackerLimits) MaxHAReplicaGroups(_ string) int {
	return t.maxReplicaGroups
}

// HATrackerFailoverTimeout implements HATrackerLimits by returning the fixed
// failover timeout, regardless of the user.
func (t trackerLimits) HATrackerFailoverTimeout(_ string) time.Duration {
	return t.failoverTimeout
}

func TestHATracker_MetricsCleanup(t *testing.T) {
t.Parallel()
reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -856,8 +828,7 @@ func TestCheckReplicaCleanup(t *testing.T) {
KVStore: kv.Config{Mock: mock},
UpdateTimeout: 1 * time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", util_log.Logger)
}, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", util_log.Logger)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
Expand Down Expand Up @@ -944,7 +915,7 @@ func BenchmarkHATracker_syncKVStoreToLocalMap(b *testing.B) {
EnableStartupSync: true,
KVStore: kv.Config{Mock: mockKV},
}
tracker, _ := NewHATracker(cfg, trackerLimits{}, haTrackerStatusConfig, nil, "bench", log.NewNopLogger())
tracker, _ := NewHATracker(cfg, trackerLimits{failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "bench", log.NewNopLogger())

b.ReportAllocs()
for b.Loop() {
Expand Down Expand Up @@ -1013,10 +984,9 @@ func TestHATracker_CacheWarmupOnStart(t *testing.T) {
KVStore: kv.Config{Mock: mockKV}, // Use the seeded KV
UpdateTimeout: time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}

tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)

// Start ha tracker
Expand Down
1 change: 1 addition & 0 deletions pkg/util/validation/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestOverridesExporter_withConfig(t *testing.T) {
cortex_overrides{limit_name="enforce_metadata_metric_name",user="tenant-a"} 1
cortex_overrides{limit_name="enforce_metric_name",user="tenant-a"} 1
cortex_overrides{limit_name="ha_max_clusters",user="tenant-a"} 0
cortex_overrides{limit_name="ha_tracker_failover_timeout",user="tenant-a"} 30
cortex_overrides{limit_name="ingestion_burst_size",user="tenant-a"} 50000
cortex_overrides{limit_name="ingestion_rate",user="tenant-a"} 25000
cortex_overrides{limit_name="ingestion_tenant_shard_size",user="tenant-a"} 0
Expand Down
Loading
Loading