Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,19 @@ func buildQueryFunc(
failedQueries := metrics.FailedQueriesVec.WithLabelValues(userID)
metricsFunc := metricsQueryFunc(baseQueryFunc, totalQueries, failedQueries)

var qf rules.QueryFunc
// apply statistic middleware
if cfg.EnableQueryStats {
return recordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger)
qf = recordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger)
} else {
qf = metricsFunc
}

// apply select merger wrapper (checks context for prefetch cache)
if cfg.SelectMergerEnabled {
qf = selectMergerQueryFunc(qf)
}
return metricsFunc
return qf
}

type QueryableError struct {
Expand Down
16 changes: 16 additions & 0 deletions pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory
logger: logger,
ruleGroupIterationFunc: defaultRuleGroupIterationFunc,
}
if cfg.SelectMergerEnabled {
m.ruleGroupIterationFunc = mergedSelectIterationFunc(cfg.SelectMergerMinRules)
}
if cfg.RulesBackupEnabled() {
m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg)
}
Expand Down Expand Up @@ -292,6 +295,19 @@ func defaultRuleGroupIterationFunc(ctx context.Context, g *promRules.Group, eval
promRules.DefaultEvalIterationFunc(ctx, g, evalTimestamp)
}

// mergedSelectIterationFunc builds a GroupEvalIterationFunc that, on every
// iteration, asks planMergedSelects for a merge plan over the group's rules
// (minRules is passed through unchanged) and, when the plan is non-empty,
// attaches it to the context via withSelectMergerPlan. The group is then
// evaluated by defaultRuleGroupIterationFunc with that context.
//
// No query is issued here; the plan is only made available to whatever reads
// it from the context during evaluation.
func mergedSelectIterationFunc(minRules int) promRules.GroupEvalIterationFunc {
	return func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
		evalCtx := ctx
		if plan := planMergedSelects(g.Rules(), minRules); len(plan) != 0 {
			evalCtx = withSelectMergerPlan(ctx, plan)
		}
		defaultRuleGroupIterationFunc(evalCtx, g, evalTimestamp)
	}
}

// newManager creates a prometheus rule manager wrapped with a user id
// configured storage, appendable, notifier, and instrumentation
func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID string) (RulesManager, error) {
Expand Down
162 changes: 162 additions & 0 deletions pkg/ruler/prefetch_queryable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package ruler

import (
"context"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
)

// selectMergerCtxKey is the context key under which a *selectMergerState is
// stored. Being an unexported empty struct type, it cannot collide with keys
// defined by other packages.
type selectMergerCtxKey struct{}

// selectMergerState is injected into context by the iteration func.
// The QueryFunc wrapper lazily executes the prefetch on first access.
// It holds a sync.Once and is therefore always handled by pointer.
type selectMergerState struct {
// plan is the list of merged selects to prefetch; set when the state is created.
plan []mergedSelect
// once guards the single execution of the prefetch.
once sync.Once
// cache is assigned inside once.Do and stays nil until the prefetch has run.
cache *prefetchCache
}

// withSelectMergerPlan returns a child of ctx that carries a fresh
// selectMergerState for plan under selectMergerCtxKey. Every call creates a
// new state, so the prefetch guarded by that state runs at most once per
// returned context.
func withSelectMergerPlan(ctx context.Context, plan []mergedSelect) context.Context {
	state := &selectMergerState{plan: plan}
	return context.WithValue(ctx, selectMergerCtxKey{}, state)
}

// selectMergerQueryFunc wraps inner so that queries evaluated with a merge
// plan in their context can be answered from a shared prefetch cache.
//
// Without a selectMergerState in ctx the call is forwarded to inner untouched.
// With one, the first call runs the plan through inner (guarded by the state's
// sync.Once, using that first call's ctx and timestamp) and stores the result
// in the state. Afterwards a query is served from the cache only when the
// whole expression is a plain instant vector selector; everything else,
// including cache misses and unparsable queries, is forwarded to inner.
//
// The cache holds raw selector results, so an expression that merely contains
// one selector (for example sum(x), rate(x[5m]) or x offset 5m) must not be
// answered from it: doing so would return the selector's samples instead of
// the expression's value.
func selectMergerQueryFunc(inner rules.QueryFunc) rules.QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		state, _ := ctx.Value(selectMergerCtxKey{}).(*selectMergerState)
		if state == nil {
			return inner(ctx, qs, t)
		}

		// Lazy prefetch: execute the plan on the first call only.
		state.once.Do(func() {
			state.cache = executePrefetch(ctx, state.plan, inner, t)
		})

		if state.cache != nil {
			if matchers, ok := bareInstantSelectorMatchers(qs); ok {
				if vec, hit := state.cache.get(matchers); hit {
					return vec, nil
				}
			}
		}
		return inner(ctx, qs, t)
	}
}

