Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253
* [ENHANCEMENT] Ruler/Ingester: Propagate append hints to discard out of order samples on Ingester #7226
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
Expand Down
241 changes: 146 additions & 95 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ message WriteRequest {

bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus
MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "MessageWithBufRef", (gogoproto.nullable) = false];
// When true, indicates that out-of-order samples should be discarded even if OOO is enabled.
bool discard_out_of_order = 1002;
}

// refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto
Expand Down
1 change: 1 addition & 0 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func ReuseWriteRequest(req *PreallocWriteRequest) {
req.Source = 0
req.Metadata = nil
req.Timeseries = nil
req.DiscardOutOfOrder = false
writeRequestPool.Put(req)
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s
}
}

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
return d.send(localCtx, ingester, timeseries, metadata, req.Source, req.DiscardOutOfOrder)
}, func() {
cortexpb.ReuseSlice(req.Timeseries)
req.Free()
Expand Down Expand Up @@ -1230,7 +1230,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
})
}

func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum) error {
func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum, discardOutOfOrder bool) error {
Comment thread
Shvejan marked this conversation as resolved.
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
Expand All @@ -1248,16 +1248,18 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time

if d.cfg.UseStreamPush {
req := &cortexpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
Timeseries: timeseries,
Metadata: metadata,
Source: source,
DiscardOutOfOrder: discardOutOfOrder,
}
_, err = c.PushStreamConnection(ctx, req)
} else {
req := cortexpb.PreallocWriteRequestFromPool()
req.Timeseries = timeseries
req.Metadata = metadata
req.Source = source
req.DiscardOutOfOrder = discardOutOfOrder

_, err = c.PushPreAlloc(ctx, req)

Expand Down
92 changes: 84 additions & 8 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,78 @@ func TestDistributor_Push(t *testing.T) {
}
}

// TestDistributor_Push_DiscardOutOfOrder verifies that the DiscardOutOfOrder
// flag set on an incoming WriteRequest is forwarded unchanged to every
// ingester replica, on both the regular and the stream push code paths.
func TestDistributor_Push_DiscardOutOfOrder(t *testing.T) {
	t.Parallel()

	ctx := user.InjectOrgID(context.Background(), "userDiscardOOO")

	testCases := []struct {
		name               string
		discardOutOfOrder  bool
		expectedDiscardOOO bool
		useStreamPush      bool
	}{
		{name: "DiscardOutOfOrder=true with regular push", discardOutOfOrder: true, expectedDiscardOOO: true, useStreamPush: false},
		{name: "DiscardOutOfOrder=false with regular push", discardOutOfOrder: false, expectedDiscardOOO: false, useStreamPush: false},
		{name: "DiscardOutOfOrder=true with stream push", discardOutOfOrder: true, expectedDiscardOOO: true, useStreamPush: true},
		{name: "DiscardOutOfOrder=false with stream push", discardOutOfOrder: false, expectedDiscardOOO: false, useStreamPush: true},
	}

	for _, testCase := range testCases {
		testCase := testCase
		t.Run(testCase.name, func(t *testing.T) {
			t.Parallel()

			testLimits := &validation.Limits{}
			flagext.DefaultValues(testLimits)

			distributors, mockIngesters, _, _ := prepare(t, prepConfig{
				numIngesters:     3,
				happyIngesters:   3,
				numDistributors:  1,
				shardByAllLabels: true,
				limits:           testLimits,
				useStreamPush:    testCase.useStreamPush,
			})

			writeReq := makeWriteRequest(123456789000, 5, 0, 0)
			writeReq.DiscardOutOfOrder = testCase.discardOutOfOrder

			_, err := distributors[0].Push(ctx, writeReq)
			require.NoError(t, err)

			// Every replica must have observed the same flag value that was
			// set on the original request.
			for _, ing := range mockIngesters {
				ing.Lock()
				got := ing.lastDiscardOutOfOrder
				ing.Unlock()

				assert.Equal(t, testCase.expectedDiscardOOO, got,
					"ingester should have received DiscardOutOfOrder=%v", testCase.expectedDiscardOOO)
			}
		})
	}
}

