Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253
* [ENHANCEMENT] discard ooo samples in some special cases #7227
Comment thread
Shvejan marked this conversation as resolved.
Outdated
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
Expand Down
241 changes: 146 additions & 95 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ message WriteRequest {

bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus
MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "MessageWithBufRef", (gogoproto.nullable) = false];
// When true, indicates that out-of-order samples should be discarded even if OOO is enabled.
bool discard_out_of_order = 1002;
}

// refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto
Expand Down
1 change: 1 addition & 0 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func ReuseWriteRequest(req *PreallocWriteRequest) {
req.Source = 0
req.Metadata = nil
req.Timeseries = nil
req.DiscardOutOfOrder = false
writeRequestPool.Put(req)
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s
}
}

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
return d.send(localCtx, ingester, timeseries, metadata, req.Source, req.DiscardOutOfOrder)
}, func() {
cortexpb.ReuseSlice(req.Timeseries)
req.Free()
Expand Down Expand Up @@ -1230,7 +1230,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
})
}

func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum) error {
func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum, discardOutOfOrder bool) error {
Comment thread
Shvejan marked this conversation as resolved.
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
Expand All @@ -1248,16 +1248,18 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time

if d.cfg.UseStreamPush {
req := &cortexpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
Timeseries: timeseries,
Metadata: metadata,
Source: source,
DiscardOutOfOrder: discardOutOfOrder,
}
_, err = c.PushStreamConnection(ctx, req)
} else {
req := cortexpb.PreallocWriteRequestFromPool()
req.Timeseries = timeseries
req.Metadata = metadata
req.Source = source
req.DiscardOutOfOrder = discardOutOfOrder

_, err = c.PushPreAlloc(ctx, req)

Expand Down
7 changes: 7 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)

// Even when OOO is enabled globally, we want to reject OOO samples in some cases.
// prometheus implementation: https://github.com/prometheus/prometheus/pull/14710
if req.DiscardOutOfOrder {
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
}

var newSeries []labels.Labels

for _, ts := range req.Timeseries {
Expand Down
205 changes: 205 additions & 0 deletions pkg/ingester/ingester_ooo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package ingester
Comment thread
Shvejan marked this conversation as resolved.
Outdated

import (
"context"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
)

// mockAppender implements the extendedAppender interface for testing
type mockAppender struct {
storage.Appender
lastOptions *storage.AppendOptions
}

func (m *mockAppender) SetOptions(opts *storage.AppendOptions) {
m.lastOptions = opts
}

func TestIngester_Push_DiscardOutOfOrder_True(t *testing.T) {
req := &cortexpb.WriteRequest{
Source: cortexpb.RULE,
DiscardOutOfOrder: true,
Timeseries: []cortexpb.PreallocTimeseries{},
}

assert.True(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should be true")
assert.True(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return true")
Comment thread
Shvejan marked this conversation as resolved.
Outdated
}

func TestIngester_Push_DiscardOutOfOrder_Default(t *testing.T) {
Comment thread
Shvejan marked this conversation as resolved.
Outdated
// Create a WriteRequest without setting DiscardOutOfOrder
req := &cortexpb.WriteRequest{
Source: cortexpb.API,
Timeseries: []cortexpb.PreallocTimeseries{},
}

// Verify the default value is false
assert.False(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should default to false")
assert.False(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return false by default")
}

func TestIngester_WriteRequest_MultipleScenarios(t *testing.T) {
Comment thread
Shvejan marked this conversation as resolved.
Outdated
scenarios := []struct {
name string
setupReq func() *cortexpb.WriteRequest
expectOpts bool
description string
}{
{
name: "Stale marker during rule migration",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{
Source: cortexpb.RULE,
DiscardOutOfOrder: true,
}
},
expectOpts: true,
description: "Should set appender options to discard OOO",
},
{
name: "Normal rule evaluation",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{
Source: cortexpb.RULE,
DiscardOutOfOrder: false,
}
},
expectOpts: false,
description: "Should not set appender options",
},
{
name: "API write request",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{
Source: cortexpb.API,
DiscardOutOfOrder: false,
}
},
expectOpts: false,
description: "API requests should never trigger OOO discard",
},
{
name: "Default values",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{}
},
expectOpts: false,
description: "Default values should not trigger OOO discard",
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
req := scenario.setupReq()
mock := &mockAppender{}

// Simulate the ingester logic
if req.DiscardOutOfOrder {
mock.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
}

// Verify expectations
if scenario.expectOpts {
require.NotNil(t, mock.lastOptions)
assert.True(t, mock.lastOptions.DiscardOutOfOrder)
}
})
}
}

func TestIngester_DiscardOutOfOrderFlagIngegrationTest(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

limits := defaultLimitsTestConfig()
limits.EnableNativeHistograms = true
limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute)

i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return i.lifecycler.GetState()
})

ctx := user.InjectOrgID(context.Background(), "test-user")

// Create labels for our test metric
metricLabels := labels.FromStrings("__name__", "test_metric", "job", "test")

currentTime := time.Now().UnixMilli()
olderTime := currentTime - 60000 // 1 minute earlier (within OOO window)

// First, push a sample with current timestamp with discardOutOfOrder=true
req1 := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 100, TimestampMs: currentTime}},
nil, nil, cortexpb.RULE)
req1.DiscardOutOfOrder = true

_, err = i.Push(ctx, req1)
require.NoError(t, err, "First sample push should succeed")

// Now try to push a sample with older timestamp with discardOutOfOrder=true
// This should be discarded because DiscardOutOfOrder is true
req2 := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 50, TimestampMs: olderTime}},
nil, nil, cortexpb.RULE)
req2.DiscardOutOfOrder = true

_, _ = i.Push(ctx, req2)

// Query back the data to ensure only the first (current time) sample was stored
s := &mockQueryStreamServer{ctx: ctx}
err = i.QueryStream(&client.QueryRequest{
StartTimestampMs: olderTime - 1000,
EndTimestampMs: currentTime + 1000,
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: "__name__", Value: "test_metric"},
},
}, s)
require.NoError(t, err)

// Verify we only have one series with one sample (the current time sample)
require.Len(t, s.series, 1, "Should have exactly one series")

// Convert chunks to samples to verify content
series := s.series[0]
require.Len(t, series.Chunks, 1, "Should have exactly one chunk")

chunk := series.Chunks[0]
chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data)
require.NoError(t, err)

iter := chunkData.Iterator(nil)
sampleCount := 0
for iter.Next() != chunkenc.ValNone {
ts, val := iter.At()
require.Equal(t, currentTime, ts, "Sample timestamp should match current time")
require.Equal(t, 100.0, val, "Sample value should match first push")
sampleCount++
}
require.NoError(t, iter.Err())
require.Equal(t, 1, sampleCount, "Should have exactly one sample stored")
}
11 changes: 10 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type PusherAppender struct {
histogramLabels []labels.Labels
histograms []cortexpb.Histogram
userID string
opts *storage.AppendOptions
}

func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
Expand All @@ -73,7 +74,9 @@ func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v
return 0, nil
}

func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {}
func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {
a.opts = opts
}

func (a *PusherAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// AppendHistogramCTZeroSample is a no-op for PusherAppender as it happens during scrape time only.
Expand All @@ -94,6 +97,12 @@ func (a *PusherAppender) Commit() error {

req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)
req.AddHistogramTimeSeries(a.histogramLabels, a.histograms)

// Set DiscardOutOfOrder flag if requested via AppendOptions
if a.opts != nil && a.opts.DiscardOutOfOrder {
req.DiscardOutOfOrder = true
}

// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)
Expand Down
Loading