// bareInstantSelectorMatchers parses qs and returns the selector's label
// matchers when the entire expression is a single instant vector selector
// without an offset or @ modifier. Redundant surrounding parentheses are
// ignored. It reports false for parse errors and for any other expression.
func bareInstantSelectorMatchers(qs string) ([]*labels.Matcher, bool) {
	expr, err := parser.ParseExpr(qs)
	if err != nil {
		return nil, false
	}

	// (x) evaluates to the same vector as x, so look through parentheses.
	for {
		paren, ok := expr.(*parser.ParenExpr)
		if !ok {
			break
		}
		expr = paren.Expr
	}

	vs, ok := expr.(*parser.VectorSelector)
	if !ok {
		return nil, false
	}
	// offset / @ shift the evaluation time away from the prefetch timestamp.
	if vs.OriginalOffset != 0 || vs.Timestamp != nil || vs.StartOrEnd != 0 {
		return nil, false
	}
	return vs.LabelMatchers, true
}

// prefetchEntry holds pre-fetched results for a merged selector.
type prefetchEntry struct {
// matchers is the merged matcher set the entry was fetched for; lookups
// compare query matchers against it.
matchers []*labels.Matcher
// vector is the query result stored for this entry.
vector promql.Vector
}

// prefetchCache holds all pre-fetched data for a single group evaluation.
type prefetchCache struct {
// entries is scanned linearly by get; the first matching entry wins.
entries []prefetchEntry
}

// get looks for the first cached entry whose matcher set covers queryMatchers
// (as decided by isMatcherSetSuperset). On a hit it returns the entry's
// samples narrowed by any query matchers that are not literally present in
// the entry, and true. On a miss it returns nil and false.
//
// The returned vector never shares its backing array with the cache: callers
// such as recording-rule evaluation rewrite sample labels in place on the
// vector they receive, which would otherwise corrupt the entry for every later
// lookup. The copy is shallow; label sets and histogram pointers are shared.
func (c *prefetchCache) get(queryMatchers []*labels.Matcher) (promql.Vector, bool) {
	for _, e := range c.entries {
		if !isMatcherSetSuperset(e.matchers, queryMatchers) {
			continue
		}
		if extra := extraMatchers(e.matchers, queryMatchers); len(extra) > 0 {
			// filterVector builds a fresh slice, so no extra copy is needed.
			return filterVector(e.vector, extra), true
		}
		out := make(promql.Vector, len(e.vector))
		copy(out, e.vector)
		return out, true
	}
	return nil, false
}

// filterVector returns, in their original order, the samples of vec whose
// label set satisfies every matcher in filters. The result is a newly built
// slice and is nil when no sample qualifies.
func filterVector(vec promql.Vector, filters []*labels.Matcher) promql.Vector {
	var kept promql.Vector
	for i := range vec {
		if !matchesAll(vec[i].Metric, filters) {
			continue
		}
		kept = append(kept, vec[i])
	}
	return kept
}

// executePrefetch evaluates every merged select in plan through queryFunc at
// ts and collects the successful results into a new prefetchCache.
//
// The context handed to queryFunc shadows selectMergerCtxKey with a typed nil
// state, so a wrapper that looks the state up sees none and goes straight to
// the real query instead of recursing into the prefetch.
//
// A failing query is skipped rather than reported: its selector simply gets no
// cache entry. The returned cache is never nil.
func executePrefetch(ctx context.Context, plan []mergedSelect, queryFunc rules.QueryFunc, ts time.Time) *prefetchCache {
	var noState *selectMergerState
	prefetchCtx := context.WithValue(ctx, selectMergerCtxKey{}, noState)

	result := new(prefetchCache)
	for i := range plan {
		vec, err := queryFunc(prefetchCtx, plan[i].prefetchExpr, ts)
		if err == nil {
			result.entries = append(result.entries, prefetchEntry{
				matchers: plan[i].mergedMatchers,
				vector:   vec,
			})
		}
	}
	return result
}