func TestDistributor_MetricsCleanup(t *testing.T) {
t.Parallel()
dists, _, regs, r := prepare(t, prepConfig{
Expand Down Expand Up @@ -3547,14 +3619,15 @@ type mockIngester struct {
sync.Mutex
client.IngesterClient
grpc_health_v1.HealthClient
happy atomic.Bool
failResp atomic.Error
stats client.UsersStatsResponse
timeseries map[uint32]*cortexpb.PreallocTimeseries
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
queryDelay time.Duration
calls map[string]int
lblsValues []string
happy atomic.Bool
failResp atomic.Error
stats client.UsersStatsResponse
timeseries map[uint32]*cortexpb.PreallocTimeseries
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
queryDelay time.Duration
calls map[string]int
lblsValues []string
lastDiscardOutOfOrder bool
}

func newMockIngester(id int, ps *prepState, cfg prepConfig) *mockIngester {
Expand Down Expand Up @@ -3625,6 +3698,9 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt

i.trackCall("Push")

// Store the DiscardOutOfOrder flag for test assertions
i.lastDiscardOutOfOrder = req.DiscardOutOfOrder

if !i.happy.Load() {
return nil, i.failResp.Load()
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)

// Even when OOO is enabled globally, we want to reject OOO samples in some cases.
// prometheus implementation: https://github.com/prometheus/prometheus/pull/14710
if req.DiscardOutOfOrder {
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
}

var newSeries []labels.Labels

for _, ts := range req.Timeseries {
Expand Down
80 changes: 80 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7863,3 +7863,83 @@ func TestIngester_checkRegexMatcherLimits(t *testing.T) {
})
}
}
// TestIngester_DiscardOutOfOrderFlagIntegration exercises the end-to-end
// effect of WriteRequest.DiscardOutOfOrder: even with an out-of-order window
// enabled on the tenant, a sample older than the latest accepted one must be
// dropped when the flag is set, leaving only the newer sample queryable.
func TestIngester_DiscardOutOfOrderFlagIntegration(t *testing.T) {
	reg := prometheus.NewRegistry()
	cfg := defaultIngesterTestConfig(t)
	cfg.LifecyclerConfig.JoinAfter = 0

	testLimits := defaultLimitsTestConfig()
	testLimits.EnableNativeHistograms = true
	testLimits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute)

	ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, testLimits, nil, "", reg)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

	// Block until the lifecycler reaches ACTIVE so pushes are accepted.
	test.Poll(t, time.Second, ring.ACTIVE, func() any {
		return ing.lifecycler.GetState()
	})

	ctx := user.InjectOrgID(context.Background(), "test-user")

	seriesLabels := labels.FromStrings(labels.MetricName, "test_metric", "job", "test")

	nowMs := time.Now().UnixMilli()
	pastMs := nowMs - 60000 // one minute earlier, still inside the OOO window

	// Push an in-order sample first; the flag has no effect on in-order data.
	firstReq := cortexpb.ToWriteRequest(
		[]labels.Labels{seriesLabels},
		[]cortexpb.Sample{{Value: 100, TimestampMs: nowMs}},
		nil, nil, cortexpb.RULE)
	firstReq.DiscardOutOfOrder = true

	_, err = ing.Push(ctx, firstReq)
	require.NoError(t, err, "First sample push should succeed")

	// A second sample with an older timestamp must be discarded because
	// DiscardOutOfOrder is set, even though the OOO window would accept it.
	secondReq := cortexpb.ToWriteRequest(
		[]labels.Labels{seriesLabels},
		[]cortexpb.Sample{{Value: 50, TimestampMs: pastMs}},
		nil, nil, cortexpb.RULE)
	secondReq.DiscardOutOfOrder = true

	_, _ = ing.Push(ctx, secondReq)

	// Query the full range back; only the first sample should be stored.
	stream := &mockQueryStreamServer{ctx: ctx}
	err = ing.QueryStream(&client.QueryRequest{
		StartTimestampMs: pastMs - 1000,
		EndTimestampMs:   nowMs + 1000,
		Matchers: []*client.LabelMatcher{
			{Type: client.EQUAL, Name: labels.MetricName, Value: "test_metric"},
		},
	}, stream)
	require.NoError(t, err)

	require.Len(t, stream.series, 1, "Should have exactly one series")

	got := stream.series[0]
	require.Len(t, got.Chunks, 1, "Should have exactly one chunk")

	decoded, err := chunkenc.FromData(chunkenc.EncXOR, got.Chunks[0].Data)
	require.NoError(t, err)

	it := decoded.Iterator(nil)
	sampleCount := 0
	for it.Next() != chunkenc.ValNone {
		ts, val := it.At()
		require.Equal(t, nowMs, ts, "Sample timestamp should match current time")
		require.Equal(t, 100.0, val, "Sample value should match first push")
		sampleCount++
	}
	require.NoError(t, it.Err())
	require.Equal(t, 1, sampleCount, "Should have exactly one sample stored")
}
11 changes: 10 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type PusherAppender struct {
histogramLabels []labels.Labels
histograms []cortexpb.Histogram
userID string
opts *storage.AppendOptions
}

func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
Expand All @@ -73,7 +74,9 @@ func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v
return 0, nil
}

func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {}
// SetOptions stores the given append options on the appender so that a later
// Commit can read them (e.g. to propagate DiscardOutOfOrder onto the outgoing
// WriteRequest). Implements the storage.Appender interface.
func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {
a.opts = opts
}

func (a *PusherAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// AppendHistogramCTZeroSample is a no-op for PusherAppender as it happens during scrape time only.
Expand All @@ -94,6 +97,12 @@ func (a *PusherAppender) Commit() error {

req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)
req.AddHistogramTimeSeries(a.histogramLabels, a.histograms)

// Set DiscardOutOfOrder flag if requested via AppendOptions
if a.opts != nil && a.opts.DiscardOutOfOrder {
req.DiscardOutOfOrder = true
}

// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)
Expand Down
24 changes: 24 additions & 0 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -413,3 +414,26 @@ func TestRecordAndReportRuleQueryMetrics(t *testing.T) {
require.Equal(t, testutil.ToFloat64(metrics.RulerQueryChunkBytes.WithLabelValues("userID")), float64(10))
require.Equal(t, testutil.ToFloat64(metrics.RulerQueryDataBytes.WithLabelValues("userID")), float64(14))
}
// TestPusherAppender_Commit_WithDiscardOutOfOrder verifies that append options
// passed via SetOptions are translated by Commit into the DiscardOutOfOrder
// flag on the WriteRequest handed to the pusher.
func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) {
	mockPusher := &fakePusher{response: &cortexpb.WriteResponse{}}
	writesCounter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"})

	a := &PusherAppender{
		ctx:          context.Background(),
		pusher:       mockPusher,
		userID:       "test-user",
		totalWrites:  writesCounter,
		failedWrites: writesCounter,
		labels:       []labels.Labels{labels.FromStrings(labels.MetricName, "test_metric")},
		samples:      []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}},
	}

	a.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})

	require.NoError(t, a.Commit())

	// The options must have been carried through onto the sent request.
	require.NotNil(t, mockPusher.request, "WriteRequest should have been sent")
	require.True(t, mockPusher.request.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest")
}