From 4691ef90b8d71cda97fcd9d10392d20679b01e3e Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 10 May 2026 22:56:18 +0000 Subject: [PATCH] feat(ingester): Add head-only queried series metrics Add two new metrics that track query activity against the TSDB head only: 1. cortex_ingester_queried_head_series: Estimated unique series queried from head within a configurable time window (HLL-based). 2. cortex_ingester_queried_metric_head_series: Current head cardinality for each metric name that was queried within the configured window. Implementation uses a BlockChunkQuerierFunc wrapper that intercepts Select calls only for head queriers (identified by RangeHead/Head ULIDs). The wrapper collects series hashes for the HLL tracker and records metric names for the per-metric cardinality tracker. Key design decisions: - Reuses existing ActiveQueriedSeries HLL and ActiveQueriedSeriesService for the total series metric (no new service needed) - Per-metric-name tracking uses a simple map[string]time.Time with lazy count lookup via seriesInMetric.getSeriesCountForMetric() on collection - Only __name__= equality matchers trigger per-metric tracking - Sampling is supported for the HLL metric to reduce overhead - Both metrics share the same configurable time window New configuration flags (all experimental): - ingester.head-queried-series-metrics-enabled - ingester.head-queried-series-metrics-windows - ingester.head-queried-series-metrics-window-duration - ingester.head-queried-series-metrics-sample-rate Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 18 ++ docs/configuration/v1-guarantees.md | 7 + pkg/ingester/head_queried_series_querier.go | 95 ++++++++ .../head_queried_series_querier_test.go | 212 ++++++++++++++++++ pkg/ingester/ingester.go | 129 +++++++++-- pkg/ingester/metrics.go | 20 ++ pkg/ingester/metrics_test.go | 9 +- pkg/ingester/queried_metric_tracker.go | 46 ++++ pkg/ingester/queried_metric_tracker_test.go | 79 +++++++ 
pkg/ingester/user_metrics_metadata_test.go | 1 + pkg/ingester/user_state.go | 7 + schemas/cortex-config-schema.json | 28 +++ 13 files changed, 630 insertions(+), 22 deletions(-) create mode 100644 pkg/ingester/head_queried_series_querier.go create mode 100644 pkg/ingester/head_queried_series_querier_test.go create mode 100644 pkg/ingester/queried_metric_tracker.go create mode 100644 pkg/ingester/queried_metric_tracker_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 447a8b092a1..5f852aebffd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446 * [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481 * [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476 +* [FEATURE] Ingester: Add experimental head-only queried series metrics. `cortex_ingester_queried_head_series` tracks unique series queried from head via HLL, and `cortex_ingester_queried_metric_head_series` reports current head cardinality for recently queried metric names. Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. 
#7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 6141667b42c..a4a09592db2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3785,6 +3785,24 @@ lifecycler: # CLI flag: -ingester.active-queried-series-metrics-windows [active_queried_series_metrics_windows: <list of duration> | default = 2h0m0s] +# Experimental: Enable tracking of series queried from head only and expose them +# as metrics. +# CLI flag: -ingester.head-queried-series-metrics-enabled +[head_queried_series_metrics_enabled: <boolean> | default = false] + +# Duration of each sub-window for head queried series tracking. +# CLI flag: -ingester.head-queried-series-metrics-window-duration +[head_queried_series_metrics_window_duration: <duration> | default = 15m] + +# Sampling rate for head queried series tracking (1.0 = 100%%). +# CLI flag: -ingester.head-queried-series-metrics-sample-rate +[head_queried_series_metrics_sample_rate: <float> | default = 1] + +# Time windows to expose head queried series metrics. Also controls how long +# per-metric-name cardinality is reported after last query. +# CLI flag: -ingester.head-queried-series-metrics-windows +[head_queried_series_metrics_windows: <list of duration> | default = 2h0m0s] + +# Enable uploading compacted blocks. 
# CLI flag: -ingester.upload-compacted-blocks-enabled [upload_compacted_blocks_enabled: | default = true] diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index e52c65a8a5b..fa8600bff02 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -133,3 +133,10 @@ Currently experimental features are: - Ingester: Active Series Tracker - Per-tenant `active_series_trackers` configuration in runtime config overrides - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric +- Ingester: Head Queried Series Metrics + - Enable on Ingester via `-ingester.head-queried-series-metrics-enabled=true` + - Tracks unique series queried from head only (not blocks) using HLL + - Tracks per-metric-name head cardinality for recently queried metrics + - `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h) + - `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size + - `-ingester.head-queried-series-metrics-sample-rate` query sampling rate diff --git a/pkg/ingester/head_queried_series_querier.go b/pkg/ingester/head_queried_series_querier.go new file mode 100644 index 00000000000..9d48d3ca303 --- /dev/null +++ b/pkg/ingester/head_queried_series_querier.go @@ -0,0 +1,95 @@ +package ingester + +import ( + "context" + "time" + + "github.com/oklog/ulid/v2" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" +) + +var ( + rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD") + headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") +) + +// isHead returns true if the given BlockReader is a head block (in-order or OOO). 
+func isHead(b tsdb.BlockReader) bool { + id := b.Meta().ULID + return id == rangeHeadULID || id == headULID +} + +// headQueriedSeriesChunkQuerier wraps a ChunkQuerier for the head block and +// intercepts Select calls to collect series hashes (for HLL) and record +// queried metric names. +type headQueriedSeriesChunkQuerier struct { + storage.ChunkQuerier + headQueriedSeries *ActiveQueriedSeries + activeQueriedSeriesService *ActiveQueriedSeriesService + queriedMetricTrackers []*QueriedMetricTracker + userID string + sampled bool +} + +func (q *headQueriedSeriesChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + ss := q.ChunkQuerier.Select(ctx, sortSeries, hints, matchers...) + + // Record metric name for per-metric cardinality tracking (always, regardless of sampling). + if len(q.queriedMetricTrackers) > 0 { + for _, m := range matchers { + if m.Name == labels.MetricName && m.Type == labels.MatchEqual { + now := time.Now() + for _, tracker := range q.queriedMetricTrackers { + tracker.MarkQueried(m.Value, now) + } + break + } + } + } + + // Wrap series set for hash collection only if sampled. + if !q.sampled { + return ss + } + return &headQueriedSeriesSet{ + ChunkSeriesSet: ss, + headQueriedSeries: q.headQueriedSeries, + activeQueriedSeriesService: q.activeQueriedSeriesService, + userID: q.userID, + hashes: getQueriedSeriesHashesSlice(), + } +} + +// headQueriedSeriesSet wraps a ChunkSeriesSet to collect series label hashes +// during iteration and flush them to the HLL tracker when iteration completes. 
+type headQueriedSeriesSet struct { + storage.ChunkSeriesSet + headQueriedSeries *ActiveQueriedSeries + activeQueriedSeriesService *ActiveQueriedSeriesService + userID string + hashes []uint64 +} + +func (s *headQueriedSeriesSet) Next() bool { + if !s.ChunkSeriesSet.Next() { + s.flush() + return false + } + s.hashes = append(s.hashes, s.ChunkSeriesSet.At().Labels().Hash()) + return true +} + +func (s *headQueriedSeriesSet) At() storage.ChunkSeries { + return s.ChunkSeriesSet.At() +} + +func (s *headQueriedSeriesSet) flush() { + if len(s.hashes) > 0 && s.activeQueriedSeriesService != nil { + s.activeQueriedSeriesService.UpdateSeriesBatch(s.headQueriedSeries, s.hashes, time.Now(), s.userID) + } else if len(s.hashes) > 0 { + // If no service, return the slice to the pool. + putQueriedSeriesHashesSlice(s.hashes) + } +} diff --git a/pkg/ingester/head_queried_series_querier_test.go b/pkg/ingester/head_queried_series_querier_test.go new file mode 100644 index 00000000000..88649bd0199 --- /dev/null +++ b/pkg/ingester/head_queried_series_querier_test.go @@ -0,0 +1,212 @@ +package ingester + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/oklog/ulid/v2" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIsHead(t *testing.T) { + tests := []struct { + name string + ulid ulid.ULID + expected bool + }{ + {"rangeHead", rangeHeadULID, true}, + {"head", headULID, true}, + {"random block", ulid.MustNew(1, nil), false}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + b := &mockBlockReader{meta: tsdb.BlockMeta{ULID: tc.ulid}} + assert.Equal(t, tc.expected, isHead(b)) + }) + } +} + +func 
TestHeadQueriedSeriesChunkQuerier_RecordsMetricName(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + hll := NewActiveQueriedSeries( + []time.Duration{2 * time.Hour}, + 15*time.Minute, + 1.0, + nil, + ) + + wrapper := &headQueriedSeriesChunkQuerier{ + ChunkQuerier: &noopChunkQuerier{}, + headQueriedSeries: hll, + activeQueriedSeriesService: nil, + queriedMetricTrackers: []*QueriedMetricTracker{tracker}, + userID: "user-1", + sampled: false, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "http_requests_total"), + labels.MustNewMatcher(labels.MatchEqual, "job", "api"), + } + + wrapper.Select(context.Background(), false, nil, matchers...) + + now := time.Now() + metrics := tracker.GetActiveMetrics(now) + require.Len(t, metrics, 1) + assert.Equal(t, "http_requests_total", metrics[0]) +} + +func TestHeadQueriedSeriesChunkQuerier_SkipsRegexMetricName(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + + wrapper := &headQueriedSeriesChunkQuerier{ + ChunkQuerier: &noopChunkQuerier{}, + queriedMetricTrackers: []*QueriedMetricTracker{tracker}, + userID: "user-1", + sampled: false, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, "http_.*"), + } + + wrapper.Select(context.Background(), false, nil, matchers...) + + metrics := tracker.GetActiveMetrics(time.Now()) + assert.Empty(t, metrics) +} + +func TestHeadQueriedSeriesSet_CollectsHashes(t *testing.T) { + lbls1 := labels.FromStrings("__name__", "metric_a", "job", "foo") + lbls2 := labels.FromStrings("__name__", "metric_a", "job", "bar") + + inner := &mockChunkSeriesSet{ + series: []storage.ChunkSeries{ + &mockChunkSeries{lbls: lbls1}, + &mockChunkSeries{lbls: lbls2}, + }, + } + + hll := NewActiveQueriedSeries( + []time.Duration{2 * time.Hour}, + 15*time.Minute, + 1.0, + nil, + ) + + // Use a real service to process the hashes. 
+ svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil) + require.NoError(t, svc.StartAsync(context.Background())) + defer svc.StopAsync() + require.NoError(t, svc.AwaitRunning(context.Background())) + + tracker := NewQueriedMetricTracker(2 * time.Hour) + wrapper := &headQueriedSeriesChunkQuerier{ + ChunkQuerier: &mockChunkQuerierWithSeries{inner: inner}, + headQueriedSeries: hll, + activeQueriedSeriesService: svc, + queriedMetricTrackers: []*QueriedMetricTracker{tracker}, + userID: "user-1", + sampled: true, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric_a"), + } + + ss := wrapper.Select(context.Background(), false, nil, matchers...) + count := 0 + for ss.Next() { + count++ + } + assert.Equal(t, 2, count) + + // Give the async worker time to process. + time.Sleep(50 * time.Millisecond) + + estimate, err := hll.GetSeriesQueried(time.Now(), 2*time.Hour) + require.NoError(t, err) + assert.Equal(t, uint64(2), estimate) + + // Also verify metric name was recorded. 
+ metrics := tracker.GetActiveMetrics(time.Now()) + require.Len(t, metrics, 1) + assert.Equal(t, "metric_a", metrics[0]) +} + +// Mock implementations + +type mockBlockReader struct { + meta tsdb.BlockMeta +} + +func (m *mockBlockReader) Meta() tsdb.BlockMeta { return m.meta } +func (m *mockBlockReader) Index() (tsdb.IndexReader, error) { return nil, nil } +func (m *mockBlockReader) Chunks() (tsdb.ChunkReader, error) { return nil, nil } +func (m *mockBlockReader) Tombstones() (tombstones.Reader, error) { return nil, nil } +func (m *mockBlockReader) Size() int64 { return 0 } +func (m *mockBlockReader) String() string { return "" } +func (m *mockBlockReader) MinTime() int64 { return 0 } +func (m *mockBlockReader) MaxTime() int64 { return 0 } + +type noopChunkQuerier struct{} + +func (q *noopChunkQuerier) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet { + return storage.EmptyChunkSeriesSet() +} +func (q *noopChunkQuerier) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} +func (q *noopChunkQuerier) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} +func (q *noopChunkQuerier) Close() error { return nil } + +type mockChunkQuerierWithSeries struct { + inner *mockChunkSeriesSet +} + +func (q *mockChunkQuerierWithSeries) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet { + return q.inner +} +func (q *mockChunkQuerierWithSeries) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} +func (q *mockChunkQuerierWithSeries) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} 
+func (q *mockChunkQuerierWithSeries) Close() error { return nil } + +type mockChunkSeriesSet struct { + series []storage.ChunkSeries + idx int +} + +func (m *mockChunkSeriesSet) Next() bool { + if m.idx >= len(m.series) { + return false + } + m.idx++ + return true +} +func (m *mockChunkSeriesSet) At() storage.ChunkSeries { return m.series[m.idx-1] } +func (m *mockChunkSeriesSet) Err() error { return nil } +func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil } + +type mockChunkSeries struct { + lbls labels.Labels +} + +func (m *mockChunkSeries) Labels() labels.Labels { return m.lbls } +func (m *mockChunkSeries) Iterator(_ chunks.Iterator) chunks.Iterator { return nil } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6116a318992..aac6bc7715f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -136,6 +136,11 @@ type Config struct { ActiveQueriedSeriesMetricsSampleRate float64 `yaml:"active_queried_series_metrics_sample_rate"` ActiveQueriedSeriesMetricsWindows cortex_tsdb.DurationList `yaml:"active_queried_series_metrics_windows"` + HeadQueriedSeriesMetricsEnabled bool `yaml:"head_queried_series_metrics_enabled"` + HeadQueriedSeriesMetricsWindowDuration time.Duration `yaml:"head_queried_series_metrics_window_duration"` + HeadQueriedSeriesMetricsSampleRate float64 `yaml:"head_queried_series_metrics_sample_rate"` + HeadQueriedSeriesMetricsWindows cortex_tsdb.DurationList `yaml:"head_queried_series_metrics_windows"` + // Use blocks storage. BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"` @@ -202,6 +207,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ActiveQueriedSeriesMetricsWindows = cortex_tsdb.DurationList{2 * time.Hour} f.Var(&cfg.ActiveQueriedSeriesMetricsWindows, "ingester.active-queried-series-metrics-windows", "Time windows to expose queried series metric. 
Each window tracks queried series within that time period.") + f.BoolVar(&cfg.HeadQueriedSeriesMetricsEnabled, "ingester.head-queried-series-metrics-enabled", false, "Experimental: Enable tracking of series queried from head only and expose them as metrics.") + f.DurationVar(&cfg.HeadQueriedSeriesMetricsWindowDuration, "ingester.head-queried-series-metrics-window-duration", 15*time.Minute, "Duration of each sub-window for head queried series tracking.") + f.Float64Var(&cfg.HeadQueriedSeriesMetricsSampleRate, "ingester.head-queried-series-metrics-sample-rate", 1.0, "Sampling rate for head queried series tracking (1.0 = 100%%).") + cfg.HeadQueriedSeriesMetricsWindows = cortex_tsdb.DurationList{2 * time.Hour} + f.Var(&cfg.HeadQueriedSeriesMetricsWindows, "ingester.head-queried-series-metrics-windows", "Time windows to expose head queried series metrics. Also controls how long per-metric-name cardinality is reported after last query.") + f.BoolVar(&cfg.UploadCompactedBlocksEnabled, "ingester.upload-compacted-blocks-enabled", true, "Enable uploading compacted blocks.") f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. 
Does not affect max-series-per-user or max-global-series-per-metric limits.") f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors") @@ -363,13 +374,15 @@ func (r tsdbCloseCheckResult) shouldClose() bool { } type userTSDB struct { - db *tsdb.DB - userID string - activeSeries *ActiveSeries - activeQueriedSeries *ActiveQueriedSeries - seriesInMetric *metricCounter - labelSetCounter *labelSetCounter - limiter *Limiter + db *tsdb.DB + userID string + activeSeries *ActiveSeries + activeQueriedSeries *ActiveQueriedSeries + headQueriedSeries *ActiveQueriedSeries + queriedMetricTrackers []*QueriedMetricTracker + seriesInMetric *metricCounter + labelSetCounter *labelSetCounter + limiter *Limiter instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester. instanceLimitsFn func() *InstanceLimits @@ -803,7 +816,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe matchersCache: storecache.NoopMatchersCache, } - if cfg.ActiveQueriedSeriesMetricsEnabled { + if cfg.ActiveQueriedSeriesMetricsEnabled || cfg.HeadQueriedSeriesMetricsEnabled { i.activeQueriedSeriesService = NewActiveQueriedSeriesService(logger, registerer) } @@ -820,6 +833,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe false, cfg.ActiveSeriesMetricsEnabled, cfg.ActiveQueriedSeriesMetricsEnabled, + cfg.HeadQueriedSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.maxInflightPushRequests, @@ -918,6 +932,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe false, false, false, + false, i.getInstanceLimits, nil, &i.maxInflightPushRequests, @@ -1058,6 +1073,13 @@ func (i *Ingester) updateLoop(ctx context.Context) error { defer t.Stop() } + var headQueriedSeriesTickerChan <-chan time.Time + if i.cfg.HeadQueriedSeriesMetricsEnabled { + t := 
time.NewTicker(i.cfg.ActiveQueriedSeriesMetricsUpdatePeriod) + headQueriedSeriesTickerChan = t.C + defer t.Stop() + } + // Similarly to the above, this is a hardcoded value. metadataPurgeTicker := time.NewTicker(metadataPurgePeriod) defer metadataPurgeTicker.Stop() @@ -1086,6 +1108,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { i.updateActiveSeries(ctx) case <-activeQueriedSeriesTickerChan: i.updateActiveQueriedSeries(ctx) + case <-headQueriedSeriesTickerChan: + i.updateHeadQueriedMetrics(ctx) case <-maxTrackerResetTicker.C: i.maxInflightQueryRequests.Tick() i.maxInflightPushRequests.Tick() @@ -1188,6 +1212,41 @@ func (i *Ingester) updateActiveQueriedSeries(ctx context.Context) { } } +func (i *Ingester) updateHeadQueriedMetrics(ctx context.Context) { + now := time.Now() + for _, userID := range i.getTSDBUsers() { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil { + continue + } + + // Metric 1: total series queried from head (HLL) + if userDB.headQueriedSeries != nil { + userDB.headQueriedSeries.Purge(now) + for _, windowDuration := range i.cfg.HeadQueriedSeriesMetricsWindows { + estimatedCount, err := userDB.headQueriedSeries.GetSeriesQueried(now, windowDuration) + if err != nil { + level.Error(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to get head queried series count", "user", userID, "window", windowDuration, "err", err) + continue + } + i.metrics.headQueriedSeriesPerUser.WithLabelValues(userID, windowDuration.String()).Set(float64(estimatedCount)) + } + } + + // Metric 2: per metric name cardinality + if len(userDB.queriedMetricTrackers) > 0 { + for idx, tracker := range userDB.queriedMetricTrackers { + window := i.cfg.HeadQueriedSeriesMetricsWindows[idx] + activeMetrics := tracker.GetActiveMetrics(now) + for _, metricName := range activeMetrics { + count := userDB.seriesInMetric.getSeriesCountForMetric(metricName) + i.metrics.headQueriedMetricSeries.WithLabelValues(userID, metricName, 
window.String()).Set(float64(count)) + } + } + } + } +} + func (i *Ingester) updateLabelSetMetrics() { activeUserSet := make(map[string]map[uint64]struct{}) for _, userID := range i.getTSDBUsers() { @@ -2870,11 +2929,29 @@ func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFu // This occurs because the tsdb.PostingsForMatchers function can return invalid data in such scenarios. // For more details, see: https://github.com/cortexproject/cortex/issues/6556 // TODO: alanprot: Consider removing this logic when prometheus is updated as this logic is "fixed" upstream. + var q storage.ChunkQuerier if postingCache == nil || mint > db.Head().MaxTime() { - return tsdb.NewBlockChunkQuerier(b, mint, maxt) + q, err = tsdb.NewBlockChunkQuerier(b, mint, maxt) + } else { + q, err = cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt) + } + if err != nil { + return nil, err } - return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt) + // Wrap only for head queriers when head queried series metrics are enabled. 
+ if i.cfg.HeadQueriedSeriesMetricsEnabled && isHead(b) && db != nil && db.headQueriedSeries != nil { + q = &headQueriedSeriesChunkQuerier{ + ChunkQuerier: q, + headQueriedSeries: db.headQueriedSeries, + activeQueriedSeriesService: i.activeQueriedSeriesService, + queriedMetricTrackers: db.queriedMetricTrackers, + userID: userId, + sampled: db.headQueriedSeries.SampleRequest(), + } + } + + return q, nil } } @@ -2901,15 +2978,31 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { ) } + var headQueriedSeries *ActiveQueriedSeries + var queriedMetricTrackers []*QueriedMetricTracker + if i.cfg.HeadQueriedSeriesMetricsEnabled { + headQueriedSeries = NewActiveQueriedSeries( + i.cfg.HeadQueriedSeriesMetricsWindows, + i.cfg.HeadQueriedSeriesMetricsWindowDuration, + i.cfg.HeadQueriedSeriesMetricsSampleRate, + i.logger, + ) + for _, w := range i.cfg.HeadQueriedSeriesMetricsWindows { + queriedMetricTrackers = append(queriedMetricTrackers, NewQueriedMetricTracker(w)) + } + } + userDB := &userTSDB{ - userID: userID, - activeSeries: NewActiveSeries(), - activeQueriedSeries: activeQueriedSeries, - seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), - labelSetCounter: newLabelSetCounter(i.limiter), - trackerCounter: newTrackerCounter(), - ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), - ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), + userID: userID, + activeSeries: NewActiveSeries(), + activeQueriedSeries: activeQueriedSeries, + headQueriedSeries: headQueriedSeries, + queriedMetricTrackers: queriedMetricTrackers, + seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), + labelSetCounter: newLabelSetCounter(i.limiter), + trackerCounter: newTrackerCounter(), + ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), + ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), instanceLimitsFn: 
i.getInstanceLimits, instanceSeriesCount: &i.TSDBState.seriesCount, diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 238c578e656..0aa292873dd 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -58,6 +58,8 @@ type ingesterMetrics struct { activeSeriesPerUser *prometheus.GaugeVec activeNHSeriesPerUser *prometheus.GaugeVec activeQueriedSeriesPerUser *prometheus.GaugeVec + headQueriedSeriesPerUser *prometheus.GaugeVec + headQueriedMetricSeries *prometheus.GaugeVec limitsPerLabelSet *prometheus.GaugeVec usagePerLabelSet *prometheus.GaugeVec activeSeriesPerTracker *prometheus.GaugeVec @@ -87,6 +89,7 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, activeQueriedSeriesEnabled bool, + headQueriedSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightPushRequests *util_math.MaxTracker, @@ -309,6 +312,16 @@ func newIngesterMetrics(r prometheus.Registerer, Name: "cortex_ingester_active_queried_series", Help: "Estimated number of currently active queried series per user (probabilistic count using HyperLogLog).", }, []string{"user", "window"}), + + // Not registered automatically, but only if headQueriedSeriesEnabled is true. 
+ headQueriedSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_queried_head_series", + Help: "Estimated number of unique series queried from head within the configured time window.", + }, []string{"user", "window"}), + headQueriedMetricSeries: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_queried_metric_head_series", + Help: "Current head cardinality for metric names queried within the configured time window.", + }, []string{"user", "metric_name", "window"}), } if regexMatcherLimitsEnabled { @@ -356,6 +369,11 @@ func newIngesterMetrics(r prometheus.Registerer, r.MustRegister(m.activeQueriedSeriesPerUser) } + if headQueriedSeriesEnabled && r != nil { + r.MustRegister(m.headQueriedSeriesPerUser) + r.MustRegister(m.headQueriedMetricSeries) + } + if createMetricsConflictingWithTSDB { m.memSeriesCreatedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: memSeriesCreatedTotalName, @@ -382,6 +400,8 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.activeNHSeriesPerUser.DeleteLabelValues(userID) m.activeSeriesPerTracker.DeletePartialMatch(prometheus.Labels{"user": userID}) m.activeQueriedSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) + m.headQueriedSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) + m.headQueriedMetricSeries.DeletePartialMatch(prometheus.Labels{"user": userID}) m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID}) diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 30d0c4cd5cd..378ada9cdfc 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -20,7 +20,7 @@ func TestRegexMatcherLimitsMetricsFeatureFlag(t *testing.T) { // Test with feature flag disabled - metrics should be nil t.Run("metrics are nil when feature 
flag is disabled", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, false) @@ -33,7 +33,7 @@ func TestRegexMatcherLimitsMetricsFeatureFlag(t *testing.T) { // Test with feature flag enabled - metrics should be initialized t.Run("metrics are initialized when feature flag is enabled", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, true) @@ -51,7 +51,7 @@ func TestUnoptimizedRegexRejectedMetric(t *testing.T) { t.Run("rejected metric increments correctly", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, true) @@ -75,7 +75,7 @@ func TestUnoptimizedRegexRejectedMetric(t *testing.T) { t.Run("metric cleanup works correctly", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, true) @@ -110,6 +110,7 @@ func TestIngesterMetrics(t *testing.T) { false, true, false, + false, func() *InstanceLimits { return &InstanceLimits{ MaxIngestionRate: 12, diff --git a/pkg/ingester/queried_metric_tracker.go b/pkg/ingester/queried_metric_tracker.go new file mode 100644 index 00000000000..529f40f7231 --- /dev/null +++ 
b/pkg/ingester/queried_metric_tracker.go @@ -0,0 +1,46 @@ +package ingester + +import ( + "sync" + "time" +) + +// QueriedMetricTracker tracks which metric names have been queried from the head +// within a configurable time window. On periodic collection, active entries are +// used to look up current head cardinality via seriesInMetric. +type QueriedMetricTracker struct { + mu sync.Mutex + entries map[string]time.Time // metric_name -> last queried time + window time.Duration +} + +// NewQueriedMetricTracker creates a new tracker with the given expiry window. +func NewQueriedMetricTracker(window time.Duration) *QueriedMetricTracker { + return &QueriedMetricTracker{ + entries: make(map[string]time.Time), + window: window, + } +} + +// MarkQueried records that a metric name was queried. Called on the query path. +// Repeated calls for the same metric name just update the timestamp. +func (t *QueriedMetricTracker) MarkQueried(metricName string, now time.Time) { + t.mu.Lock() + t.entries[metricName] = now + t.mu.Unlock() +} + +// GetActiveMetrics returns metric names queried within the window and purges expired entries. 
+func (t *QueriedMetricTracker) GetActiveMetrics(now time.Time) []string { + t.mu.Lock() + defer t.mu.Unlock() + var result []string + for name, lastQueried := range t.entries { + if now.Sub(lastQueried) <= t.window { + result = append(result, name) + } else { + delete(t.entries, name) + } + } + return result +} diff --git a/pkg/ingester/queried_metric_tracker_test.go b/pkg/ingester/queried_metric_tracker_test.go new file mode 100644 index 00000000000..6dbfd841bc7 --- /dev/null +++ b/pkg/ingester/queried_metric_tracker_test.go @@ -0,0 +1,79 @@ +package ingester + +import ( + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestQueriedMetricTracker_MarkQueried(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now) + tracker.MarkQueried("node_cpu_seconds_total", now) + + metrics := tracker.GetActiveMetrics(now) + sort.Strings(metrics) + assert.Equal(t, []string{"http_requests_total", "node_cpu_seconds_total"}, metrics) +} + +func TestQueriedMetricTracker_OverwritesTimestamp(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now.Add(-90*time.Minute)) + tracker.MarkQueried("http_requests_total", now) + + // Should still be active since last query was at now + metrics := tracker.GetActiveMetrics(now.Add(90 * time.Minute)) + assert.Equal(t, []string{"http_requests_total"}, metrics) +} + +func TestQueriedMetricTracker_ExpiresEntries(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now) + tracker.MarkQueried("old_metric", now.Add(-3*time.Hour)) + + metrics := tracker.GetActiveMetrics(now) + assert.Equal(t, []string{"http_requests_total"}, metrics) + + // Verify expired entry was purged + metrics = tracker.GetActiveMetrics(now) + assert.Equal(t, 
[]string{"http_requests_total"}, metrics) +} + +func TestQueriedMetricTracker_EmptyAfterExpiry(t *testing.T) { + tracker := NewQueriedMetricTracker(1 * time.Minute) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now) + + metrics := tracker.GetActiveMetrics(now.Add(2 * time.Minute)) + assert.Empty(t, metrics) +} + +func TestQueriedMetricTracker_ConcurrentAccess(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + var wg sync.WaitGroup + for i := range 100 { + wg.Add(1) + go func(i int) { + defer wg.Done() + tracker.MarkQueried("metric", now) + }(i) + } + wg.Wait() + + metrics := tracker.GetActiveMetrics(now) + require.Len(t, metrics, 1) + assert.Equal(t, "metric", metrics[0]) +} diff --git a/pkg/ingester/user_metrics_metadata_test.go b/pkg/ingester/user_metrics_metadata_test.go index fa70e090c31..0946e4869c7 100644 --- a/pkg/ingester/user_metrics_metadata_test.go +++ b/pkg/ingester/user_metrics_metadata_test.go @@ -26,6 +26,7 @@ func Test_UserMetricsMetadata(t *testing.T) { false, false, false, + false, func() *InstanceLimits { return &InstanceLimits{} }, diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 2918c8993aa..3a257813022 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -86,6 +86,13 @@ func (m *metricCounter) increaseSeriesForMetric(metric string) { shard.mtx.Unlock() } +func (m *metricCounter) getSeriesCountForMetric(metric string) int { + shard := m.getShard(metric) + shard.mtx.Lock() + defer shard.mtx.Unlock() + return shard.m[metric] +} + type labelSetCounterEntry struct { count int labels labels.Labels diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 915c689d158..385a48f2e09 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4620,6 +4620,34 @@ "type": "boolean", "x-cli-flag": "ingester.enable-regex-matcher-limits" }, + "head_queried_series_metrics_enabled": { + 
"default": false, + "description": "Experimental: Enable tracking of series queried from head only and expose them as metrics.", + "type": "boolean", + "x-cli-flag": "ingester.head-queried-series-metrics-enabled" + }, + "head_queried_series_metrics_sample_rate": { + "default": 1, + "description": "Sampling rate for head queried series tracking (1.0 = 100%%).", + "type": "number", + "x-cli-flag": "ingester.head-queried-series-metrics-sample-rate" + }, + "head_queried_series_metrics_window_duration": { + "default": "15m0s", + "description": "Duration of each sub-window for head queried series tracking.", + "type": "string", + "x-cli-flag": "ingester.head-queried-series-metrics-window-duration", + "x-format": "duration" + }, + "head_queried_series_metrics_windows": { + "default": "2h0m0s", + "description": "Time windows to expose head queried series metrics. Also controls how long per-metric-name cardinality is reported after last query.", + "items": { + "type": "string" + }, + "type": "array", + "x-cli-flag": "ingester.head-queried-series-metrics-windows" + }, "ignore_series_limit_for_metric_names": { "description": "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. Does not affect max-series-per-user or max-global-series-per-metric limits.", "type": "string",