From af590170517da986e964359713b72f098aeb9344 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 7 May 2026 04:36:18 +0000 Subject: [PATCH] feat(ha): Move failover timeout to per-tenant runtime config Move -distributor.ha-tracker.failover-timeout from HATrackerConfig (global) to the per-tenant Limits struct. The flag name and default value (30s) remain the same, but it can now be overridden per-tenant via runtime config: overrides: "tenant-1": ha_tracker_failover_timeout: 60s Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 12 ++-- pkg/cortex/cortex.go | 2 +- pkg/cortex/runtime_config.go | 2 +- pkg/distributor/distributor.go | 6 +- pkg/distributor/distributor_test.go | 2 +- pkg/ha/ha_tracker.go | 21 ++----- pkg/ha/ha_tracker_http.go | 2 +- pkg/ha/ha_tracker_test.go | 66 ++++++--------------- pkg/util/validation/exporter_test.go | 1 + pkg/util/validation/limits.go | 17 +++++- pkg/util/validation/limits_test.go | 34 +++++++++-- schemas/cortex-config-schema.json | 14 ++--- 13 files changed, 92 insertions(+), 88 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a7640e63d8..19d9b48070f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446 +* [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. The YAML option `ha_tracker_failover_timeout` moves from the `ha_tracker` block to the `limits_config` block, so config files that set it under `ha_tracker` must be updated. 
#7481 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f9c206ac537..bcb010c9ea5 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3101,12 +3101,6 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.update-timeout-jitter-max [ha_tracker_update_timeout_jitter_max: | default = 5s] - # The timeout after which a new replica will be accepted if the currently - # elected replica stops sending data. This value must be greater than the - # update timeout plus the maximum jitter. - # CLI flag: -distributor.ha-tracker.failover-timeout - [ha_tracker_failover_timeout: | default = 30s] - # [Experimental] If enabled, fetches all tracked keys on startup to populate # the local cache. This prevents duplicate GET calls for the same key while # the cache is cold, but could cause a spike in GET requests during @@ -4027,6 +4021,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ha-tracker.max-clusters [ha_max_clusters: | default = 0] +# If the elected replica doesn't send samples in this time, the HA tracker will +# accept a new replica. This value must be greater than the update timeout plus +# the maximum jitter. 
+# CLI flag: -distributor.ha-tracker.failover-timeout +[ha_tracker_failover_timeout: | default = 30s] + # This flag can be used to specify label names that to drop during sample # ingestion within the distributor and can be repeated in order to drop multiple # labels. diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 36eb3779313..21f346f590f 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -217,7 +217,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.BlocksStorage.Validate(); err != nil { return errors.Wrap(err, "invalid TSDB config") } - if err := c.LimitsConfig.Validate(c.NameValidationScheme, c.Distributor.ShardByAllLabels, c.Ingester.ActiveSeriesMetricsEnabled); err != nil { + if err := c.LimitsConfig.Validate(c.NameValidationScheme, c.Distributor.ShardByAllLabels, c.Ingester.ActiveSeriesMetricsEnabled, c.Distributor.HATrackerConfig.UpdateTimeout, c.Distributor.HATrackerConfig.UpdateTimeoutJitterMax); err != nil { return errors.Wrap(err, "invalid limits config") } if err := c.LimitsConfig.ValidateQueryLimits("default", c.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil { diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index e1471c33b7d..18d62a49f07 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -77,7 +77,7 @@ func (l runtimeConfigLoader) load(r io.Reader) (any, error) { // refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929 if overrides != nil { for userID, ul := range overrides.TenantLimits { - if err := ul.Validate(l.cfg.NameValidationScheme, l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil { + if err := ul.Validate(l.cfg.NameValidationScheme, l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled, l.cfg.Distributor.HATrackerConfig.UpdateTimeout, l.cfg.Distributor.HATrackerConfig.UpdateTimeoutJitterMax); err != nil { return nil, err } if err := 
ul.ValidateQueryLimits(userID, l.cfg.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b217a09e9c5..7686cd8c21c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -246,7 +246,11 @@ func (cfg *Config) Validate(limits validation.Limits) error { return errInvalidTenantShardSize } - return cfg.HATrackerConfig.Validate() + if err := cfg.HATrackerConfig.Validate(); err != nil { + return err + } + + return nil } const ( diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index d8d270aaf3c..855e4e22f33 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -3402,9 +3402,9 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, [] EnableHATracker: true, KVStore: kv.Config{Mock: mock}, UpdateTimeout: 100 * time.Millisecond, - FailoverTimeout: time.Hour, } cfg.limits.HAMaxClusters = 100 + cfg.limits.HATrackerFailoverTimeout = model.Duration(time.Hour) } distributorCfg.RemoteWriteV2Enabled = cfg.remoteWriteV2Enabled diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index d798e07d184..cf8cef088d1 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -32,7 +32,6 @@ const ( var ( errNegativeUpdateTimeoutJitterMax = errors.New("HA tracker max update timeout jitter shouldn't be negative") - errInvalidFailoverTimeout = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)" ) // nolint:revive @@ -40,6 +39,9 @@ type HATrackerLimits interface { // MaxHAReplicaGroups returns max number of replica groups that HA tracker should track for a user. // Samples from additional replicaGroups are rejected. MaxHAReplicaGroups(user string) int + + // HATrackerFailoverTimeout returns the failover timeout for a user. 
+ HATrackerFailoverTimeout(user string) time.Duration } // ProtoReplicaDescFactory makes new InstanceDescs @@ -61,11 +63,6 @@ type HATrackerConfig struct { // is more than this duration. UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"` UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"` - // We should only failover to accepting samples from a replica - // other than the replica written in the KVStore if the difference - // between the stored timestamp and the time we received a sample is - // more than this duration - FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"` // EnableStartupSync controls whether to fetch all tracked keys from the KV store // on startup to populate the local cache. // This prevents duplicate GET calls for the same key while the cache is cold, @@ -182,7 +179,6 @@ func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix f.BoolVar(&cfg.EnableHATracker, finalFlagPrefix+"ha-tracker.enable", false, "Enable the HA tracker so that it can accept data from Prometheus HA replicas gracefully (requires labels).") f.DurationVar(&cfg.UpdateTimeout, finalFlagPrefix+"ha-tracker.update-timeout", 15*time.Second, "The time interval that must pass since the last timestamp update in the KV store before updating it again for a given cluster.") f.DurationVar(&cfg.UpdateTimeoutJitterMax, finalFlagPrefix+"ha-tracker.update-timeout-jitter-max", 5*time.Second, "The maximum jitter applied to the update timeout to spread KV store updates over time.") - f.DurationVar(&cfg.FailoverTimeout, finalFlagPrefix+"ha-tracker.failover-timeout", 30*time.Second, "The timeout after which a new replica will be accepted if the currently elected replica stops sending data. 
This value must be greater than the update timeout plus the maximum jitter.") f.BoolVar(&cfg.EnableStartupSync, finalFlagPrefix+"ha-tracker.enable-startup-sync", false, "[Experimental] If enabled, fetches all tracked keys on startup to populate the local cache. This prevents duplicate GET calls for the same key while the cache is cold, but could cause a spike in GET requests during initialization if the number of tracked keys is large.") // We want the ability to use different Consul instances for the ring and @@ -198,11 +194,6 @@ func (cfg *HATrackerConfig) Validate() error { return errNegativeUpdateTimeoutJitterMax } - minFailureTimeout := cfg.UpdateTimeout + cfg.UpdateTimeoutJitterMax + time.Second - if cfg.FailoverTimeout < minFailureTimeout { - return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout) - } - // Tracker kv store only supports consul, etcd, memberlist, and multi. storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"} if !slices.Contains(storeAllowedList, cfg.KVStore.Store) { @@ -612,7 +603,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl } } - err := c.checkKVStore(ctx, key, replica, now) + err := c.checkKVStore(ctx, key, replica, userID, now) c.kvCASCalls.WithLabelValues(userID, replicaGroup).Inc() if err != nil { // The callback within checkKVStore will return a ReplicasNotMatchError if the sample is being deduped, @@ -624,7 +615,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl return err } -func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error { +func (c *HATracker) checkKVStore(ctx context.Context, key, replica, userID string, now time.Time) error { return c.client.CAS(ctx, key, func(in any) (out any, retry bool, err error) { if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 { // We don't need to CAS and update the timestamp in the KV store if the timestamp we've received @@ 
-635,7 +626,7 @@ func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now t // We shouldn't failover to accepting a new replica if the timestamp we've received this sample at // is less than failover timeout amount of time since the timestamp in the KV store. - if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout { + if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.limits.HATrackerFailoverTimeout(userID) { return nil, false, ReplicasNotMatchError{replica: replica, elected: desc.Replica} } } diff --git a/pkg/ha/ha_tracker_http.go b/pkg/ha/ha_tracker_http.go index 14480f89245..252efd2769d 100644 --- a/pkg/ha/ha_tracker_http.go +++ b/pkg/ha/ha_tracker_http.go @@ -82,7 +82,7 @@ func (h *HATracker) ServeHTTP(w http.ResponseWriter, req *http.Request) { Replica: desc.Replica, ElectedAt: timestamp.Time(desc.ReceivedAt), UpdateTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.UpdateTimeout)), - FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.FailoverTimeout)), + FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.limits.HATrackerFailoverTimeout(chunks[0]))), }) } h.electedLock.RUnlock() diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 5376bb91ee8..fd2a53a0765 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -88,30 +88,6 @@ func TestHATrackerConfig_Validate(t *testing.T) { }(), expectedErr: errNegativeUpdateTimeoutJitterMax, }, - "should fail if failover timeout is < update timeout + jitter + 1 sec": { - cfg: func() HATrackerConfig { - cfg := HATrackerConfig{} - flagext.DefaultValues(&cfg) - cfg.FailoverTimeout = 5 * time.Second - cfg.UpdateTimeout = 4 * time.Second - cfg.UpdateTimeoutJitterMax = 2 * time.Second - - return cfg - }(), - expectedErr: fmt.Errorf(errInvalidFailoverTimeout, 5*time.Second, 7*time.Second), - }, - "should pass if failover timeout is >= update timeout + jitter + 1 sec": { - 
cfg: func() HATrackerConfig { - cfg := HATrackerConfig{} - flagext.DefaultValues(&cfg) - cfg.FailoverTimeout = 7 * time.Second - cfg.UpdateTimeout = 4 * time.Second - cfg.UpdateTimeoutJitterMax = 2 * time.Second - - return cfg - }(), - expectedErr: nil, - }, "should pass with memberlist kv store": { cfg: func() HATrackerConfig { cfg := HATrackerConfig{} @@ -180,7 +156,7 @@ func TestHATracker_CleanupDeletesArePropagatedWithMemberlist(t *testing.T) { Store: "memberlist", }, } - tracker, err := NewHATracker(trackerCfg, nil, HATrackerStatusConfig{}, reg, "", logger) + tracker, err := NewHATracker(trackerCfg, trackerLimits{failoverTimeout: 2 * time.Second}, HATrackerStatusConfig{}, reg, "", logger) require.NoError(t, err) // Inject our test memberlist client into the tracker @@ -247,11 +223,10 @@ func TestWatchPrefixNilPanicWithMemberlist(t *testing.T) { EnableHATracker: false, // to inject our client before starting the tracker UpdateTimeout: time.Second, UpdateTimeoutJitterMax: 0, - FailoverTimeout: 2 * time.Second, KVStore: kv.Config{Store: "memberlist"}, } - tracker, err := NewHATracker(trackerCfg, nil, HATrackerStatusConfig{}, reg, "test", logger) + tracker, err := NewHATracker(trackerCfg, trackerLimits{failoverTimeout: 2 * time.Second}, HATrackerStatusConfig{}, reg, "test", logger) require.NoError(t, err) tracker.cfg.EnableHATracker = true tracker.client = client @@ -299,8 +274,7 @@ func TestWatchPrefixAssignment(t *testing.T) { KVStore: kv.Config{Mock: mock}, UpdateTimeout: time.Millisecond, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Millisecond * 2, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) 
//nolint:errcheck @@ -325,8 +299,7 @@ func TestCheckReplicaOverwriteTimeout(t *testing.T) { KVStore: kv.Config{Store: "inmemory"}, UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: 2 * time.Millisecond}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -365,8 +338,7 @@ func TestCheckReplicaMultiCluster(t *testing.T) { KVStore: kv.Config{Store: "inmemory"}, UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger()) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -417,8 +389,7 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) { KVStore: kv.Config{Store: "inmemory"}, UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger()) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) 
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -492,8 +463,7 @@ func TestCheckReplicaUpdateTimeout(t *testing.T) { KVStore: kv.Config{Mock: mock}, UpdateTimeout: time.Second, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -542,8 +512,7 @@ func TestCheckReplicaMultiUser(t *testing.T) { KVStore: kv.Config{Mock: mock}, UpdateTimeout: 100 * time.Millisecond, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -623,8 +592,7 @@ func TestCheckReplicaUpdateTimeoutJitter(t *testing.T) { KVStore: kv.Config{Mock: mock}, UpdateTimeout: testData.updateTimeout, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer 
services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -672,14 +640,13 @@ func TestHAClustersLimit(t *testing.T) { t.Cleanup(func() { assert.NoError(t, closer.Close()) }) mock := kv.PrefixClient(kvStore, "prefix") - limits := trackerLimits{maxReplicaGroups: 2} + limits := trackerLimits{maxReplicaGroups: 2, failoverTimeout: time.Second} t1, err := NewHATracker(HATrackerConfig{ EnableHATracker: true, KVStore: kv.Config{Mock: mock}, UpdateTimeout: time.Second, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, }, limits, haTrackerStatusConfig, nil, "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) @@ -767,12 +734,17 @@ func TestReplicasNotMatchError(t *testing.T) { type trackerLimits struct { maxReplicaGroups int + failoverTimeout time.Duration } func (l trackerLimits) MaxHAReplicaGroups(_ string) int { return l.maxReplicaGroups } +func (l trackerLimits) HATrackerFailoverTimeout(_ string) time.Duration { + return l.failoverTimeout +} + func TestHATracker_MetricsCleanup(t *testing.T) { t.Parallel() reg := prometheus.NewPedanticRegistry() @@ -856,8 +828,7 @@ func TestCheckReplicaCleanup(t *testing.T) { KVStore: kv.Config{Mock: mock}, UpdateTimeout: 1 * time.Second, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, - }, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", util_log.Logger) + }, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", util_log.Logger) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck @@ -944,7 +915,7 @@ func BenchmarkHATracker_syncKVStoreToLocalMap(b *testing.B) { EnableStartupSync: true, KVStore: kv.Config{Mock: mockKV}, } - tracker, _ := NewHATracker(cfg, trackerLimits{}, 
haTrackerStatusConfig, nil, "bench", log.NewNopLogger()) + tracker, _ := NewHATracker(cfg, trackerLimits{failoverTimeout: time.Second}, haTrackerStatusConfig, nil, "bench", log.NewNopLogger()) b.ReportAllocs() for b.Loop() { @@ -1013,10 +984,9 @@ func TestHATracker_CacheWarmupOnStart(t *testing.T) { KVStore: kv.Config{Mock: mockKV}, // Use the seeded KV UpdateTimeout: time.Second, UpdateTimeoutJitterMax: 0, - FailoverTimeout: time.Second, } - tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger()) + tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100, failoverTimeout: time.Second}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger()) require.NoError(t, err) // Start ha tracker diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index fa24aa4a4f6..6067ed96067 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -59,6 +59,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="enforce_metadata_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="enforce_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="ha_max_clusters",user="tenant-a"} 0 + cortex_overrides{limit_name="ha_tracker_failover_timeout",user="tenant-a"} 30 cortex_overrides{limit_name="ingestion_burst_size",user="tenant-a"} 50000 cortex_overrides{limit_name="ingestion_rate",user="tenant-a"} 25000 cortex_overrides{limit_name="ingestion_tenant_shard_size",user="tenant-a"} 0 diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index a9b92c866c5..cfc8388139a 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -138,6 +138,7 @@ type Limits struct { HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` 
HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` + HATrackerFailoverTimeout model.Duration `yaml:"ha_tracker_failover_timeout" json:"ha_tracker_failover_timeout"` DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` @@ -281,6 +282,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.") + _ = l.HATrackerFailoverTimeout.Set("30s") + f.Var(&l.HATrackerFailoverTimeout, "distributor.ha-tracker.failover-timeout", "If the elected replica doesn't send samples in this time, the HA tracker will accept a new replica. This value must be greater than the update timeout plus the maximum jitter.") f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.") f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") f.BoolVar(&l.EnableTypeAndUnitLabels, "distributor.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. 
This applies to remote write v2 and OTLP requests.") @@ -397,7 +400,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { // Validate the limits config and returns an error if the validation // doesn't pass -func (l *Limits) Validate(nameValidationScheme model.ValidationScheme, shardByAllLabels bool, activeSeriesMetricsEnabled bool) error { +func (l *Limits) Validate(nameValidationScheme model.ValidationScheme, shardByAllLabels bool, activeSeriesMetricsEnabled bool, haTrackerUpdateTimeout time.Duration, haTrackerUpdateTimeoutJitterMax time.Duration) error { // The ingester.max-global-series-per-user metric is not supported // if shard-by-all-labels is disabled if l.MaxGlobalSeriesPerUser > 0 && !shardByAllLabels { @@ -444,6 +447,13 @@ func (l *Limits) Validate(nameValidationScheme model.ValidationScheme, shardByAl } } + if l.HATrackerFailoverTimeout > 0 { + minFailoverTimeout := haTrackerUpdateTimeout + haTrackerUpdateTimeoutJitterMax + time.Second + if time.Duration(l.HATrackerFailoverTimeout) < minFailoverTimeout { + return fmt.Errorf("HA Tracker failover timeout (%v) must be at least 1s greater than update timeout + max jitter (%v)", time.Duration(l.HATrackerFailoverTimeout), minFailoverTimeout) + } + } + return nil } func (l *Limits) ValidateQueryLimits(userID string, closeIdleTSDBTimeout time.Duration) error { @@ -1070,6 +1080,11 @@ func (o *Overrides) MaxHAReplicaGroups(user string) int { return o.GetOverridesForUser(user).HAMaxClusters } +// HATrackerFailoverTimeout returns the per-tenant HA tracker failover timeout. +func (o *Overrides) HATrackerFailoverTimeout(user string) time.Duration { + return time.Duration(o.GetOverridesForUser(user).HATrackerFailoverTimeout) +} + // S3SSEType returns the per-tenant S3 SSE type. 
func (o *Overrides) S3SSEType(user string) string { return o.GetOverridesForUser(user).S3SSEType diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index a0b0b90e3c1..08e2c844d7b 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -45,11 +45,13 @@ func TestLimits_Validate(t *testing.T) { t.Parallel() tests := map[string]struct { - limits Limits - shardByAllLabels bool - activeSeriesMetricsEnabled bool - expected error - nameValidationScheme model.ValidationScheme + limits Limits + shardByAllLabels bool + activeSeriesMetricsEnabled bool + expected error + nameValidationScheme model.ValidationScheme + haTrackerUpdateTimeout time.Duration + haTrackerUpdateTimeoutJitterMax time.Duration }{ "max-global-series-per-user disabled and shard-by-all-labels=false": { limits: Limits{MaxGlobalSeriesPerUser: 0}, @@ -186,6 +188,18 @@ func TestLimits_Validate(t *testing.T) { }, expected: errInvalidMetricRelabelConfigs, }, + "ha_tracker_failover_timeout too small": { + limits: Limits{HATrackerFailoverTimeout: model.Duration(5 * time.Second)}, + haTrackerUpdateTimeout: 4 * time.Second, + haTrackerUpdateTimeoutJitterMax: 2 * time.Second, + expected: fmt.Errorf("HA Tracker failover timeout (5s) must be at least 1s greater than update timeout + max jitter (7s)"), + }, + "ha_tracker_failover_timeout valid": { + limits: Limits{HATrackerFailoverTimeout: model.Duration(7 * time.Second)}, + haTrackerUpdateTimeout: 4 * time.Second, + haTrackerUpdateTimeoutJitterMax: 2 * time.Second, + expected: nil, + }, } for testName, testData := range tests { @@ -194,7 +208,15 @@ func TestLimits_Validate(t *testing.T) { if testData.nameValidationScheme == model.UTF8Validation { nameValidationScheme = testData.nameValidationScheme } - assert.ErrorIs(t, testData.limits.Validate(nameValidationScheme, testData.shardByAllLabels, testData.activeSeriesMetricsEnabled), testData.expected) + err := 
testData.limits.Validate(nameValidationScheme, testData.shardByAllLabels, testData.activeSeriesMetricsEnabled, testData.haTrackerUpdateTimeout, testData.haTrackerUpdateTimeoutJitterMax) + if testData.expected == nil { + assert.NoError(t, err) + } else if testData.haTrackerUpdateTimeout > 0 { + // HA tracker validation uses formatted errors, not sentinel errors. + assert.EqualError(t, err, testData.expected.Error()) + } else { + assert.ErrorIs(t, err, testData.expected) + } }) } } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 50b9a88bb71..dca7b2f66b7 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3734,13 +3734,6 @@ "type": "boolean", "x-cli-flag": "distributor.ha-tracker.enable-startup-sync" }, - "ha_tracker_failover_timeout": { - "default": "30s", - "description": "The timeout after which a new replica will be accepted if the currently elected replica stops sending data. This value must be greater than the update timeout plus the maximum jitter.", - "type": "string", - "x-cli-flag": "distributor.ha-tracker.failover-timeout", - "x-format": "duration" - }, "ha_tracker_update_timeout": { "default": "15s", "description": "The time interval that must pass since the last timestamp update in the KV store before updating it again for a given cluster.", @@ -5119,6 +5112,13 @@ "type": "string", "x-cli-flag": "distributor.ha-tracker.replica" }, + "ha_tracker_failover_timeout": { + "default": "30s", + "description": "If the elected replica doesn't send samples in this time, the HA tracker will accept a new replica. This value must be greater than the update timeout plus the maximum jitter.", + "type": "string", + "x-cli-flag": "distributor.ha-tracker.failover-timeout", + "x-format": "duration" + }, "ingestion_burst_size": { "default": 50000, "description": "Per-user allowed ingestion burst size (in number of samples).",