// isMatcherSetSuperset reports whether every matcher in superMatchers is
// covered by subMatchers: for each super matcher, the first sub matcher on the
// same label name must exist and must satisfy isMatcherSuperset(super, sub).
// Labels that appear only in subMatchers are ignored, and an empty
// superMatchers yields true.
func isMatcherSetSuperset(superMatchers, subMatchers []*labels.Matcher) bool {
nextSuper:
	for _, want := range superMatchers {
		for _, have := range subMatchers {
			if have.Name != want.Name {
				continue
			}
			// Only the first sub matcher on this label is consulted.
			if isMatcherSuperset(want, have) {
				continue nextSuper
			}
			return false
		}
		// No sub matcher constrains this label at all.
		return false
	}
	return true
}

// extraMatchers returns the query matchers that have no identical counterpart
// (same name, type and value) among entryMatchers, preserving query order.
// The result is nil when every query matcher is present in the entry.
func extraMatchers(entryMatchers, queryMatchers []*labels.Matcher) []*labels.Matcher {
	inEntry := func(q *labels.Matcher) bool {
		for _, e := range entryMatchers {
			if e.Name == q.Name && e.Type == q.Type && e.Value == q.Value {
				return true
			}
		}
		return false
	}

	var extras []*labels.Matcher
	for _, q := range queryMatchers {
		if !inEntry(q) {
			extras = append(extras, q)
		}
	}
	return extras
}

// matchesAll reports whether lset satisfies every matcher, evaluating each one
// against the value lset.Get returns for the matcher's label name. It returns
// true when matchers is empty.
func matchesAll(lset labels.Labels, matchers []*labels.Matcher) bool {
	for i := 0; i < len(matchers); i++ {
		matcher := matchers[i]
		value := lset.Get(matcher.Name)
		if matcher.Matches(value) {
			continue
		}
		return false
	}
	return true
}

// extractSelectorsFromExpr parses qs as PromQL and returns the label matchers
// of each vector selector that extractSelectors reports, in the order they are
// reported. It returns nil when qs does not parse.
func extractSelectorsFromExpr(qs string) [][]*labels.Matcher {
	var matcherSets [][]*labels.Matcher
	collect := func(sel *parser.VectorSelector) {
		matcherSets = append(matcherSets, sel.LabelMatchers)
	}
	if parsed, err := parser.ParseExpr(qs); err == nil {
		extractSelectors(parsed, collect)
	}
	return matcherSets
}
169 changes: 169 additions & 0 deletions pkg/ruler/prefetch_queryable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package ruler

import (
"context"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPrefetchCache_FindSuperset(t *testing.T) {
cache := &prefetchCache{
entries: []prefetchEntry{
{
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
},
vector: promql.Vector{
{Metric: labels.FromStrings("__name__", "http", "job", "api"), T: 1000, F: 1.0},
{Metric: labels.FromStrings("__name__", "http", "job", "web"), T: 1000, F: 2.0},
},
},
},
}

queryMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
}

vec, ok := cache.get(queryMatchers)
require.True(t, ok)
assert.Len(t, vec, 1)
assert.Equal(t, "api", vec[0].Metric.Get("job"))
assert.Equal(t, 1.0, vec[0].F)
}

// TestIsMatcherSetSuperset is a table test over pairs of matcher sets,
// covering identical sets, a regex covering an equality, extra labels on
// either side, and conflicting values.
func TestIsMatcherSetSuperset(t *testing.T) {
	eq := func(name, value string) *labels.Matcher {
		return labels.MustNewMatcher(labels.MatchEqual, name, value)
	}
	re := func(name, pattern string) *labels.Matcher {
		return labels.MustNewMatcher(labels.MatchRegexp, name, pattern)
	}
	set := func(ms ...*labels.Matcher) []*labels.Matcher { return ms }

	type testCase struct {
		name  string
		super []*labels.Matcher
		sub   []*labels.Matcher
		want  bool
	}

	cases := []testCase{
		{
			name:  "identical sets",
			super: set(eq("job", "api")),
			sub:   set(eq("job", "api")),
			want:  true,
		},
		{
			name:  "regex superset of equal",
			super: set(re("job", ".*")),
			sub:   set(eq("job", "api")),
			want:  true,
		},
		{
			name:  "super has extra label — more restrictive, not superset",
			super: set(eq("job", "api"), eq("env", "prod")),
			sub:   set(eq("job", "api")),
			want:  false,
		},
		{
			name:  "sub has extra label — super is broader",
			super: set(eq("job", "api")),
			sub:   set(eq("job", "api"), eq("env", "prod")),
			want:  true,
		},
		{
			name:  "different values — not superset",
			super: set(eq("job", "web")),
			sub:   set(eq("job", "api")),
			want:  false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := isMatcherSetSuperset(tc.super, tc.sub)
			assert.Equal(t, tc.want, got)
		})
	}
}

// TestSelectMergerQueryFunc drives the wrapper through four consecutive
// queries and counts how often the wrapped QueryFunc is reached:
// pass-through without a plan, prefetch plus cache hit on the first planned
// query, a pure cache hit on the second, and a fall-through on a cache miss.
// The steps share the call counter and must run in this order.
func TestSelectMergerQueryFunc(t *testing.T) {
	evalTime := time.Unix(1, 0)

	backend := promql.Vector{
		{Metric: labels.FromStrings("__name__", "http_requests", "job", "api"), T: 1000, F: 10.0},
		{Metric: labels.FromStrings("__name__", "http_requests", "job", "web"), T: 1000, F: 20.0},
	}

	var calls int
	inner := func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) {
		calls++
		return backend, nil
	}
	qf := selectMergerQueryFunc(inner)

	// Step 1: no plan in the context, so the query goes straight to inner.
	got, err := qf(context.Background(), `http_requests{job="api"}`, evalTime)
	require.NoError(t, err)
	assert.Equal(t, 1, calls)
	assert.Len(t, got, 2) // inner returns everything, unfiltered

	// Remaining steps run with a plan attached to the context.
	calls = 0
	planned := withSelectMergerPlan(context.Background(), []mergedSelect{
		{
			metricName: "http_requests",
			mergedMatchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests"),
				labels.MustNewMatcher(labels.MatchRegexp, "job", ".*"),
			},
			prefetchExpr: `http_requests{job=~".*"}`,
		},
	})

	// Step 2: the first planned query triggers the prefetch (one inner call)
	// and is then answered with the filtered cached result.
	got, err = qf(planned, `http_requests{job="api"}`, evalTime)
	require.NoError(t, err)
	assert.Equal(t, 1, calls) // the prefetch
	assert.Len(t, got, 1)
	assert.Equal(t, "api", got[0].Metric.Get("job"))

	// Step 3: a second covered query is served without reaching inner again.
	got, err = qf(planned, `http_requests{job="web"}`, evalTime)
	require.NoError(t, err)
	assert.Equal(t, 1, calls) // unchanged
	assert.Len(t, got, 1)
	assert.Equal(t, "web", got[0].Metric.Get("job"))

	// Step 4: a metric the plan does not cover falls through to inner.
	got, err = qf(planned, `other_metric{job="api"}`, evalTime)
	require.NoError(t, err)
	assert.Equal(t, 2, calls)
}

func TestExecutePrefetch(t *testing.T) {
queryFunc := func(_ context.Context, _ string, ts time.Time) (promql.Vector, error) {
return promql.Vector{
{Metric: labels.FromStrings("__name__", "http", "job", "api"), T: ts.UnixMilli(), F: 5.0},
}, nil
}

plan := []mergedSelect{
{
metricName: "http",
mergedMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "__name__", "http"),
},
prefetchExpr: `http{job=~".*"}`,
},
}

cache := executePrefetch(context.Background(), plan, rules.QueryFunc(queryFunc), time.Unix(1, 0))
require.Len(t, cache.entries, 1)
assert.Len(t, cache.entries[0].vector, 1)
}
Loading
Loading